[PATCH v7 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression

Gao Xiang hsiangkao at linux.alibaba.com
Fri Mar 15 12:10:19 AEDT 2024


From: Yifan Zhao <zhaoyifan at sjtu.edu.cn>

Currently, the creation of EROFS compressed images is single-threaded,
which suffers from performance issues. This patch attempts to address
it by compressing large files in parallel.

Specifically, each input file larger than 16MB is split into segments,
and each worker thread compresses a segment as if it were a separate
file. Finally, the main thread merges all the compressed segments.

Multi-threaded compression is not compatible with -Ededupe,
-E(all-)fragments and -Eztailpacking for now.

Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
v7:
 - support -Eztailpacking;
 - wq_private -> wq_tls;
 - minor updates.

 include/erofs/compress.h |   3 +-
 lib/compress.c           | 548 ++++++++++++++++++++++++++++++++-------
 lib/compressor.c         |   2 +
 mkfs/main.c              |   8 +-
 4 files changed, 464 insertions(+), 97 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index b3272f7..3253611 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -14,7 +14,8 @@ extern "C"
 
 #include "internal.h"
 
-#define EROFS_CONFIG_COMPR_MAX_SZ           (4000 * 1024)
+#define EROFS_CONFIG_COMPR_MAX_SZ	(4000 * 1024)
+#define Z_EROFS_COMPR_QUEUE_SZ		(EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
diff --git a/lib/compress.c b/lib/compress.c
index 4101009..0d796c8 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -20,6 +20,9 @@
 #include "erofs/block_list.h"
 #include "erofs/compress_hints.h"
 #include "erofs/fragments.h"
+#ifdef EROFS_MT_ENABLED
+#include "erofs/workqueue.h"
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -33,29 +36,77 @@ struct z_erofs_extent_item {
 	struct z_erofs_inmem_extent e;
 };
 
-struct z_erofs_vle_compress_ctx {
-	u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
+struct z_erofs_compress_ictx {
+	struct erofs_inode *inode;
+	int fd;
+	unsigned int pclustersize;
+
+	u32 tof_chksum;
+	bool fix_dedupedfrag;
+	bool fragemitted;
+
+	/* fields for write indexes */
+	u8 *metacur;
+	struct list_head extents;
+	u16 clusterofs;
+};
+
+struct z_erofs_compress_sctx {		/* segment context */
+	struct z_erofs_compress_ictx *ictx;
+
+	u8 *queue;
 	struct list_head extents;
 	struct z_erofs_extent_item *pivot;
 
-	struct erofs_inode *inode;
-	struct erofs_compress_cfg *ccfg;
+	struct erofs_compress *chandle;
+	char *destbuf;
 
-	u8 *metacur;
 	unsigned int head, tail;
 	erofs_off_t remaining;
-	unsigned int pclustersize;
 	erofs_blk_t blkaddr;		/* pointing to the next blkaddr */
 	u16 clusterofs;
 
-	u32 tof_chksum;
-	bool fix_dedupedfrag;
-	bool fragemitted;
+	int seg_num, seg_idx;
+
+	void *membuf;
+	erofs_off_t memoff;
+};
+
+#ifdef EROFS_MT_ENABLED
+struct erofs_compress_wq_tls {
+	u8 *queue;
+	char *destbuf;
+	struct erofs_compress_cfg *ccfg;
 };
 
+struct erofs_compress_work {
+	/* Note: struct erofs_work must be the first member */
+	struct erofs_work work;
+	struct z_erofs_compress_sctx ctx;
+	struct erofs_compress_work *next;
+
+	unsigned int alg_id;
+	char *alg_name;
+	unsigned int comp_level;
+	unsigned int dict_size;
+
+	int errcode;
+};
+
+static struct {
+	struct erofs_workqueue wq;
+	struct erofs_compress_work *idle;
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+	int nfini;
+} z_erofs_mt_ctrl;
+#endif
+
+static bool z_erofs_mt_enabled;
+
 #define Z_EROFS_LEGACY_MAP_HEADER_SIZE	Z_EROFS_FULL_INDEX_ALIGN(0)
 
-static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx)
 {
 	const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
 	struct z_erofs_lcluster_index di;
@@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
 	ctx->metacur += sizeof(di);
 }
 
-static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx,
 				 struct z_erofs_inmem_extent *e)
 {
 	struct erofs_inode *inode = ctx->inode;
@@ -170,7 +221,7 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->clusterofs = clusterofs + count;
 }
 
