[PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression
Gao Xiang
xiang at kernel.org
Mon Apr 22 10:34:48 AEST 2024
From: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
This patch separates the compression process into two parts.
Specifically, erofs_begin_compressed_file() will trigger compression.
erofs_write_compressed_file() will wait for the compression to finish and
then write the compressed (meta)data.
Note that erofs_begin_compressed_file() and
erofs_write_compressed_file() may run in different threads even when the
global inode context is used, so another synchronization point is added.
Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
include/erofs/compress.h | 5 +-
lib/compress.c | 138 ++++++++++++++++++++++++++++-----------
lib/inode.c | 17 ++++-
3 files changed, 118 insertions(+), 42 deletions(-)
diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 871db54..c9831a7 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -17,8 +17,11 @@ extern "C"
#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
#define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
+struct z_erofs_compress_ictx;
+
void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx);
int z_erofs_compress_init(struct erofs_sb_info *sbi,
struct erofs_buffer_head *bh);
diff --git a/lib/compress.c b/lib/compress.c
index 4ac4760..7fef698 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -109,6 +109,7 @@ struct erofs_compress_work {
static struct {
struct erofs_workqueue wq;
struct erofs_compress_work *idle;
+ pthread_mutex_t mutex;
} z_erofs_mt_ctrl;
#endif
@@ -1312,11 +1313,13 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
pthread_cond_init(&ictx->cond, NULL);
for (i = 0; i < nsegs; i++) {
+ pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
cur = z_erofs_mt_ctrl.idle;
if (cur) {
z_erofs_mt_ctrl.idle = cur->next;
cur->next = NULL;
}
+ pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
if (!cur) {
cur = calloc(1, sizeof(*cur));
if (!cur)
@@ -1364,8 +1367,10 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
pthread_mutex_unlock(&ictx->mutex);
bh = erofs_balloc(DATA, 0, 0, 0);
- if (IS_ERR(bh))
- return PTR_ERR(bh);
+ if (IS_ERR(bh)) {
+ ret = PTR_ERR(bh);
+ goto out;
+ }
DBG_BUGON(!head);
blkaddr = erofs_mapbh(bh->block);
@@ -1389,27 +1394,31 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
blkaddr = cur->ctx.blkaddr;
}
+ pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
cur->next = z_erofs_mt_ctrl.idle;
z_erofs_mt_ctrl.idle = cur;
- } while(head);
+ pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+ } while (head);
if (ret)
- return ret;
-
- return erofs_commit_compressed_file(ictx, bh,
+ goto out;
+ ret = erofs_commit_compressed_file(ictx, bh,
blkaddr - compressed_blocks, compressed_blocks);
+
+out:
+ close(ictx->fd);
+ free(ictx);
+ return ret;
}
#endif
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
+static struct z_erofs_compress_ictx g_ictx;
+
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
{
- static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
- struct erofs_buffer_head *bh;
- static struct z_erofs_compress_ictx ctx;
- static struct z_erofs_compress_sctx sctx;
- erofs_blk_t blkaddr;
- int ret;
struct erofs_sb_info *sbi = inode->sbi;
+ struct z_erofs_compress_ictx *ictx;
+ int ret;
/* initialize per-file compression setting */
inode->z_advise = 0;
@@ -1440,43 +1449,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
}
}
#endif
- ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
- inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype;
- inode->z_algorithmtype[1] = 0;
-
inode->idata_size = 0;
inode->fragment_size = 0;
+ if (z_erofs_mt_enabled) {
+ ictx = malloc(sizeof(*ictx));
+ if (!ictx)
+ return ERR_PTR(-ENOMEM);
+ ictx->fd = dup(fd);
+ } else {
+#ifdef EROFS_MT_ENABLED
+ pthread_mutex_lock(&g_ictx.mutex);
+ if (g_ictx.seg_num)
+ pthread_cond_wait(&g_ictx.cond, &g_ictx.mutex);
+ g_ictx.seg_num = 1;
+ pthread_mutex_unlock(&g_ictx.mutex);
+#endif
+ ictx = &g_ictx;
+ ictx->fd = fd;
+ }
+
+ ictx->ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+ inode->z_algorithmtype[0] = ictx->ccfg->algorithmtype;
+ inode->z_algorithmtype[1] = 0;
+
/*
* Handle tails in advance to avoid writing duplicated
* parts into the packed inode.
*/
if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
- ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+ ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
if (ret < 0)
- return ret;
+ goto err_free_ictx;
}
- ctx.inode = inode;
- ctx.fd = fd;
- ctx.fpos = fpos;
- init_list_head(&ctx.extents);
- ctx.fix_dedupedfrag = false;
- ctx.fragemitted = false;
+ ictx->inode = inode;
+ ictx->fpos = fpos;
+ init_list_head(&ictx->extents);
+ ictx->fix_dedupedfrag = false;
+ ictx->fragemitted = false;
if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
!inode->fragment_size) {
- ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+ ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
if (ret)
goto err_free_idata;
+ }
#ifdef EROFS_MT_ENABLED
- } else if (z_erofs_mt_enabled) {
- ret = z_erofs_mt_compress(&ctx);
+ if (ictx != &g_ictx) {
+ ret = z_erofs_mt_compress(ictx);
if (ret)
goto err_free_idata;
- return erofs_mt_write_compressed_file(&ctx);
+ }
#endif
+ return ictx;
+
+err_free_idata:
+ if (inode->idata) {
+ free(inode->idata);
+ inode->idata = NULL;
}
+err_free_ictx:
+ if (ictx != &g_ictx)
+ free(ictx);
+ return ERR_PTR(ret);
+}
+
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx)
+{
+ static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
+ struct erofs_buffer_head *bh;
+ static struct z_erofs_compress_sctx sctx;
+ struct erofs_compress_cfg *ccfg = ictx->ccfg;
+ struct erofs_inode *inode = ictx->inode;
+ erofs_blk_t blkaddr;
+ int ret;
+
+#ifdef EROFS_MT_ENABLED
+ if (ictx != &g_ictx)
+ return erofs_mt_write_compressed_file(ictx);
+#endif
+
/* allocate main data buffer */
bh = erofs_balloc(DATA, 0, 0, 0);
if (IS_ERR(bh)) {
@@ -1485,11 +1538,11 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
}
blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
- ctx.seg_num = 1;
+ ictx->seg_num = 1;
sctx = (struct z_erofs_compress_sctx) {
- .ictx = &ctx,
+ .ictx = ictx,
.queue = g_queue,
- .chandle = &ctx.ccfg->handle,
+ .chandle = &ccfg->handle,
.remaining = inode->i_size - inode->fragment_size,
.seg_idx = 0,
.pivot = &dummy_pivot,
@@ -1499,19 +1552,26 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
if (ret)
- goto err_bdrop;
- list_splice_tail(&sctx.extents, &ctx.extents);
+ goto err_free_idata;
- return erofs_commit_compressed_file(&ctx, bh, blkaddr,
- sctx.blkaddr - blkaddr);
+ list_splice_tail(&sctx.extents, &ictx->extents);
+ ret = erofs_commit_compressed_file(ictx, bh, blkaddr,
+ sctx.blkaddr - blkaddr);
+ goto out;
-err_bdrop:
- erofs_bdrop(bh, true); /* revoke buffer */
err_free_idata:
+ erofs_bdrop(bh, true); /* revoke buffer */
if (inode->idata) {
free(inode->idata);
inode->idata = NULL;
}
+out:
+#ifdef EROFS_MT_ENABLED
+ pthread_mutex_lock(&ictx->mutex);
+ ictx->seg_num = 0;
+ pthread_cond_signal(&ictx->cond);
+ pthread_mutex_unlock(&ictx->mutex);
+#endif
return ret;
}
@@ -1666,6 +1726,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
z_erofs_mt_wq_tls_free);
z_erofs_mt_enabled = !ret;
}
+ pthread_mutex_init(&g_ictx.mutex, NULL);
+ pthread_cond_init(&g_ictx.cond, NULL);
#endif
return 0;
}
diff --git a/lib/inode.c b/lib/inode.c
index 1ff05e1..0d044f4 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -499,10 +499,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
DBG_BUGON(!inode->i_size);
if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
+ void *ictx;
int ret;
- ret = erofs_write_compressed_file(inode, fd, fpos);
- if (!ret || ret != -ENOSPC)
+ ictx = erofs_begin_compressed_file(inode, fd, fpos);
+ if (IS_ERR(ictx))
+ return PTR_ERR(ictx);
+
+ ret = erofs_write_compressed_file(ictx);
+ if (ret != -ENOSPC)
return ret;
if (lseek(fd, fpos, SEEK_SET) < 0)
@@ -1362,6 +1367,7 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
{
struct stat st;
struct erofs_inode *inode;
+ void *ictx;
int ret;
ret = lseek(fd, 0, SEEK_SET);
@@ -1392,7 +1398,12 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
inode->nid = inode->sbi->packed_nid;
}
- ret = erofs_write_compressed_file(inode, fd, 0);
+ ictx = erofs_begin_compressed_file(inode, fd, 0);
+ if (IS_ERR(ictx))
+ return ERR_CAST(ictx);
+
+ DBG_BUGON(!ictx);
+ ret = erofs_write_compressed_file(ictx);
if (ret == -ENOSPC) {
ret = lseek(fd, 0, SEEK_SET);
if (ret < 0)
--
2.30.2
More information about the Linux-erofs
mailing list