[PATCH v3 2/2] erofs-utils: mkfs: implement multi-threaded fragments

Gao Xiang hsiangkao at linux.alibaba.com
Sun Mar 23 15:34:51 AEDT 2025


Currently, only `-Eall-fragments` is allowed for multi-threaded
compression.  However, in many cases, we don't want the entire file
merged into the packed inode, as it may impact runtime performance.

Let's implement multi-threaded compression for `-Efragments` now,
although it's still not very fast and its performance needs to be
optimized even further for this option.

Note that the image sizes could be larger without `-Ededupe` compared
to `-Eall-fragments` since the head parts aren't deduplicated for now.

Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
change since v2:
 - fix up broken `-Ededupe` fallback.

 lib/compress.c  | 137 +++++++++++++++++++++++++++++++-----------------
 lib/fragments.c |  14 ++---
 lib/inode.c     |   2 +-
 3 files changed, 93 insertions(+), 60 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index 0b48c06..32f58b5 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -110,9 +110,10 @@ struct erofs_compress_work {
 };
 
 static struct {
-	struct erofs_workqueue wq;
+	struct erofs_workqueue wq, fwq;
 	struct erofs_compress_work *idle;
 	pthread_mutex_t mutex;
+	bool hasfwq;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -577,11 +578,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
 	if (len <= ctx->pclustersize) {
 		if (!final || !len)
 			return 1;
-		if (inode->fragment_size && !ictx->fix_dedupedfrag) {
-			ctx->pclustersize = roundup(len, blksz);
-			goto fix_dedupedfrag;
-		}
 		if (may_packing) {
+			if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+				ctx->pclustersize = roundup(len, blksz);
+				goto fix_dedupedfrag;
+			}
 			e->length = len;
 			goto frag_packing;
 		}
@@ -1056,7 +1057,22 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
 			     u64 offset, erofs_blk_t blkaddr)
 {
 	struct z_erofs_compress_ictx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
+	bool frag = cfg.c_fragments && !erofs_is_packed_inode(inode) &&
+		ctx->seg_idx >= ictx->seg_num - 1;
 	int fd = ictx->fd;
+	int ret;
+
+	DBG_BUGON(offset != -1 && frag && inode->fragment_size);
+	if (offset != -1 && frag && !inode->fragment_size &&
+	    cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+		ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
+		if (ret < 0)
+			return ret;
+		if (inode->fragment_size > ctx->remaining)
+			inode->fragment_size = ctx->remaining;
+		ctx->remaining -= inode->fragment_size;
+	}
 
 	ctx->blkaddr = blkaddr;
 	while (ctx->remaining) {
@@ -1088,8 +1104,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
 	}
 
 	/* generate an extra extent for the deduplicated fragment */
-	if (ctx->seg_idx >= ictx->seg_num - 1 &&
-	    ictx->inode->fragment_size && !ictx->fragemitted) {
+	if (frag && inode->fragment_size && !ictx->fragemitted) {
 		struct z_erofs_extent_item *ei;
 
 		ei = malloc(sizeof(*ei));
@@ -1097,7 +1112,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
 			return -ENOMEM;
 
 		ei->e = (struct z_erofs_inmem_extent) {
-			.length = ictx->inode->fragment_size,
+			.length = inode->fragment_size,
 			.compressedblks = 0,
 			.raw = false,
 			.partial = false,
@@ -1207,6 +1222,8 @@ err_free_idata:
 	return ret;
 }
 
+static struct z_erofs_compress_ictx g_ictx;
+
 #ifdef EROFS_MT_ENABLED
 void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
 {
@@ -1354,9 +1371,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 	struct erofs_compress_work *cur, *head = NULL, **last = &head;
 	struct erofs_compress_cfg *ccfg = ictx->ccfg;
 	struct erofs_inode *inode = ictx->inode;
-	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mkfs_segment_size);
-	int i;
+	unsigned int segsz = cfg.c_mkfs_segment_size;
+	int nsegs, i;
 
+	nsegs = DIV_ROUND_UP(inode->i_size - inode->fragment_size, segsz);
+	if (!nsegs)
+		nsegs = 1;
 	ictx->seg_num = nsegs;
 	pthread_mutex_init(&ictx->mutex, NULL);
 	pthread_cond_init(&ictx->cond, NULL);
@@ -1385,13 +1405,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 		};
 		init_list_head(&cur->ctx.extents);
 
-		if (i == nsegs - 1)
-			cur->ctx.remaining = inode->i_size -
-					      inode->fragment_size -
-					      i * cfg.c_mkfs_segment_size;
-		else
-			cur->ctx.remaining = cfg.c_mkfs_segment_size;
-
 		cur->alg_id = ccfg->handle.alg->id;
 		cur->alg_name = ccfg->handle.alg->name;
 		cur->comp_level = ccfg->handle.compression_level;
@@ -1399,6 +1412,17 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 		cur->errcode = 1;	/* mark as "in progress" */
 
 		cur->work.fn = z_erofs_mt_workfn;
+		if (i >= nsegs - 1) {
+			cur->ctx.remaining = inode->i_size -
+					inode->fragment_size - (u64)i * segsz;
+			if (z_erofs_mt_ctrl.hasfwq) {
+				erofs_queue_work(&z_erofs_mt_ctrl.fwq,
+						 &cur->work);
+				continue;
+			}
+		} else {
+			cur->ctx.remaining = segsz;
+		}
 		erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
 	}
 	ictx->mtworks = head;
@@ -1460,14 +1484,53 @@ out:
 	free(ictx);
 	return ret;
 }
-#endif
 