-static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx)
 {
 	struct z_erofs_extent_item *ei, *n;
 
@@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
 	z_erofs_write_indexes_final(ctx);
 }
 
-static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
+static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx)
 {
 	const bool final = !ctx->remaining;
 	unsigned int qh_aligned, qh_after;
+	struct erofs_inode *inode = ctx->ictx->inode;
 
 	if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
 		return false;
 
-	qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
+	qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
 	qh_after = ctx->head - qh_aligned;
 	memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
 	ctx->tail -= qh_aligned;
@@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = {
 	.e.length = 0
 };
 
-static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx,
 				  struct z_erofs_extent_item *ei)
 {
 	if (ei == &dummy_pivot)
@@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
 
 	list_add_tail(&ei->list, &ctx->extents);
 	ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
-			(erofs_blksiz(ctx->inode->sbi) - 1);
-
+			  (erofs_blksiz(ctx->ictx->inode->sbi) - 1);
 }
 
-static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
+static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx,
 				   unsigned int *len)
 {
-	struct erofs_inode *inode = ctx->inode;
+	struct erofs_inode *inode = ctx->ictx->inode;
 	const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
 	struct erofs_sb_info *sbi = inode->sbi;
 	struct z_erofs_extent_item *ei = ctx->pivot;
@@ -315,16 +366,17 @@ out:
 	return 0;
 }
 
-static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
+static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx,
 				     unsigned int len, char *dst)
 {
-	struct erofs_sb_info *sbi = ctx->inode->sbi;
+	struct erofs_inode *inode = ctx->ictx->inode;
+	struct erofs_sb_info *sbi = inode->sbi;
 	unsigned int count = min(erofs_blksiz(sbi), len);
 	unsigned int interlaced_offset, rightpart;
 	int ret;
 
 	/* write interlaced uncompressed data if needed */
-	if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
+	if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
 		interlaced_offset = ctx->clusterofs;
 	else
 		interlaced_offset = 0;
@@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 	memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
 	memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
 
-	erofs_dbg("Writing %u uncompressed data to block %u",
-		  count, ctx->blkaddr);
-	ret = blk_write(sbi, dst, ctx->blkaddr, 1);
-	if (ret)
-		return ret;
+	if (ctx->membuf) {
+		erofs_dbg("Writing %u uncompressed data to membuf", count);
+		memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi));
+		ctx->memoff += erofs_blksiz(sbi);
+	} else {
+		erofs_dbg("Writing %u uncompressed data to block %u", count,
+			  ctx->blkaddr);
+		ret = blk_write(sbi, dst, ctx->blkaddr, 1);
+		if (ret)
+			return ret;
+	}
 	return count;
 }
 
@@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data,
 	return len;
 }
 
-static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
+static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx,
 				   struct erofs_compress *ec,
 				   void *in, unsigned int *insize,
 				   void *out, unsigned int *compressedsize)
 {
-	struct erofs_sb_info *sbi = ctx->inode->sbi;
+	struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
 	static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
 	unsigned int count;
 	int ret = *compressedsize;
@@ -406,10 +464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
 	*compressedsize = ret;
 }
 
-static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
+static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx,
 					   unsigned int len)
 {
-	struct erofs_inode *inode = ctx->inode;
+	struct z_erofs_compress_ictx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	const unsigned int newsize = ctx->remaining + len;
 
@@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
 
 	/* try to fix again if it gets larger (should be rare) */
 	if (inode->fragment_size < newsize) {
-		ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
-					  roundup(newsize - inode->fragment_size,
-						  erofs_blksiz(sbi)));
+		ictx->pclustersize = min_t(erofs_off_t,
+				z_erofs_get_max_pclustersize(inode),
+				roundup(newsize - inode->fragment_size,
+					erofs_blksiz(sbi)));
 		return false;
 	}
 
