[PATCH v3 2/2] erofs-utils: mkfs: implement multi-threaded fragments
Gao Xiang
hsiangkao at linux.alibaba.com
Sun Mar 23 15:34:51 AEDT 2025
Currently, only `-Eall-fragments` is supported for multi-threaded
compression. However, in many cases we don't want the entire file
merged into the packed inode, as it may impact runtime performance.
Let's implement multi-threaded compression for `-Efragments` now,
although it's still not very fast and needs further performance
optimization for this option.
Note that image sizes could be larger than with `-Eall-fragments`
when `-Ededupe` is not used, since the head parts aren't deduplicated
for now.
Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
Changes since v2:
- fix up the broken `-Ededupe` fallback.
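For reference, a rough invocation to exercise this path on a
multi-threaded build (option spellings assumed from the current
mkfs.erofs(1) manpage; adjust to your setup):

    mkfs.erofs -zlz4hc -Efragments --workers=4 foo.erofs.img foo/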
lib/compress.c | 137 +++++++++++++++++++++++++++++++-----------------
lib/fragments.c | 14 ++---
lib/inode.c | 2 +-
3 files changed, 93 insertions(+), 60 deletions(-)
diff --git a/lib/compress.c b/lib/compress.c
index 0b48c06..32f58b5 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -110,9 +110,10 @@ struct erofs_compress_work {
};
static struct {
- struct erofs_workqueue wq;
+ struct erofs_workqueue wq, fwq;
struct erofs_compress_work *idle;
pthread_mutex_t mutex;
+ bool hasfwq;
} z_erofs_mt_ctrl;
#endif
@@ -577,11 +578,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
if (len <= ctx->pclustersize) {
if (!final || !len)
return 1;
- if (inode->fragment_size && !ictx->fix_dedupedfrag) {
- ctx->pclustersize = roundup(len, blksz);
- goto fix_dedupedfrag;
- }
if (may_packing) {
+ if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+ ctx->pclustersize = roundup(len, blksz);
+ goto fix_dedupedfrag;
+ }
e->length = len;
goto frag_packing;
}
@@ -1056,7 +1057,22 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
u64 offset, erofs_blk_t blkaddr)
{
struct z_erofs_compress_ictx *ictx = ctx->ictx;
+ struct erofs_inode *inode = ictx->inode;
+ bool frag = cfg.c_fragments && !erofs_is_packed_inode(inode) &&
+ ctx->seg_idx >= ictx->seg_num - 1;
int fd = ictx->fd;
+ int ret;
+
+ DBG_BUGON(offset != -1 && frag && inode->fragment_size);
+ if (offset != -1 && frag && !inode->fragment_size &&
+ cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+ ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
+ if (ret < 0)
+ return ret;
+ if (inode->fragment_size > ctx->remaining)
+ inode->fragment_size = ctx->remaining;
+ ctx->remaining -= inode->fragment_size;
+ }
ctx->blkaddr = blkaddr;
while (ctx->remaining) {
@@ -1088,8 +1104,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
}
/* generate an extra extent for the deduplicated fragment */
- if (ctx->seg_idx >= ictx->seg_num - 1 &&
- ictx->inode->fragment_size && !ictx->fragemitted) {
+ if (frag && inode->fragment_size && !ictx->fragemitted) {
struct z_erofs_extent_item *ei;
ei = malloc(sizeof(*ei));
@@ -1097,7 +1112,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
return -ENOMEM;
ei->e = (struct z_erofs_inmem_extent) {
- .length = ictx->inode->fragment_size,
+ .length = inode->fragment_size,
.compressedblks = 0,
.raw = false,
.partial = false,
@@ -1207,6 +1222,8 @@ err_free_idata:
return ret;
}
+static struct z_erofs_compress_ictx g_ictx;
+
#ifdef EROFS_MT_ENABLED
void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
{
@@ -1354,9 +1371,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
struct erofs_compress_work *cur, *head = NULL, **last = &head;
struct erofs_compress_cfg *ccfg = ictx->ccfg;
struct erofs_inode *inode = ictx->inode;
- int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mkfs_segment_size);
- int i;
+ unsigned int segsz = cfg.c_mkfs_segment_size;
+ int nsegs, i;
+ nsegs = DIV_ROUND_UP(inode->i_size - inode->fragment_size, segsz);
+ if (!nsegs)
+ nsegs = 1;
ictx->seg_num = nsegs;
pthread_mutex_init(&ictx->mutex, NULL);
pthread_cond_init(&ictx->cond, NULL);
@@ -1385,13 +1405,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
};
init_list_head(&cur->ctx.extents);
- if (i == nsegs - 1)
- cur->ctx.remaining = inode->i_size -
- inode->fragment_size -
- i * cfg.c_mkfs_segment_size;
- else
- cur->ctx.remaining = cfg.c_mkfs_segment_size;
-
cur->alg_id = ccfg->handle.alg->id;
cur->alg_name = ccfg->handle.alg->name;
cur->comp_level = ccfg->handle.compression_level;
@@ -1399,6 +1412,17 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
cur->errcode = 1; /* mark as "in progress" */
cur->work.fn = z_erofs_mt_workfn;
+ if (i >= nsegs - 1) {
+ cur->ctx.remaining = inode->i_size -
+ inode->fragment_size - (u64)i * segsz;
+ if (z_erofs_mt_ctrl.hasfwq) {
+ erofs_queue_work(&z_erofs_mt_ctrl.fwq,
+ &cur->work);
+ continue;
+ }
+ } else {
+ cur->ctx.remaining = segsz;
+ }
erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
}
ictx->mtworks = head;
@@ -1460,14 +1484,53 @@ out:
free(ictx);
return ret;
}
-#endif
-static struct z_erofs_compress_ictx g_ictx;
+static int z_erofs_mt_init(void)
+{
+ unsigned int workers = cfg.c_mt_workers;
+ int ret;
+
+ if (workers < 1)
+ return 0;
+ if (workers >= 1 && cfg.c_dedupe) {
+ erofs_warn("multi-threaded dedupe is NOT implemented for now");
+ cfg.c_mt_workers = 0;
+ } else {
+ if (cfg.c_fragments && workers > 1) {
+ ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32,
+ z_erofs_mt_wq_tls_alloc,
+ z_erofs_mt_wq_tls_free);
+ if (ret)
+ return ret;
+ z_erofs_mt_ctrl.hasfwq = true;
+ --workers;
+ }
+
+ ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers,
+ workers << 2,
+ z_erofs_mt_wq_tls_alloc,
+ z_erofs_mt_wq_tls_free);
+ if (ret)
+ return ret;
+ z_erofs_mt_enabled = true;
+ }
+ pthread_mutex_init(&g_ictx.mutex, NULL);
+ pthread_cond_init(&g_ictx.cond, NULL);
+ return 0;
+}
+#else
+static int z_erofs_mt_init(void)
+{
+ return 0;
+}
+#endif
void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
{
struct erofs_sb_info *sbi = inode->sbi;
struct z_erofs_compress_ictx *ictx;
+ bool all_fragments = cfg.c_all_fragments &&
+ !erofs_is_packed_inode(inode);
int ret;
/* initialize per-file compression setting */
@@ -1502,8 +1565,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
inode->idata_size = 0;
inode->fragment_size = 0;
- if (!z_erofs_mt_enabled ||
- (cfg.c_all_fragments && !erofs_is_packed_inode(inode))) {
+ if (!z_erofs_mt_enabled || all_fragments) {
#ifdef EROFS_MT_ENABLED
pthread_mutex_lock(&g_ictx.mutex);
if (g_ictx.seg_num)
@@ -1529,7 +1591,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
* parts into the packed inode.
*/
if (cfg.c_fragments && !erofs_is_packed_inode(inode) &&
- cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+ ictx == &g_ictx && cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
if (ret < 0)
goto err_free_ictx;
@@ -1547,8 +1609,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
ictx->fix_dedupedfrag = false;
ictx->fragemitted = false;
- if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
- !inode->fragment_size) {
+ if (all_fragments && !inode->fragment_size) {
ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
if (ret)
goto err_free_idata;
@@ -1819,30 +1880,7 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
}
z_erofs_mt_enabled = false;
-#ifdef EROFS_MT_ENABLED
- if (cfg.c_mt_workers >= 1 && (cfg.c_dedupe ||
- (cfg.c_fragments && !cfg.c_all_fragments))) {
- if (cfg.c_dedupe)
- erofs_warn("multi-threaded dedupe is NOT implemented for now");
- if (cfg.c_fragments)
- erofs_warn("multi-threaded fragments is NOT implemented for now");
- cfg.c_mt_workers = 0;
- }
-
- if (cfg.c_mt_workers >= 1) {
- ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
- cfg.c_mt_workers,
- cfg.c_mt_workers << 2,
- z_erofs_mt_wq_tls_alloc,
- z_erofs_mt_wq_tls_free);
- if (ret)
- return ret;
- z_erofs_mt_enabled = true;
- }
- pthread_mutex_init(&g_ictx.mutex, NULL);
- pthread_cond_init(&g_ictx.cond, NULL);
-#endif
- return 0;
+ return z_erofs_mt_init();
}
int z_erofs_compress_exit(void)
@@ -1858,6 +1896,9 @@ int z_erofs_compress_exit(void)
if (z_erofs_mt_enabled) {
#ifdef EROFS_MT_ENABLED
ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+ if (ret)
+ return ret;
+ ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq);
if (ret)
return ret;
while (z_erofs_mt_ctrl.idle) {
diff --git a/lib/fragments.c b/lib/fragments.c
index fecebb5..41b9912 100644
--- a/lib/fragments.c
+++ b/lib/fragments.c
@@ -146,21 +146,13 @@ int z_erofs_fragments_dedupe(struct erofs_inode *inode, int fd, u32 *tofcrc)
if (inode->i_size <= EROFS_TOF_HASHLEN)
return 0;
- if (erofs_lseek64(fd, inode->i_size - EROFS_TOF_HASHLEN, SEEK_SET) < 0)
- return -errno;
-
- ret = read(fd, data_to_hash, EROFS_TOF_HASHLEN);
+ ret = pread(fd, data_to_hash, EROFS_TOF_HASHLEN,
+ inode->i_size - EROFS_TOF_HASHLEN);
if (ret != EROFS_TOF_HASHLEN)
return -errno;
*tofcrc = erofs_crc32c(~0, data_to_hash, EROFS_TOF_HASHLEN);
- ret = z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
- if (ret < 0)
- return ret;
- ret = lseek(fd, 0, SEEK_SET);
- if (ret < 0)
- return -errno;
- return 0;
+ return z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
}
static int z_erofs_fragments_dedupe_insert(struct list_head *hash, void *data,
diff --git a/lib/inode.c b/lib/inode.c
index 8c9a8ec..c4edd43 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1312,7 +1312,7 @@ struct erofs_mkfs_dfops {
bool idle; /* initialize as false before the dfops worker runs */
};
-#define EROFS_MT_QUEUE_SIZE 128
+#define EROFS_MT_QUEUE_SIZE 256
static void erofs_mkfs_flushjobs(struct erofs_sb_info *sbi)
{
--
2.43.5