-static struct z_erofs_compress_ictx g_ictx;
+static int z_erofs_mt_init(void)
+{
+	unsigned int workers = cfg.c_mt_workers;
+	int ret;
+
+	if (workers < 1)
+		return 0;
+	if (workers >= 1 && cfg.c_dedupe) {
+		erofs_warn("multi-threaded dedupe is NOT implemented for now");
+		cfg.c_mt_workers = 0;
+	} else {
+		if (cfg.c_fragments && workers > 1) {
+			ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32,
+						    z_erofs_mt_wq_tls_alloc,
+						    z_erofs_mt_wq_tls_free);
+			if (ret)
+				return ret;
+			z_erofs_mt_ctrl.hasfwq = true;
+			--workers;
+		}
+
+		ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers,
+					    workers << 2,
+					    z_erofs_mt_wq_tls_alloc,
+					    z_erofs_mt_wq_tls_free);
+		if (ret)
+			return ret;
+		z_erofs_mt_enabled = true;
+	}
+	pthread_mutex_init(&g_ictx.mutex, NULL);
+	pthread_cond_init(&g_ictx.cond, NULL);
+	return 0;
+}
+#else
+static int z_erofs_mt_init(void)
+{
+	return 0;
+}
+#endif
 
 void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
 	struct erofs_sb_info *sbi = inode->sbi;
 	struct z_erofs_compress_ictx *ictx;
+	bool all_fragments = cfg.c_all_fragments &&
+					!erofs_is_packed_inode(inode);
 	int ret;
 
 	/* initialize per-file compression setting */
@@ -1502,8 +1565,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	inode->idata_size = 0;
 	inode->fragment_size = 0;
 
-	if (!z_erofs_mt_enabled ||
-	    (cfg.c_all_fragments && !erofs_is_packed_inode(inode))) {
+	if (!z_erofs_mt_enabled || all_fragments) {
 #ifdef EROFS_MT_ENABLED
 		pthread_mutex_lock(&g_ictx.mutex);
 		if (g_ictx.seg_num)
@@ -1529,7 +1591,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode) &&
-	    cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+	    ictx == &g_ictx && cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
 		ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
 		if (ret < 0)
 			goto err_free_ictx;
@@ -1547,8 +1609,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	ictx->fix_dedupedfrag = false;
 	ictx->fragemitted = false;
 
-	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
-	    !inode->fragment_size) {
+	if (all_fragments && !inode->fragment_size) {
 		ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
 		if (ret)
 			goto err_free_idata;
@@ -1819,30 +1880,7 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 	}
 
 	z_erofs_mt_enabled = false;
-#ifdef EROFS_MT_ENABLED
-	if (cfg.c_mt_workers >= 1 && (cfg.c_dedupe ||
-				      (cfg.c_fragments && !cfg.c_all_fragments))) {
-		if (cfg.c_dedupe)
-			erofs_warn("multi-threaded dedupe is NOT implemented for now");
-		if (cfg.c_fragments)
-			erofs_warn("multi-threaded fragments is NOT implemented for now");
-		cfg.c_mt_workers = 0;
-	}
-
-	if (cfg.c_mt_workers >= 1) {
-		ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
-					    cfg.c_mt_workers,
-					    cfg.c_mt_workers << 2,
-					    z_erofs_mt_wq_tls_alloc,
-					    z_erofs_mt_wq_tls_free);
-		if (ret)
-			return ret;
-		z_erofs_mt_enabled = true;
-	}
-	pthread_mutex_init(&g_ictx.mutex, NULL);
-	pthread_cond_init(&g_ictx.cond, NULL);
-#endif
-	return 0;
+	return z_erofs_mt_init();
 }
 
 int z_erofs_compress_exit(void)
@@ -1858,6 +1896,9 @@ int z_erofs_compress_exit(void)
 	if (z_erofs_mt_enabled) {
 #ifdef EROFS_MT_ENABLED
 		ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+		if (ret)
+			return ret;
+		ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq);
 		if (ret)
 			return ret;
 		while (z_erofs_mt_ctrl.idle) {
diff --git a/lib/fragments.c b/lib/fragments.c
index fecebb5..41b9912 100644
--- a/lib/fragments.c
+++ b/lib/fragments.c
@@ -146,21 +146,13 @@ int z_erofs_fragments_dedupe(struct erofs_inode *inode, int fd, u32 *tofcrc)
 	if (inode->i_size <= EROFS_TOF_HASHLEN)
 		return 0;
 
-	if (erofs_lseek64(fd, inode->i_size - EROFS_TOF_HASHLEN, SEEK_SET) < 0)
-		return -errno;
-
-	ret = read(fd, data_to_hash, EROFS_TOF_HASHLEN);
+	ret = pread(fd, data_to_hash, EROFS_TOF_HASHLEN,
+		    inode->i_size - EROFS_TOF_HASHLEN);
 	if (ret != EROFS_TOF_HASHLEN)
 		return -errno;
 
 	*tofcrc = erofs_crc32c(~0, data_to_hash, EROFS_TOF_HASHLEN);
-	ret = z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
-	if (ret < 0)
-		return ret;
-	ret = lseek(fd, 0, SEEK_SET);
-	if (ret < 0)
-		return -errno;
-	return 0;
+	return z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
 }
 
 static int z_erofs_fragments_dedupe_insert(struct list_head *hash, void *data,
diff --git a/lib/inode.c b/lib/inode.c
index 8c9a8ec..c4edd43 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1312,7 +1312,7 @@ struct erofs_mkfs_dfops {
 	bool idle;	/* initialize as false before the dfops worker runs */
 };
 
-#define EROFS_MT_QUEUE_SIZE 128
+#define EROFS_MT_QUEUE_SIZE 256
 
 static void erofs_mkfs_flushjobs(struct erofs_sb_info *sbi)
 {
-- 
2.43.5



More information about the Linux-erofs mailing list