@@ -436,29 +496,32 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
 	return true;
 }
 
-static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
+static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
 				  struct z_erofs_inmem_extent *e)
 {
-	static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
-	struct erofs_inode *inode = ctx->inode;
+	static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
+	char *dstbuf = ctx->destbuf ?: g_dstbuf;
+	struct z_erofs_compress_ictx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	unsigned int blksz = erofs_blksiz(sbi);
 	char *const dst = dstbuf + blksz;
-	struct erofs_compress *const h = &ctx->ccfg->handle;
+	struct erofs_compress *const h = ctx->chandle;
 	unsigned int len = ctx->tail - ctx->head;
 	bool is_packed_inode = erofs_is_packed_inode(inode);
 	bool final = !ctx->remaining;
-	bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
+	bool may_packing = (cfg.c_fragments && final && !is_packed_inode &&
+			    !z_erofs_mt_enabled);
 	bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
 	unsigned int compressedsize;
 	int ret;
 
-	if (len <= ctx->pclustersize) {
+	if (len <= ictx->pclustersize) {
 		if (!final || !len)
 			return 1;
 		if (may_packing) {
-			if (inode->fragment_size && !ctx->fix_dedupedfrag) {
-				ctx->pclustersize = roundup(len, blksz);
+			if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+				ictx->pclustersize = roundup(len, blksz);
 				goto fix_dedupedfrag;
 			}
 			e->length = len;
@@ -470,7 +533,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
 
 	e->length = min(len, cfg.c_max_decompressed_extent_bytes);
 	ret = erofs_compress_destsize(h, ctx->queue + ctx->head,
-				      &e->length, dst, ctx->pclustersize);
+				      &e->length, dst, ictx->pclustersize);
 	if (ret <= 0) {
 		erofs_err("failed to compress %s: %s", inode->i_srcpath,
 			  erofs_strerror(ret));
@@ -507,16 +570,16 @@ nocompression:
 		e->compressedblks = 1;
 		e->raw = true;
 	} else if (may_packing && len == e->length &&
-		   compressedsize < ctx->pclustersize &&
-		   (!inode->fragment_size || ctx->fix_dedupedfrag)) {
+		   compressedsize < ictx->pclustersize &&
+		   (!inode->fragment_size || ictx->fix_dedupedfrag)) {
 frag_packing:
 		ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head,
-					     len, ctx->tof_chksum);
+					     len, ictx->tof_chksum);
 		if (ret < 0)
 			return ret;
 		e->compressedblks = 0; /* indicate a fragment */
 		e->raw = false;
-		ctx->fragemitted = true;
+		ictx->fragemitted = true;
 	/* tailpcluster should be less than 1 block */
 	} else if (may_inline && len == e->length && compressedsize < blksz) {
 		if (ctx->clusterofs + len <= blksz) {
@@ -545,8 +608,8 @@ frag_packing:
 		 */
 		if (may_packing && len == e->length &&
 		    (compressedsize & (blksz - 1)) &&
-		    ctx->tail < sizeof(ctx->queue)) {
-			ctx->pclustersize = roundup(compressedsize, blksz);
+		    ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) {
+			ictx->pclustersize = roundup(compressedsize, blksz);
 			goto fix_dedupedfrag;
 		}
 
@@ -569,34 +632,45 @@ frag_packing:
 		}
 
 		/* write compressed data */
-		erofs_dbg("Writing %u compressed data to %u of %u blocks",
-			  e->length, ctx->blkaddr, e->compressedblks);
+		if (ctx->membuf) {
+			erofs_off_t sz = e->compressedblks * blksz;
+			erofs_dbg("Writing %u compressed data to membuf of %u blocks",
+				  e->length, e->compressedblks);
 
-		ret = blk_write(sbi, dst - padding, ctx->blkaddr,
-				e->compressedblks);
-		if (ret)
-			return ret;
+			memcpy(ctx->membuf + ctx->memoff, dst - padding, sz);
+			ctx->memoff += sz;
+		} else {
+			erofs_dbg("Writing %u compressed data to %u of %u blocks",
+				  e->length, ctx->blkaddr, e->compressedblks);
+
+			ret = blk_write(sbi, dst - padding, ctx->blkaddr,
+					e->compressedblks);
+			if (ret)
+				return ret;
+		}
 		e->raw = false;
 		may_inline = false;
 		may_packing = false;
 	}
 	e->partial = false;
 	e->blkaddr = ctx->blkaddr;
+	if (ctx->blkaddr != EROFS_NULL_ADDR)
+		ctx->blkaddr += e->compressedblks;
 	if (!may_inline && !may_packing && !is_packed_inode)
 		(void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head);
-	ctx->blkaddr += e->compressedblks;
 	ctx->head += e->length;
 	return 0;
 
 fix_dedupedfrag:
 	DBG_BUGON(!inode->fragment_size);
 	ctx->remaining += inode->fragment_size;
-	ctx->fix_dedupedfrag = true;
+	ictx->fix_dedupedfrag = true;
 	return 1;
 }
 
