[PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression

Gao Xiang xiang at kernel.org
Mon Apr 22 10:34:48 AEST 2024


From: Yifan Zhao <zhaoyifan at sjtu.edu.cn>

This patch separates the compression process into two parts.

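Specifically, erofs_begin_compressed_file() prepares the per-file
compression context and, when multi-threaded compression is enabled,
kicks off the compression. erofs_write_compressed_file() then waits for
the compression to finish (or performs it inline in the single-threaded
case) and writes the compressed (meta)data.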

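Note that erofs_begin_compressed_file() and erofs_write_compressed_file()
may run in different threads even when the global inode context is used,
so add another synchronization point: erofs_begin_compressed_file() now
waits until the previous user of the global context has released it in
erofs_write_compressed_file().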

Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
 include/erofs/compress.h |   5 +-
 lib/compress.c           | 138 ++++++++++++++++++++++++++++-----------
 lib/inode.c              |  17 ++++-
 3 files changed, 118 insertions(+), 42 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 871db54..c9831a7 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -17,8 +17,11 @@ extern "C"
 #define EROFS_CONFIG_COMPR_MAX_SZ	(4000 * 1024)
 #define Z_EROFS_COMPR_QUEUE_SZ		(EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
+struct z_erofs_compress_ictx;
+
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx);
 
 int z_erofs_compress_init(struct erofs_sb_info *sbi,
 			  struct erofs_buffer_head *bh);
diff --git a/lib/compress.c b/lib/compress.c
index 4ac4760..7fef698 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -109,6 +109,7 @@ struct erofs_compress_work {
 static struct {
 	struct erofs_workqueue wq;
 	struct erofs_compress_work *idle;
+	pthread_mutex_t mutex;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -1312,11 +1313,13 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 	pthread_cond_init(&ictx->cond, NULL);
 
 	for (i = 0; i < nsegs; i++) {
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur = z_erofs_mt_ctrl.idle;
 		if (cur) {
 			z_erofs_mt_ctrl.idle = cur->next;
 			cur->next = NULL;
 		}
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
 		if (!cur) {
 			cur = calloc(1, sizeof(*cur));
 			if (!cur)
@@ -1364,8 +1367,10 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 	pthread_mutex_unlock(&ictx->mutex);
 
 	bh = erofs_balloc(DATA, 0, 0, 0);
-	if (IS_ERR(bh))
-		return PTR_ERR(bh);
+	if (IS_ERR(bh)) {
+		ret = PTR_ERR(bh);
+		goto out;
+	}
 
 	DBG_BUGON(!head);
 	blkaddr = erofs_mapbh(bh->block);
@@ -1389,27 +1394,31 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 			blkaddr = cur->ctx.blkaddr;
 		}
 
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur->next = z_erofs_mt_ctrl.idle;
 		z_erofs_mt_ctrl.idle = cur;
-	} while(head);
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+	} while (head);
 
 	if (ret)
-		return ret;
-
-	return erofs_commit_compressed_file(ictx, bh,
+		goto out;
+	ret = erofs_commit_compressed_file(ictx, bh,
 			blkaddr - compressed_blocks, compressed_blocks);
+
+out:
+	close(ictx->fd);
+	free(ictx);
+	return ret;
 }
 #endif
 
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
+static struct z_erofs_compress_ictx g_ictx;
+
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
-	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
-	struct erofs_buffer_head *bh;
-	static struct z_erofs_compress_ictx ctx;
-	static struct z_erofs_compress_sctx sctx;
-	erofs_blk_t blkaddr;
-	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
+	struct z_erofs_compress_ictx *ictx;
+	int ret;
 
 	/* initialize per-file compression setting */
 	inode->z_advise = 0;
@@ -1440,43 +1449,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype;
-	inode->z_algorithmtype[1] = 0;
-
 	inode->idata_size = 0;
 	inode->fragment_size = 0;
 
+	if (z_erofs_mt_enabled) {
+		ictx = malloc(sizeof(*ictx));
+		if (!ictx)
+			return ERR_PTR(-ENOMEM);
+		ictx->fd = dup(fd);
+	} else {
+#ifdef EROFS_MT_ENABLED
+		pthread_mutex_lock(&g_ictx.mutex);
+		if (g_ictx.seg_num)
+			pthread_cond_wait(&g_ictx.cond, &g_ictx.mutex);
+		g_ictx.seg_num = 1;
+		pthread_mutex_unlock(&g_ictx.mutex);
+#endif
+		ictx = &g_ictx;
+		ictx->fd = fd;
+	}
+
+	ictx->ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ictx->ccfg->algorithmtype;
+	inode->z_algorithmtype[1] = 0;
+
 	/*
 	 * Handle tails in advance to avoid writing duplicated
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
-		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+		ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
 		if (ret < 0)
-			return ret;
+			goto err_free_ictx;
 	}
 
-	ctx.inode = inode;
-	ctx.fd = fd;
-	ctx.fpos = fpos;
-	init_list_head(&ctx.extents);
-	ctx.fix_dedupedfrag = false;
-	ctx.fragemitted = false;
+	ictx->inode = inode;
+	ictx->fpos = fpos;
+	init_list_head(&ictx->extents);
+	ictx->fix_dedupedfrag = false;
+	ictx->fragemitted = false;
 
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
-		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+		ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
 		if (ret)
 			goto err_free_idata;
+	}
 #ifdef EROFS_MT_ENABLED
-	} else if (z_erofs_mt_enabled) {
-		ret = z_erofs_mt_compress(&ctx);
+	if (ictx != &g_ictx) {
+		ret = z_erofs_mt_compress(ictx);
 		if (ret)
 			goto err_free_idata;
-		return erofs_mt_write_compressed_file(&ctx);
+	}
 #endif
+	return ictx;
+
+err_free_idata:
+	if (inode->idata) {
+		free(inode->idata);
+		inode->idata = NULL;
 	}
+err_free_ictx:
+	if (ictx != &g_ictx)
+		free(ictx);
+	return ERR_PTR(ret);
+}
+
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx)
+{
+	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
+	struct erofs_buffer_head *bh;
+	static struct z_erofs_compress_sctx sctx;
+	struct erofs_compress_cfg *ccfg = ictx->ccfg;
+	struct erofs_inode *inode = ictx->inode;
+	erofs_blk_t blkaddr;
+	int ret;
+
+#ifdef EROFS_MT_ENABLED
+	if (ictx != &g_ictx)
+		return erofs_mt_write_compressed_file(ictx);
+#endif
+
 	/* allocate main data buffer */
 	bh = erofs_balloc(DATA, 0, 0, 0);
 	if (IS_ERR(bh)) {
@@ -1485,11 +1538,11 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	}
 	blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
 
-	ctx.seg_num = 1;
+	ictx->seg_num = 1;
 	sctx = (struct z_erofs_compress_sctx) {
-		.ictx = &ctx,
+		.ictx = ictx,
 		.queue = g_queue,
-		.chandle = &ctx.ccfg->handle,
+		.chandle = &ccfg->handle,
 		.remaining = inode->i_size - inode->fragment_size,
 		.seg_idx = 0,
 		.pivot = &dummy_pivot,
@@ -1499,19 +1552,26 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 
 	ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
 	if (ret)
-		goto err_bdrop;
-	list_splice_tail(&sctx.extents, &ctx.extents);
+		goto err_free_idata;
 
-	return erofs_commit_compressed_file(&ctx, bh, blkaddr,
-					    sctx.blkaddr - blkaddr);
+	list_splice_tail(&sctx.extents, &ictx->extents);
+	ret = erofs_commit_compressed_file(ictx, bh, blkaddr,
+					   sctx.blkaddr - blkaddr);
+	goto out;
 
-err_bdrop:
-	erofs_bdrop(bh, true);	/* revoke buffer */
 err_free_idata:
+	erofs_bdrop(bh, true);	/* revoke buffer */
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
+out:
+#ifdef EROFS_MT_ENABLED
+	pthread_mutex_lock(&ictx->mutex);
+	ictx->seg_num = 0;
+	pthread_cond_signal(&ictx->cond);
+	pthread_mutex_unlock(&ictx->mutex);
+#endif
 	return ret;
 }
 
@@ -1666,6 +1726,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 					    z_erofs_mt_wq_tls_free);
 		z_erofs_mt_enabled = !ret;
 	}
+	pthread_mutex_init(&g_ictx.mutex, NULL);
+	pthread_cond_init(&g_ictx.cond, NULL);
 #endif
 	return 0;
 }
diff --git a/lib/inode.c b/lib/inode.c
index 1ff05e1..0d044f4 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -499,10 +499,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
 	DBG_BUGON(!inode->i_size);
 
 	if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
+		void *ictx;
 		int ret;
 
-		ret = erofs_write_compressed_file(inode, fd, fpos);
-		if (!ret || ret != -ENOSPC)
+		ictx = erofs_begin_compressed_file(inode, fd, fpos);
+		if (IS_ERR(ictx))
+			return PTR_ERR(ictx);
+
+		ret = erofs_write_compressed_file(ictx);
+		if (ret != -ENOSPC)
 			return ret;
 
 		if (lseek(fd, fpos, SEEK_SET) < 0)
@@ -1362,6 +1367,7 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 {
 	struct stat st;
 	struct erofs_inode *inode;
+	void *ictx;
 	int ret;
 
 	ret = lseek(fd, 0, SEEK_SET);
@@ -1392,7 +1398,12 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 		inode->nid = inode->sbi->packed_nid;
 	}
 
-	ret = erofs_write_compressed_file(inode, fd, 0);
+	ictx = erofs_begin_compressed_file(inode, fd, 0);
+	if (IS_ERR(ictx))
+		return ERR_CAST(ictx);
+
+	DBG_BUGON(!ictx);
+	ret = erofs_write_compressed_file(ictx);
 	if (ret == -ENOSPC) {
 		ret = lseek(fd, 0, SEEK_SET);
 		if (ret < 0)
-- 
2.30.2



More information about the Linux-erofs mailing list