[PATCH v7 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression
Noboru Asai
asai at sijam.com
Fri Mar 15 18:18:10 AEDT 2024
I think the source code would be easier to understand if variables and
pointers of the same structure type were named consistently, e.g.:
struct z_erofs_compress_ictx inode_ctx; // i stands for inode?
struct z_erofs_compress_ictx *ictx;
struct z_erofs_compress_sctx seg_ctx;
struct z_erofs_compress_sctx *sctx;
2024年3月15日(金) 10:11 Gao Xiang <hsiangkao at linux.alibaba.com>:
>
> From: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
>
> Currently, EROFS compressed image creation is single-threaded, which
> suffers from performance issues. This patch attempts to address this by
> compressing large files in parallel.
>
> Specifically, each input file larger than 16MB is split into segments,
> and each worker thread compresses a segment as if it were a separate
> file. Finally, the main thread merges all the compressed segments.
>
> Multi-threaded compression is not compatible with -Ededupe and
> -E(all-)fragments for now.
>
> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
> Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
> Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
> ---
> v7:
> - support -Eztailpacking;
> - wq_private -> wq_tls;
> - minor updates.
>
> include/erofs/compress.h | 3 +-
> lib/compress.c | 548 ++++++++++++++++++++++++++++++++-------
> lib/compressor.c | 2 +
> mkfs/main.c | 8 +-
> 4 files changed, 464 insertions(+), 97 deletions(-)
>
> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
> index b3272f7..3253611 100644
> --- a/include/erofs/compress.h
> +++ b/include/erofs/compress.h
> @@ -14,7 +14,8 @@ extern "C"
>
> #include "internal.h"
>
> -#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
> +#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
> +#define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>
> void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
> int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
> diff --git a/lib/compress.c b/lib/compress.c
> index 4101009..0d796c8 100644
> --- a/lib/compress.c
> +++ b/lib/compress.c
> @@ -20,6 +20,9 @@
> #include "erofs/block_list.h"
> #include "erofs/compress_hints.h"
> #include "erofs/fragments.h"
> +#ifdef EROFS_MT_ENABLED
> +#include "erofs/workqueue.h"
> +#endif
>
> /* compressing configuration specified by users */
> struct erofs_compress_cfg {
> @@ -33,29 +36,77 @@ struct z_erofs_extent_item {
> struct z_erofs_inmem_extent e;
> };
>
> -struct z_erofs_vle_compress_ctx {
> - u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
> +/*
> + * Inode-wide compression context: state shared by every segment of a
> + * single file.  Per-segment state lives in struct z_erofs_compress_sctx,
> + * which points back here through its ictx member.
> + */
> +struct z_erofs_compress_ictx {
> + struct erofs_inode *inode;
> + /* source file; segments read it via read() or pread() */
> + int fd;
> + /* pcluster size limit, adjusted while fixing up deduped fragments */
> + unsigned int pclustersize;
> +
> + /* fragment bookkeeping (fragment packing is off when MT is enabled) */
> + u32 tof_chksum;
> + bool fix_dedupedfrag;
> + bool fragemitted;
> +
> + /* fields for write indexes */
> + u8 *metacur;
> + /* final extent list; per-segment extents are moved in here */
> + struct list_head extents;
> + u16 clusterofs;
> +};
> +
> +struct z_erofs_compress_sctx { /* segment context */
> + struct z_erofs_compress_ictx *ictx;
> +
> + u8 *queue;
> struct list_head extents;
> struct z_erofs_extent_item *pivot;
>
> - struct erofs_inode *inode;
> - struct erofs_compress_cfg *ccfg;
> + struct erofs_compress *chandle;
> + char *destbuf;
>
> - u8 *metacur;
> unsigned int head, tail;
> erofs_off_t remaining;
> - unsigned int pclustersize;
> erofs_blk_t blkaddr; /* pointing to the next blkaddr */
> u16 clusterofs;
>
> - u32 tof_chksum;
> - bool fix_dedupedfrag;
> - bool fragemitted;
> + int seg_num, seg_idx;
> +
> + void *membuf;
> + erofs_off_t memoff;
> +};
> +
> +#ifdef EROFS_MT_ENABLED
> +/*
> + * Per-worker buffers, created by z_erofs_mt_wq_tls_alloc() and released
> + * by z_erofs_mt_wq_tls_free().  ccfg is an array of EROFS_MAX_COMPR_CFGS
> + * entries indexed by algorithm id; each entry is initialised lazily on
> + * first use by z_erofs_mt_wq_tls_init_compr().
> + */
> +struct erofs_compress_wq_tls {
> + u8 *queue;
> + char *destbuf;
> + struct erofs_compress_cfg *ccfg;
> +};
> +
> +/* One queued unit of work: compress a single segment of one inode. */
> +struct erofs_compress_work {
> + /* Note: struct erofs_work must be the first member */
> + struct erofs_work work;
> + struct z_erofs_compress_sctx ctx;
> + /* links the per-file pending list, then the idle free list */
> + struct erofs_compress_work *next;
> +
> + /*
> + * Compressor parameters copied from the inode's ccfg so the worker can
> + * set up its own handle.  NOTE(review): handle.compression_level is
> + * reset to -1 by erofs_compressor_init() yet stored here as unsigned;
> + * this relies on the conversion back to int -- confirm intended.
> + */
> + unsigned int alg_id;
> + char *alg_name;
> + unsigned int comp_level;
> + unsigned int dict_size;
> +
> + /* result of z_erofs_mt_workfn(), 0 or a negative errno */
> + int errcode;
> +};
> +
> +/*
> + * Global MT state.  idle caches finished works for reuse across files;
> + * mutex/cond guard nfini, which each worker increments on completion and
> + * z_erofs_mt_compress() waits on.
> + */
> +static struct {
> + struct erofs_workqueue wq;
> + struct erofs_compress_work *idle;
> + pthread_mutex_t mutex;
> + pthread_cond_t cond;
> + int nfini;
> +} z_erofs_mt_ctrl;
> +#endif
> +
> +/* true only if the compression workqueue was created successfully */
> +static bool z_erofs_mt_enabled;
> +
> #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
>
> -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx)
> {
> const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
> struct z_erofs_lcluster_index di;
> @@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
> ctx->metacur += sizeof(di);
> }
>
> -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx,
> struct z_erofs_inmem_extent *e)
> {
> struct erofs_inode *inode = ctx->inode;
> @@ -170,7 +221,7 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
> ctx->clusterofs = clusterofs + count;
> }
>
> -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx)
> {
> struct z_erofs_extent_item *ei, *n;
>
> @@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
> z_erofs_write_indexes_final(ctx);
> }
>
> -static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
> +static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx)
> {
> const bool final = !ctx->remaining;
> unsigned int qh_aligned, qh_after;
> + struct erofs_inode *inode = ctx->ictx->inode;
>
> if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
> return false;
>
> - qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
> + qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
> qh_after = ctx->head - qh_aligned;
> memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
> ctx->tail -= qh_aligned;
> @@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = {
> .e.length = 0
> };
>
> -static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx,
> struct z_erofs_extent_item *ei)
> {
> if (ei == &dummy_pivot)
> @@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
>
> list_add_tail(&ei->list, &ctx->extents);
> ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
> - (erofs_blksiz(ctx->inode->sbi) - 1);
> -
> + (erofs_blksiz(ctx->ictx->inode->sbi) - 1);
> }
>
> -static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
> +static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx,
> unsigned int *len)
> {
> - struct erofs_inode *inode = ctx->inode;
> + struct erofs_inode *inode = ctx->ictx->inode;
> const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
> struct erofs_sb_info *sbi = inode->sbi;
> struct z_erofs_extent_item *ei = ctx->pivot;
> @@ -315,16 +366,17 @@ out:
> return 0;
> }
>
> -static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx,
> unsigned int len, char *dst)
> {
> - struct erofs_sb_info *sbi = ctx->inode->sbi;
> + struct erofs_inode *inode = ctx->ictx->inode;
> + struct erofs_sb_info *sbi = inode->sbi;
> unsigned int count = min(erofs_blksiz(sbi), len);
> unsigned int interlaced_offset, rightpart;
> int ret;
>
> /* write interlaced uncompressed data if needed */
> - if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
> + if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
> interlaced_offset = ctx->clusterofs;
> else
> interlaced_offset = 0;
> @@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
> memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
> memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
>
> - erofs_dbg("Writing %u uncompressed data to block %u",
> - count, ctx->blkaddr);
> - ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> - if (ret)
> - return ret;
> + if (ctx->membuf) {
> + erofs_dbg("Writing %u uncompressed data to membuf", count);
> + memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi));
> + ctx->memoff += erofs_blksiz(sbi);
> + } else {
> + erofs_dbg("Writing %u uncompressed data to block %u", count,
> + ctx->blkaddr);
> + ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> + if (ret)
> + return ret;
> + }
> return count;
> }
>
> @@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data,
> return len;
> }
>
> -static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
> +static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx,
> struct erofs_compress *ec,
> void *in, unsigned int *insize,
> void *out, unsigned int *compressedsize)
> {
> - struct erofs_sb_info *sbi = ctx->inode->sbi;
> + struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
> static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
> unsigned int count;
> int ret = *compressedsize;
> @@ -406,10 +464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
> *compressedsize = ret;
> }
>
> -static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
> +static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx,
> unsigned int len)
> {
> - struct erofs_inode *inode = ctx->inode;
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> + struct erofs_inode *inode = ictx->inode;
> struct erofs_sb_info *sbi = inode->sbi;
> const unsigned int newsize = ctx->remaining + len;
>
> @@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>
> /* try to fix again if it gets larger (should be rare) */
> if (inode->fragment_size < newsize) {
> - ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
> - roundup(newsize - inode->fragment_size,
> - erofs_blksiz(sbi)));
> + ictx->pclustersize = min_t(erofs_off_t,
> + z_erofs_get_max_pclustersize(inode),
> + roundup(newsize - inode->fragment_size,
> + erofs_blksiz(sbi)));
> return false;
> }
>
> @@ -436,29 +496,32 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
> return true;
> }
>
> -static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
> +static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
> struct z_erofs_inmem_extent *e)
> {
> - static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> - struct erofs_inode *inode = ctx->inode;
> + static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> + char *dstbuf = ctx->destbuf ?: g_dstbuf;
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> + struct erofs_inode *inode = ictx->inode;
> struct erofs_sb_info *sbi = inode->sbi;
> unsigned int blksz = erofs_blksiz(sbi);
> char *const dst = dstbuf + blksz;
> - struct erofs_compress *const h = &ctx->ccfg->handle;
> + struct erofs_compress *const h = ctx->chandle;
> unsigned int len = ctx->tail - ctx->head;
> bool is_packed_inode = erofs_is_packed_inode(inode);
> bool final = !ctx->remaining;
> - bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
> + bool may_packing = (cfg.c_fragments && final && !is_packed_inode &&
> + !z_erofs_mt_enabled);
> bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
> unsigned int compressedsize;
> int ret;
>
> - if (len <= ctx->pclustersize) {
> + if (len <= ictx->pclustersize) {
> if (!final || !len)
> return 1;
> if (may_packing) {
> - if (inode->fragment_size && !ctx->fix_dedupedfrag) {
> - ctx->pclustersize = roundup(len, blksz);
> + if (inode->fragment_size && !ictx->fix_dedupedfrag) {
> + ictx->pclustersize = roundup(len, blksz);
> goto fix_dedupedfrag;
> }
> e->length = len;
> @@ -470,7 +533,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
>
> e->length = min(len, cfg.c_max_decompressed_extent_bytes);
> ret = erofs_compress_destsize(h, ctx->queue + ctx->head,
> - &e->length, dst, ctx->pclustersize);
> + &e->length, dst, ictx->pclustersize);
> if (ret <= 0) {
> erofs_err("failed to compress %s: %s", inode->i_srcpath,
> erofs_strerror(ret));
> @@ -507,16 +570,16 @@ nocompression:
> e->compressedblks = 1;
> e->raw = true;
> } else if (may_packing && len == e->length &&
> - compressedsize < ctx->pclustersize &&
> - (!inode->fragment_size || ctx->fix_dedupedfrag)) {
> + compressedsize < ictx->pclustersize &&
> + (!inode->fragment_size || ictx->fix_dedupedfrag)) {
> frag_packing:
> ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head,
> - len, ctx->tof_chksum);
> + len, ictx->tof_chksum);
> if (ret < 0)
> return ret;
> e->compressedblks = 0; /* indicate a fragment */
> e->raw = false;
> - ctx->fragemitted = true;
> + ictx->fragemitted = true;
> /* tailpcluster should be less than 1 block */
> } else if (may_inline && len == e->length && compressedsize < blksz) {
> if (ctx->clusterofs + len <= blksz) {
> @@ -545,8 +608,8 @@ frag_packing:
> */
> if (may_packing && len == e->length &&
> (compressedsize & (blksz - 1)) &&
> - ctx->tail < sizeof(ctx->queue)) {
> - ctx->pclustersize = roundup(compressedsize, blksz);
> + ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) {
> + ictx->pclustersize = roundup(compressedsize, blksz);
> goto fix_dedupedfrag;
> }
>
> @@ -569,34 +632,45 @@ frag_packing:
> }
>
> /* write compressed data */
> - erofs_dbg("Writing %u compressed data to %u of %u blocks",
> - e->length, ctx->blkaddr, e->compressedblks);
> + if (ctx->membuf) {
> + erofs_off_t sz = e->compressedblks * blksz;
> + erofs_dbg("Writing %u compressed data to membuf of %u blocks",
> + e->length, e->compressedblks);
>
> - ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> - e->compressedblks);
> - if (ret)
> - return ret;
> + memcpy(ctx->membuf + ctx->memoff, dst - padding, sz);
> + ctx->memoff += sz;
> + } else {
> + erofs_dbg("Writing %u compressed data to %u of %u blocks",
> + e->length, ctx->blkaddr, e->compressedblks);
> +
> + ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> + e->compressedblks);
> + if (ret)
> + return ret;
> + }
> e->raw = false;
> may_inline = false;
> may_packing = false;
> }
> e->partial = false;
> e->blkaddr = ctx->blkaddr;
> + if (ctx->blkaddr != EROFS_NULL_ADDR)
> + ctx->blkaddr += e->compressedblks;
> if (!may_inline && !may_packing && !is_packed_inode)
> (void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head);
> - ctx->blkaddr += e->compressedblks;
> ctx->head += e->length;
> return 0;
>
> fix_dedupedfrag:
> DBG_BUGON(!inode->fragment_size);
> ctx->remaining += inode->fragment_size;
> - ctx->fix_dedupedfrag = true;
> + ictx->fix_dedupedfrag = true;
> return 1;
> }
>
> -static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
> +static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx)
> {
> + struct z_erofs_compress_ictx *ictx = ctx->ictx;
> unsigned int len = ctx->tail - ctx->head;
> struct z_erofs_extent_item *ei;
>
> @@ -624,7 +698,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
>
> len -= ei->e.length;
> ctx->pivot = ei;
> - if (ctx->fix_dedupedfrag && !ctx->fragemitted &&
> + if (ictx->fix_dedupedfrag && !ictx->fragemitted &&
> z_erofs_fixup_deduped_fragment(ctx, len))
> break;
>
> @@ -912,13 +986,268 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
> inode->eof_tailraw = NULL;
> }
>
> +/*
> + * Compress ctx->remaining bytes of ctx->ictx->fd into @ctx's extent list.
> + * @offset is the file offset to pread() from, or -1 to read() sequentially
> + * from the current file position.  @blkaddr is the first block to write
> + * to, or EROFS_NULL_ADDR when output is buffered in ctx->membuf.
> + * Returns 0 on success or a negative errno.
> + */
> +int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
> +			     u64 offset, erofs_blk_t blkaddr)
> +{
> +	int fd = ctx->ictx->fd;
> +
> +	ctx->blkaddr = blkaddr;
> +	while (ctx->remaining) {
> +		const u64 rx = min_t(u64, ctx->remaining,
> +				     Z_EROFS_COMPR_QUEUE_SZ - ctx->tail);
> +		ssize_t nread;
> +		int ret;
> +
> +		nread = (offset == -1 ?
> +			 read(fd, ctx->queue + ctx->tail, rx) :
> +			 pread(fd, ctx->queue + ctx->tail, rx, offset));
> +		if (nread < 0)
> +			return -errno;
> +		/* a short read does not set errno; never report it as 0 */
> +		if ((u64)nread != rx)
> +			return -EIO;
> +
> +		ctx->remaining -= rx;
> +		ctx->tail += rx;
> +		if (offset != -1)
> +			offset += rx;
> +
> +		ret = z_erofs_compress_one(ctx);
> +		if (ret)
> +			return ret;
> +	}
> +	DBG_BUGON(ctx->head != ctx->tail);
> +
> +	/* flush the last pending extent of this segment */
> +	if (ctx->pivot) {
> +		z_erofs_commit_extent(ctx, ctx->pivot);
> +		ctx->pivot = NULL;
> +	}
> +	return 0;
> +}
> +
> +#ifdef EROFS_MT_ENABLED
> +/*
> + * Workqueue TLS constructor: give each worker its own input queue,
> + * destination buffer and (not yet initialised) compressor table.
> + * Returns the new TLS object, or NULL if any allocation fails.
> + */
> +void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
> +{
> +	struct erofs_compress_wq_tls *tls = calloc(1, sizeof(*tls));
> +
> +	if (!tls)
> +		return NULL;
> +
> +	/* each allocation is attempted only if the previous one succeeded */
> +	tls->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ);
> +	tls->destbuf = !tls->queue ? NULL :
> +		calloc(1, EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE);
> +	tls->ccfg = !tls->destbuf ? NULL :
> +		calloc(EROFS_MAX_COMPR_CFGS, sizeof(*tls->ccfg));
> +	if (tls->ccfg)
> +		return tls;
> +
> +	/* partial failure: free(NULL) is a no-op for the missing pieces */
> +	free(tls->destbuf);
> +	free(tls->queue);
> +	free(tls);
> +	return NULL;
> +}
> +
> +/*
> + * Lazily set up this worker's compressor for @alg_id.  A handle that is
> + * already enabled is reused as-is.  Returns 0 or the error from
> + * erofs_compressor_init().
> + */
> +int z_erofs_mt_wq_tls_init_compr(struct erofs_sb_info *sbi,
> +				 struct erofs_compress_wq_tls *tls,
> +				 unsigned int alg_id, char *alg_name,
> +				 unsigned int comp_level,
> +				 unsigned int dict_size)
> +{
> +	struct erofs_compress_cfg *slot = &tls->ccfg[alg_id];
> +	int err;
> +
> +	if (likely(slot->enable))
> +		return 0;
> +
> +	err = erofs_compressor_init(sbi, &slot->handle, alg_name,
> +				    comp_level, dict_size);
> +	if (!err) {
> +		slot->algorithmtype = alg_id;
> +		slot->enable = true;
> +	}
> +	return err;
> +}
> +
> +/*
> + * Workqueue TLS destructor: shut down every compressor this worker
> + * enabled, then release its buffers.  Always returns NULL.
> + */
> +void *z_erofs_mt_wq_tls_free(struct erofs_workqueue *wq, void *priv)
> +{
> +	struct erofs_compress_wq_tls *tls = priv;
> +	struct erofs_compress_cfg *lc = tls->ccfg;
> +	struct erofs_compress_cfg *const end = lc + EROFS_MAX_COMPR_CFGS;
> +
> +	for (; lc < end; ++lc) {
> +		if (lc->enable)
> +			erofs_compressor_exit(&lc->handle);
> +	}
> +
> +	free(tls->ccfg);
> +	free(tls->destbuf);
> +	free(tls->queue);
> +	free(tls);
> +	return NULL;
> +}
> +
> +/*
> + * Workqueue callback: compress one segment into a private heap buffer
> + * (ctx->membuf) using this worker's TLS buffers and compressor.  The
> + * result is stored in cwork->errcode.  Completion is then published by
> + * incrementing z_erofs_mt_ctrl.nfini under its mutex and signalling cond.
> + */
> +void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
> +{
> +	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
> +	struct erofs_compress_wq_tls *tls = tlsp;
> +	struct z_erofs_compress_sctx *ctx = &cwork->ctx;
> +	struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
> +	/* widen first: seg_idx is an int, so guard against 32-bit overflow */
> +	u64 offset = (u64)ctx->seg_idx * cfg.c_segment_size;
> +	int ret;
> +
> +	ret = z_erofs_mt_wq_tls_init_compr(sbi, tls, cwork->alg_id,
> +					   cwork->alg_name, cwork->comp_level,
> +					   cwork->dict_size);
> +	if (ret)
> +		goto out;
> +
> +	ctx->queue = tls->queue;
> +	ctx->destbuf = tls->destbuf;
> +	ctx->chandle = &tls->ccfg[cwork->alg_id].handle;
> +
> +	/*
> +	 * The membuf writers (write_uncompressed_extent() and
> +	 * __z_erofs_compress_one()) always copy whole blocks.  The last
> +	 * segment of a file is usually not block-aligned, so the buffer must
> +	 * be rounded up to the block size or the final block overflows it.
> +	 */
> +	ctx->membuf = malloc(roundup(ctx->remaining, erofs_blksiz(sbi)));
> +	if (!ctx->membuf) {
> +		ret = -ENOMEM;
> +		goto out;
> +	}
> +	ctx->memoff = 0;
> +
> +	ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR);
> +
> +out:
> +	cwork->errcode = ret;
> +	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> +	++z_erofs_mt_ctrl.nfini;
> +	pthread_cond_signal(&z_erofs_mt_ctrl.cond);
> +	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +}
> +
> +/*
> + * Move the extents produced by one worker into @ictx and write that
> + * worker's buffered output to disk.  On entry ctx->blkaddr is the first
> + * block available to this segment; on return it points just past the
> + * last block used.  ctx->membuf is freed here.  Returns 0 or the last
> + * blk_write() error (remaining extents are still processed on error).
> + */
> +int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
> +			  struct z_erofs_compress_sctx *ctx)
> +{
> +	struct erofs_sb_info *sbi = ictx->inode->sbi;
> +	const unsigned int blksz = erofs_blksiz(sbi);
> +	struct z_erofs_extent_item *ei, *n;
> +	char *src = ctx->membuf;
> +	int err = 0;
> +
> +	list_for_each_entry_safe(ei, n, &ctx->extents, list) {
> +		int werr;
> +
> +		list_del(&ei->list);
> +		list_add_tail(&ei->list, &ictx->extents);
> +
> +		/* already has an on-disk address (deduped): nothing buffered */
> +		if (ei->e.blkaddr != EROFS_NULL_ADDR)
> +			continue;
> +
> +		/*
> +		 * NOTE(review): every NULL_ADDR extent is assumed to own
> +		 * compressedblks blocks of membuf.  Extents taking the
> +		 * ztailpacking (may_inline) branch do not appear to copy data
> +		 * into membuf -- confirm such extents cannot reach here.
> +		 */
> +		ei->e.blkaddr = ctx->blkaddr;
> +		ctx->blkaddr += ei->e.compressedblks;
> +
> +		werr = blk_write(sbi, src, ei->e.blkaddr,
> +				 ei->e.compressedblks);
> +		src += ei->e.compressedblks * blksz;
> +		if (werr)
> +			err = werr;
> +	}
> +	free(ctx->membuf);
> +	return err;
> +}
> +
> +/*
> + * Split @ctx->inode into cfg.c_segment_size segments, queue one work per
> + * segment, wait for all of them, then merge the results in file order
> + * starting at @blkaddr.  Blocks written are added to *compressed_blocks.
> + * Returns 0 or a negative errno (the last error seen).
> + */
> +int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx,
> +			struct erofs_compress_cfg *ccfg,
> +			erofs_blk_t blkaddr,
> +			erofs_blk_t *compressed_blocks)
> +{
> +	struct erofs_compress_work *cur, *head = NULL, **last = &head;
> +	struct erofs_inode *inode = ctx->inode;
> +	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
> +	int ret = 0, nqueued = 0, i;
> +
> +	z_erofs_mt_ctrl.nfini = 0;
> +
> +	for (i = 0; i < nsegs; i++) {
> +		if (z_erofs_mt_ctrl.idle) {
> +			cur = z_erofs_mt_ctrl.idle;
> +			z_erofs_mt_ctrl.idle = cur->next;
> +			cur->next = NULL;
> +		} else {
> +			cur = calloc(1, sizeof(*cur));
> +			if (!cur) {
> +				/*
> +				 * Works already queued still reference @ctx
> +				 * and bump nfini: wait for them below instead
> +				 * of returning while they are in flight.
> +				 */
> +				ret = -ENOMEM;
> +				break;
> +			}
> +		}
> +		*last = cur;
> +		last = &cur->next;
> +
> +		cur->ctx = (struct z_erofs_compress_sctx) {
> +			.ictx = ctx,
> +			.seg_num = nsegs,
> +			.seg_idx = i,
> +			.pivot = &dummy_pivot,
> +		};
> +		init_list_head(&cur->ctx.extents);
> +
> +		if (i == nsegs - 1)
> +			cur->ctx.remaining = inode->i_size -
> +					     inode->fragment_size -
> +					     (u64)i * cfg.c_segment_size;
> +		else
> +			cur->ctx.remaining = cfg.c_segment_size;
> +
> +		cur->alg_id = ccfg->handle.alg->id;
> +		cur->alg_name = ccfg->handle.alg->name;
> +		cur->comp_level = ccfg->handle.compression_level;
> +		cur->dict_size = ccfg->handle.dict_size;
> +
> +		cur->work.fn = z_erofs_mt_workfn;
> +		erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
> +		nqueued++;
> +	}
> +
> +	/* wait only for the works that were actually queued */
> +	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> +	while (z_erofs_mt_ctrl.nfini != nqueued)
> +		pthread_cond_wait(&z_erofs_mt_ctrl.cond,
> +				  &z_erofs_mt_ctrl.mutex);
> +	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +
> +	while (head) {
> +		cur = head;
> +		head = cur->next;
> +
> +		if (cur->errcode) {
> +			ret = cur->errcode;
> +			/* merge is skipped, so release the buffered output */
> +			free(cur->ctx.membuf);
> +			cur->ctx.membuf = NULL;
> +		} else {
> +			int ret2;
> +
> +			cur->ctx.blkaddr = blkaddr;
> +			ret2 = z_erofs_merge_segment(ctx, &cur->ctx);
> +			if (ret2)
> +				ret = ret2;
> +
> +			*compressed_blocks += cur->ctx.blkaddr - blkaddr;
> +			blkaddr = cur->ctx.blkaddr;
> +		}
> +
> +		/* recycle the work item for the next file */
> +		cur->next = z_erofs_mt_ctrl.idle;
> +		z_erofs_mt_ctrl.idle = cur;
> +	}
> +	return ret;
> +}
> +#endif
> +
> int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> {
> + static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
> struct erofs_buffer_head *bh;
> - static struct z_erofs_vle_compress_ctx ctx;
> - erofs_blk_t blkaddr, compressed_blocks;
> + static struct z_erofs_compress_ictx ctx;
> + static struct z_erofs_compress_sctx sctx;
> + struct erofs_compress_cfg *ccfg;
> + erofs_blk_t blkaddr, compressed_blocks = 0;
> unsigned int legacymetasize;
> int ret;
> + bool ismt = false;
> struct erofs_sb_info *sbi = inode->sbi;
> u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
> sizeof(struct z_erofs_lcluster_index) +
> @@ -963,8 +1292,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> }
> }
> #endif
> - ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> - inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
> + ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> + inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
> inode->z_algorithmtype[1] = 0;
>
> inode->idata_size = 0;
> @@ -983,50 +1312,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
> ctx.inode = inode;
> ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
> - ctx.blkaddr = blkaddr;
> ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
> - ctx.head = ctx.tail = 0;
> - ctx.clusterofs = 0;
> - ctx.pivot = &dummy_pivot;
> init_list_head(&ctx.extents);
> - ctx.remaining = inode->i_size - inode->fragment_size;
> + ctx.fd = fd;
> ctx.fix_dedupedfrag = false;
> ctx.fragemitted = false;
> + sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
> + init_list_head(&sctx.extents);
> +
> if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
> !inode->fragment_size) {
> ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
> if (ret)
> goto err_free_idata;
> +#ifdef EROFS_MT_ENABLED
> + } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
> + ismt = true;
> + ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
> + if (ret)
> + goto err_free_idata;
> +#endif
> } else {
> - while (ctx.remaining) {
> - const u64 rx = min_t(u64, ctx.remaining,
> - sizeof(ctx.queue) - ctx.tail);
> -
> - ret = read(fd, ctx.queue + ctx.tail, rx);
> - if (ret != rx) {
> - ret = -errno;
> - goto err_bdrop;
> - }
> - ctx.remaining -= rx;
> - ctx.tail += rx;
> -
> - ret = z_erofs_compress_one(&ctx);
> - if (ret)
> - goto err_free_idata;
> - }
> + sctx.queue = g_queue;
> + sctx.destbuf = NULL;
> + sctx.chandle = &ccfg->handle;
> + sctx.remaining = inode->i_size - inode->fragment_size;
> + sctx.seg_num = 1;
> + sctx.seg_idx = 0;
> + sctx.pivot = &dummy_pivot;
> +
> + ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
> + if (ret)
> + goto err_free_idata;
> + compressed_blocks = sctx.blkaddr - blkaddr;
> }
> - DBG_BUGON(ctx.head != ctx.tail);
>
> /* fall back to no compression mode */
> - compressed_blocks = ctx.blkaddr - blkaddr;
> DBG_BUGON(compressed_blocks < !!inode->idata_size);
> compressed_blocks -= !!inode->idata_size;
>
> - if (ctx.pivot) {
> - z_erofs_commit_extent(&ctx, ctx.pivot);
> - ctx.pivot = NULL;
> - }
> -
> /* generate an extent for the deduplicated fragment */
> if (inode->fragment_size && !ctx.fragemitted) {
> struct z_erofs_extent_item *ei;
> @@ -1042,13 +1366,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
> .compressedblks = 0,
> .raw = false,
> .partial = false,
> - .blkaddr = ctx.blkaddr,
> + .blkaddr = sctx.blkaddr,
> };
> init_list_head(&ei->list);
> - z_erofs_commit_extent(&ctx, ei);
> + z_erofs_commit_extent(&sctx, ei);
> }
> z_erofs_fragments_commit(inode);
>
> + if (!ismt)
> + list_splice_tail(&sctx.extents, &ctx.extents);
> +
> z_erofs_write_indexes(&ctx);
> legacymetasize = ctx.metacur - compressmeta;
> /* estimate if data compression saves space or not */
> @@ -1257,8 +1584,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
> return -EINVAL;
> }
>
> - if (erofs_sb_has_compr_cfgs(sbi))
> - return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> + if (erofs_sb_has_compr_cfgs(sbi)) {
> + ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> + if (ret)
> + return ret;
> + }
> +
> + z_erofs_mt_enabled = false;
> +#ifdef EROFS_MT_ENABLED
> + if (cfg.c_mt_workers > 1) {
> + pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
> + pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
> + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
> + cfg.c_mt_workers,
> + cfg.c_mt_workers << 2,
> + z_erofs_mt_wq_tls_alloc,
> + z_erofs_mt_wq_tls_free);
> + z_erofs_mt_enabled = !ret;
> + }
> +#endif
> return 0;
> }
>
> @@ -1271,5 +1615,19 @@ int z_erofs_compress_exit(void)
> if (ret)
> return ret;
> }
> +
> + if (z_erofs_mt_enabled) {
> +#ifdef EROFS_MT_ENABLED
> + ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
> + if (ret)
> + return ret;
> + while (z_erofs_mt_ctrl.idle) {
> + struct erofs_compress_work *tmp =
> + z_erofs_mt_ctrl.idle->next;
> + free(z_erofs_mt_ctrl.idle);
> + z_erofs_mt_ctrl.idle = tmp;
> + }
> +#endif
> + }
> return 0;
> }
> diff --git a/lib/compressor.c b/lib/compressor.c
> index 58eae2a..175259e 100644
> --- a/lib/compressor.c
> +++ b/lib/compressor.c
> @@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
>
> /* should be written in "minimum compression ratio * 100" */
> c->compress_threshold = 100;
> + c->compression_level = -1;
> + c->dict_size = 0;
>
> if (!alg_name) {
> c->alg = NULL;
> diff --git a/mkfs/main.c b/mkfs/main.c
> index 126a049..5dbaf9f 100644
> --- a/mkfs/main.c
> +++ b/mkfs/main.c
> @@ -678,7 +678,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>
> processors = erofs_get_available_processors();
> if (cfg.c_mt_workers > processors)
> - erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.",
> + erofs_warn("%d workers exceed %d processors, potentially impacting performance.",
> cfg.c_mt_workers, processors);
> break;
> }
> @@ -838,6 +838,12 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
> }
> cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
> }
> +#ifdef EROFS_MT_ENABLED
> + if (cfg.c_mt_workers > 1 && (cfg.c_dedupe || cfg.c_fragments)) {
> + erofs_warn("Note that dedupe/fragments are NOT supported in multi-threaded mode for now, reseting --workers=1.");
> + cfg.c_mt_workers = 1;
> + }
> +#endif
> return 0;
> }
>
> --
> 2.39.3
>
More information about the Linux-erofs
mailing list