-static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
+static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx)
 {
+	struct z_erofs_compress_ictx *ictx = ctx->ictx;
 	unsigned int len = ctx->tail - ctx->head;
 	struct z_erofs_extent_item *ei;
 
@@ -624,7 +698,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
 
 		len -= ei->e.length;
 		ctx->pivot = ei;
-		if (ctx->fix_dedupedfrag && !ctx->fragemitted &&
+		if (ictx->fix_dedupedfrag && !ictx->fragemitted &&
 		    z_erofs_fixup_deduped_fragment(ctx, len))
 			break;
 
@@ -912,13 +986,268 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
 	inode->eof_tailraw = NULL;
 }
 
+int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
+			     u64 offset, erofs_blk_t blkaddr)
+{
+	int fd = ctx->ictx->fd;
+
+	ctx->blkaddr = blkaddr;
+	while (ctx->remaining) {
+		const u64 rx = min_t(u64, ctx->remaining,
+				     Z_EROFS_COMPR_QUEUE_SZ - ctx->tail);
+		int ret;
+
+		ret = (offset == -1 ?
+			read(fd, ctx->queue + ctx->tail, rx) :
+			pread(fd, ctx->queue + ctx->tail, rx, offset));
+		if (ret != rx)
+			return -errno;
+
+		ctx->remaining -= rx;
+		ctx->tail += rx;
+		if (offset != -1)
+			offset += rx;
+
+		ret = z_erofs_compress_one(ctx);
+		if (ret)
+			return ret;
+	}
+	DBG_BUGON(ctx->head != ctx->tail);
+
+	if (ctx->pivot) {
+		z_erofs_commit_extent(ctx, ctx->pivot);
+		ctx->pivot = NULL;
+	}
+	return 0;
+}
+
+#ifdef EROFS_MT_ENABLED
+void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
+{
+	struct erofs_compress_wq_tls *tls;
+
+	tls = calloc(1, sizeof(*tls));
+	if (!tls)
+		return NULL;
+
+	tls->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ);
+	if (!tls->queue)
+		goto err_free_priv;
+
+	tls->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ +
+			      EROFS_MAX_BLOCK_SIZE);
+	if (!tls->destbuf)
+		goto err_free_queue;
+
+	tls->ccfg = calloc(EROFS_MAX_COMPR_CFGS, sizeof(*tls->ccfg));
+	if (!tls->ccfg)
+		goto err_free_destbuf;
+	return tls;
+
+err_free_destbuf:
+	free(tls->destbuf);
+err_free_queue:
+	free(tls->queue);
+err_free_priv:
+	free(tls);
+	return NULL;
+}
+
+int z_erofs_mt_wq_tls_init_compr(struct erofs_sb_info *sbi,
+				 struct erofs_compress_wq_tls *tls,
+				 unsigned int alg_id, char *alg_name,
+				 unsigned int comp_level,
+				 unsigned int dict_size)
+{
+	struct erofs_compress_cfg *lc = &tls->ccfg[alg_id];
+	int ret;
+
+	if (likely(lc->enable))
+		return 0;
+
+	ret = erofs_compressor_init(sbi, &lc->handle, alg_name,
+				    comp_level, dict_size);
+	if (ret)
+		return ret;
+	lc->algorithmtype = alg_id;
+	lc->enable = true;
+	return 0;
+}
+
+void *z_erofs_mt_wq_tls_free(struct erofs_workqueue *wq, void *priv)
+{
+	struct erofs_compress_wq_tls *tls = priv;
+	int i;
+
+	for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++)
+		if (tls->ccfg[i].enable)
+			erofs_compressor_exit(&tls->ccfg[i].handle);
+
+	free(tls->ccfg);
+	free(tls->destbuf);
+	free(tls->queue);
+	free(tls);
+	return NULL;
+}
+
+void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
+{
+	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
+	struct erofs_compress_wq_tls *tls = tlsp;
+	struct z_erofs_compress_sctx *ctx = &cwork->ctx;
+	u64 offset = ctx->seg_idx * cfg.c_segment_size;
+	int ret = 0;
+
+	ret = z_erofs_mt_wq_tls_init_compr(ctx->ictx->inode->sbi, tls,
+					   cwork->alg_id, cwork->alg_name,
+					   cwork->comp_level,
+					   cwork->dict_size);
+	if (ret)
+		goto out;
+
+	ctx->queue = tls->queue;
+	ctx->destbuf = tls->destbuf;
+	ctx->chandle = &tls->ccfg[cwork->alg_id].handle;
+
+	ctx->membuf = malloc(ctx->remaining);
+	if (!ctx->membuf) {
+		ret = -ENOMEM;
+		goto out;
+	}
+	ctx->memoff = 0;
+
+	ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR);
+
+out:
+	cwork->errcode = ret;
+	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
+	++z_erofs_mt_ctrl.nfini;
+	pthread_cond_signal(&z_erofs_mt_ctrl.cond);
+	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+}
+
+int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
+			  struct z_erofs_compress_sctx *ctx)
+{
+	struct z_erofs_extent_item *ei, *n;
+	struct erofs_sb_info *sbi = ictx->inode->sbi;
+	erofs_blk_t blkoff = 0;
+	int ret = 0, ret2;
+
+	list_for_each_entry_safe(ei, n, &ctx->extents, list) {
+		list_del(&ei->list);
+		list_add_tail(&ei->list, &ictx->extents);
+
+		if (ei->e.blkaddr != EROFS_NULL_ADDR)	/* deduped extents */
+			continue;
+
+		ei->e.blkaddr = ctx->blkaddr;
+		ctx->blkaddr += ei->e.compressedblks;
+
+		ret2 = blk_write(sbi, ctx->membuf + blkoff * erofs_blksiz(sbi),
+				 ei->e.blkaddr, ei->e.compressedblks);
+		blkoff += ei->e.compressedblks;
+		if (ret2) {
+			ret = ret2;
+			continue;
+		}
+	}
+	free(ctx->membuf);
+	return ret;
+}
+
+int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx,
+			struct erofs_compress_cfg *ccfg,
+			erofs_blk_t blkaddr,
+			erofs_blk_t *compressed_blocks)
+{
+	struct erofs_compress_work *cur, *head = NULL, **last = &head;
+	struct erofs_inode *inode = ctx->inode;
+	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
+	int ret, i;
+
+	z_erofs_mt_ctrl.nfini = 0;
+
+	for (i = 0; i < nsegs; i++) {
+		if (z_erofs_mt_ctrl.idle) {
+			cur = z_erofs_mt_ctrl.idle;
+			z_erofs_mt_ctrl.idle = cur->next;
+			cur->next = NULL;
+		} else {
+			cur = calloc(1, sizeof(*cur));
+			if (!cur)
+				return -ENOMEM;
+		}
+		*last = cur;
+		last = &cur->next;
+
+		cur->ctx = (struct z_erofs_compress_sctx) {
+			.ictx = ctx,
+			.seg_num = nsegs,
+			.seg_idx = i,
+			.pivot = &dummy_pivot,
+		};
+		init_list_head(&cur->ctx.extents);
+
+		if (i == nsegs - 1)
+			cur->ctx.remaining = inode->i_size -
+					      inode->fragment_size -
+					      i * cfg.c_segment_size;
+		else
+			cur->ctx.remaining = cfg.c_segment_size;
+
+		cur->alg_id = ccfg->handle.alg->id;
+		cur->alg_name = ccfg->handle.alg->name;
+		cur->comp_level = ccfg->handle.compression_level;
+		cur->dict_size = ccfg->handle.dict_size;
+
+		cur->work.fn = z_erofs_mt_workfn;
+		erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
+	}
+
+	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
+	while (z_erofs_mt_ctrl.nfini != nsegs)
+		pthread_cond_wait(&z_erofs_mt_ctrl.cond,
+				  &z_erofs_mt_ctrl.mutex);
+	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+
+	ret = 0;
+	while (head) {
+		cur = head;
+		head = cur->next;
+
+		if (cur->errcode) {
+			ret = cur->errcode;
+		} else {
+			int ret2;
+
+			cur->ctx.blkaddr = blkaddr;
+			ret2 = z_erofs_merge_segment(ctx, &cur->ctx);
+			if (ret2)
+				ret = ret2;
+
+			*compressed_blocks += cur->ctx.blkaddr - blkaddr;
+			blkaddr = cur->ctx.blkaddr;
+		}
+
+		cur->next = z_erofs_mt_ctrl.idle;
+		z_erofs_mt_ctrl.idle = cur;
+	}
+	return ret;
+}
+#endif
+
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 {
+	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
 	struct erofs_buffer_head *bh;
-	static struct z_erofs_vle_compress_ctx ctx;
-	erofs_blk_t blkaddr, compressed_blocks;
+	static struct z_erofs_compress_ictx ctx;
+	static struct z_erofs_compress_sctx sctx;
+	struct erofs_compress_cfg *ccfg;
+	erofs_blk_t blkaddr, compressed_blocks = 0;
 	unsigned int legacymetasize;
 	int ret;
+	bool ismt = false;
 	struct erofs_sb_info *sbi = inode->sbi;
 	u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
 				  sizeof(struct z_erofs_lcluster_index) +
@@ -963,8 +1292,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
+	ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
 	inode->z_algorithmtype[1] = 0;
 
 	inode->idata_size = 0;
@@ -983,50 +1312,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
 	ctx.inode = inode;
 	ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
-	ctx.blkaddr = blkaddr;
 	ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
-	ctx.head = ctx.tail = 0;
-	ctx.clusterofs = 0;
-	ctx.pivot = &dummy_pivot;
 	init_list_head(&ctx.extents);
-	ctx.remaining = inode->i_size - inode->fragment_size;
+	ctx.fd = fd;
 	ctx.fix_dedupedfrag = false;
 	ctx.fragemitted = false;
+	sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
+	init_list_head(&sctx.extents);
+
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
 		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
 		if (ret)
 			goto err_free_idata;
+#ifdef EROFS_MT_ENABLED
+	} else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
+		ismt = true;
+		ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
+		if (ret)
+			goto err_free_idata;
+#endif
 	} else {
-		while (ctx.remaining) {
-			const u64 rx = min_t(u64, ctx.remaining,
-					     sizeof(ctx.queue) - ctx.tail);
-
-			ret = read(fd, ctx.queue + ctx.tail, rx);
-			if (ret != rx) {
-				ret = -errno;
-				goto err_bdrop;
-			}
-			ctx.remaining -= rx;
-			ctx.tail += rx;
-
-			ret = z_erofs_compress_one(&ctx);
-			if (ret)
-				goto err_free_idata;
-		}
+		sctx.queue = g_queue;
+		sctx.destbuf = NULL;
+		sctx.chandle = &ccfg->handle;
+		sctx.remaining = inode->i_size - inode->fragment_size;
+		sctx.seg_num = 1;
+		sctx.seg_idx = 0;
+		sctx.pivot = &dummy_pivot;
+
+		ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
+		if (ret)
+			goto err_free_idata;
+		compressed_blocks = sctx.blkaddr - blkaddr;
 	}
-	DBG_BUGON(ctx.head != ctx.tail);
 
 	/* fall back to no compression mode */
-	compressed_blocks = ctx.blkaddr - blkaddr;
 	DBG_BUGON(compressed_blocks < !!inode->idata_size);
 	compressed_blocks -= !!inode->idata_size;
 
-	if (ctx.pivot) {
-		z_erofs_commit_extent(&ctx, ctx.pivot);
-		ctx.pivot = NULL;
-	}
-
 	/* generate an extent for the deduplicated fragment */
 	if (inode->fragment_size && !ctx.fragemitted) {
 		struct z_erofs_extent_item *ei;
@@ -1042,13 +1366,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 			.compressedblks = 0,
 			.raw = false,
 			.partial = false,
-			.blkaddr = ctx.blkaddr,
+			.blkaddr = sctx.blkaddr,
 		};
 		init_list_head(&ei->list);
-		z_erofs_commit_extent(&ctx, ei);
+		z_erofs_commit_extent(&sctx, ei);
 	}
 	z_erofs_fragments_commit(inode);
 
+	if (!ismt)
+		list_splice_tail(&sctx.extents, &ctx.extents);
+
 	z_erofs_write_indexes(&ctx);
 	legacymetasize = ctx.metacur - compressmeta;
 	/* estimate if data compression saves space or not */
@@ -1257,8 +1584,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 		return -EINVAL;
 	}
 
-	if (erofs_sb_has_compr_cfgs(sbi))
-		return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+	if (erofs_sb_has_compr_cfgs(sbi)) {
+		ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+		if (ret)
+			return ret;
+	}
+
+	z_erofs_mt_enabled = false;
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_workers > 1) {
+		pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
+		pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
+		ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
+					    cfg.c_mt_workers,
+					    cfg.c_mt_workers << 2,
+					    z_erofs_mt_wq_tls_alloc,
+					    z_erofs_mt_wq_tls_free);
+		z_erofs_mt_enabled = !ret;
+	}
+#endif
 	return 0;
 }
 
@@ -1271,5 +1615,19 @@ int z_erofs_compress_exit(void)
 		if (ret)
 			return ret;
 	}
+
+	if (z_erofs_mt_enabled) {
+#ifdef EROFS_MT_ENABLED
+		ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+		if (ret)
+			return ret;
+		while (z_erofs_mt_ctrl.idle) {
+			struct erofs_compress_work *tmp =
+				z_erofs_mt_ctrl.idle->next;
+			free(z_erofs_mt_ctrl.idle);
+			z_erofs_mt_ctrl.idle = tmp;
+		}
+#endif
+	}
 	return 0;
 }
diff --git a/lib/compressor.c b/lib/compressor.c
index 58eae2a..175259e 100644
--- a/lib/compressor.c
+++ b/lib/compressor.c
@@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
 
 	/* should be written in "minimum compression ratio * 100" */
 	c->compress_threshold = 100;
+	c->compression_level = -1;
+	c->dict_size = 0;
 
 	if (!alg_name) {
 		c->alg = NULL;
diff --git a/mkfs/main.c b/mkfs/main.c
index 126a049..5dbaf9f 100644
--- a/mkfs/main.c
+++ b/mkfs/main.c
@@ -678,7 +678,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 
 			processors = erofs_get_available_processors();
 			if (cfg.c_mt_workers > processors)
-				erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.",
+				erofs_warn("%d workers exceed %d processors, potentially impacting performance.",
 					   cfg.c_mt_workers, processors);
 			break;
 		}
@@ -838,6 +838,12 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 		}
 		cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
 	}
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_workers > 1 && (cfg.c_dedupe || cfg.c_fragments)) {
+		erofs_warn("Note that dedupe/fragments are NOT supported in multi-threaded mode for now, reseting --workers=1.");
+		cfg.c_mt_workers = 1;
+	}
+#endif
 	return 0;
 }
 
-- 
2.39.3



More information about the Linux-erofs mailing list