[PATCH v3 4/4] erofs-utils: mkfs: introduce inner-file multi-threaded compression
Gao Xiang
hsiangkao at linux.alibaba.com
Thu Feb 29 02:38:50 AEDT 2024
On 2024/2/28 23:13, Yifan Zhao wrote:
>
> On 2/28/24 17:57, Gao Xiang wrote:
>>
>>
>> On 2024/2/25 22:27, Yifan Zhao wrote:
>>> Currently, the creation of EROFS compressed image creation is
>>> single-threaded, which suffers from performance issues. This patch
>>> attempts to address it by compressing the large file in parallel.
>>>
>>> Specifically, each input file larger than 16MB is splited into segments,
>>> and each worker thread compresses a segment as if it were a separate
>>> file. Finally, the main thread merges all the compressed segments.
>>>
>>> Multi-threaded compression is not compatible with -Ededupe,
>>> -E(all-)fragments and -Eztailpacking for now.
>>>
>>> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
>>> Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
>>> ---
>>> include/erofs/compress.h | 1 +
>>> lib/compress.c | 690 ++++++++++++++++++++++++++++++++-------
>>> lib/compressor.c | 2 +
>>> 3 files changed, 575 insertions(+), 118 deletions(-)
>>>
>>> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
>>> index 046640b..2699334 100644
>>> --- a/include/erofs/compress.h
>>> +++ b/include/erofs/compress.h
>>> @@ -15,6 +15,7 @@ extern "C"
>>> #include "internal.h"
>>> #define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
>>> +#define EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>>> void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
>>> int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
>>> diff --git a/lib/compress.c b/lib/compress.c
>>> index 9611102..f98feae 100644
>>> --- a/lib/compress.c
>>> +++ b/lib/compress.c
>>> @@ -8,6 +8,9 @@
>>> #ifndef _LARGEFILE64_SOURCE
>>> #define _LARGEFILE64_SOURCE
>>> #endif
>>> +#ifndef _GNU_SOURCE
>>> +#define _GNU_SOURCE
>>> +#endif
>>> #include <string.h>
>>> #include <stdlib.h>
>>> #include <unistd.h>
>>> @@ -20,6 +23,16 @@
>>> #include "erofs/block_list.h"
>>> #include "erofs/compress_hints.h"
>>> #include "erofs/fragments.h"
>>> +#ifdef EROFS_MT_ENABLED
>>> +#include "erofs/workqueue.h"
>>> +#endif
>>> +#ifdef HAVE_LINUX_FALLOC_H
>>> +#include <linux/falloc.h>
>>> +#endif
>>> +
>>> +#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
>>> +#define USE_PER_WORKER_TMPFILE 1
>>> +#endif
>>> /* compressing configuration specified by users */
>>> struct erofs_compress_cfg {
>>> @@ -33,29 +46,84 @@ struct z_erofs_extent_item {
>>> struct z_erofs_inmem_extent e;
>>> };
>>> +struct z_erofs_file_compress_ctx {
>>
>> struct z_erofs_compressed_inode_ctx would be better
>>
>>> + struct erofs_inode *inode;
>>> + int fd;
>>> + unsigned int pclustersize;
>>> +
>>> + u32 tof_chksum;
>>> + bool fix_dedupedfrag;
>>> + bool fragemitted;
>>> +};
>>> +
>>> struct z_erofs_vle_compress_ctx {
>>
>> I think we'd better to rename this as
>>
>> struct z_erofs_compressed_segment_ctx
>>
>>> - u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
>>> + struct z_erofs_file_compress_ctx *fctx;
>>> +
>>> + u8 *queue;
>>> struct list_head extents;
>>> struct z_erofs_extent_item *pivot;
>>> - struct erofs_inode *inode;
>>> - struct erofs_compress_cfg *ccfg;
>>> + struct erofs_compress *chandle;
>>> + char *destbuf;
>>> - u8 *metacur;
>>> unsigned int head, tail;
>>> erofs_off_t remaining;
>>> - unsigned int pclustersize;
>>> erofs_blk_t blkaddr; /* pointing to the next blkaddr */
>>> + erofs_blk_t compressed_blocks;
>>> u16 clusterofs;
>>> - u32 tof_chksum;
>>> - bool fix_dedupedfrag;
>>> - bool fragemitted;
>>> + int seg_num, seg_idx;
>>> + FILE *tmpfile;
>>> + off_t tmpfile_off;
>>> +};
>>> +
>>> +struct z_erofs_write_index_ctx {
>>
>> why we need this structure, I'd like to fold it in
>>
>> struct z_erofs_compressed_inode_ctx.
>>
>>> + struct erofs_inode *inode;
>>> + struct list_head *extents;
>>> + u16 clusterofs;
>>> + erofs_blk_t blkaddr, blkoff;
>>
>> I don't like this approach, let's just fix
>> extents->blkaddr in a loop together.
>>
>>> + u8 *metacur;
>>> };
>>> +#ifdef EROFS_MT_ENABLED
>>> +struct erofs_compress_wq_private {
>>> + bool init;
>>> + u8 *queue;
>>> + char *destbuf;
>>> + struct erofs_compress_cfg *ccfg;
>>> + FILE* tmpfile;
>>> +};
>>> +
>>> +struct erofs_compress_work {
>>> + /* Note: struct erofs_work must be the first member */
>>> + struct erofs_work work;
>>> + struct z_erofs_vle_compress_ctx ctx;
>>> +
>>> + unsigned int alg_id;
>>> + char *alg_name;
>>> + unsigned int comp_level;
>>> + unsigned int dict_size;
>>> +
>>> + int ret;
>>> +
>>> + struct erofs_compress_work *next;
>>> +};
>>> +
>>> +static struct {
>>> + struct erofs_workqueue wq;
>>> + struct erofs_compress_work *idle;
>>
>> Does this need a mutex protection?
>>
> I think `idle` is written by the main thread only in this patch and the mutex is unnecessary.
>
>>> + pthread_mutex_t mutex;
>>> + pthread_cond_t cond;
>>> + int nfini;
>>> +} z_erofs_mt_ctrl;
>>> +#endif
>>> +
>>> +static bool z_erofs_mt_enabled;
>>> +static u8 *z_erofs_global_queue;
>>> +
>>> #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
>>> -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
>>> +static void z_erofs_write_indexes_final(struct z_erofs_write_index_ctx *ctx)
>>> {
>>> const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
>>> struct z_erofs_lcluster_index di;
>>> @@ -71,7 +139,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
>>> ctx->metacur += sizeof(di);
>>> }
>>> -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> +static void z_erofs_write_extent(struct z_erofs_write_index_ctx *ctx,
>>> struct z_erofs_inmem_extent *e)
>>> {
>>> struct erofs_inode *inode = ctx->inode;
>>> @@ -99,10 +167,15 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> di.di_advise = cpu_to_le16(advise);
>>> if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
>>> - !e->compressedblks)
>>> + !e->compressedblks) {
>>> di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
>>> - else
>>> + } else if (z_erofs_mt_enabled) {
>>> + di.di_u.blkaddr =
>>> + cpu_to_le32(ctx->blkaddr + ctx->blkoff);
>>> + ctx->blkoff += e->compressedblks;
>>
>> so we don't need this at all.
>>
>>> + } else {
>>> di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
>>> + }
>>> memcpy(ctx->metacur, &di, sizeof(di));
>>> ctx->metacur += sizeof(di);
>>> @@ -144,10 +217,15 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> Z_EROFS_LCLUSTER_TYPE_HEAD1;
>>> if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
>>> - !e->compressedblks)
>>> + !e->compressedblks) {
>>> di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
>>> - else
>>> + } else if (z_erofs_mt_enabled) {
>>> + di.di_u.blkaddr =
>>> + cpu_to_le32(ctx->blkaddr + ctx->blkoff);
>>> + ctx->blkoff += e->compressedblks;
>>
>> same here.
>>
>>> + } else {
>>> di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
>>> + }
>>> if (e->partial) {
>>> DBG_BUGON(e->raw);
>>> @@ -170,12 +248,12 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> ctx->clusterofs = clusterofs + count;
>>> }
>>> -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
>>> +static void z_erofs_write_indexes(struct z_erofs_write_index_ctx *ctx)
>>> {
>>> struct z_erofs_extent_item *ei, *n;
>>> ctx->clusterofs = 0;
>>> - list_for_each_entry_safe(ei, n, &ctx->extents, list) {
>>> + list_for_each_entry_safe(ei, n, ctx->extents, list) {
>>> z_erofs_write_extent(ctx, &ei->e);
>>> list_del(&ei->list);
>>> @@ -188,11 +266,12 @@ static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
>>> {
>>> const bool final = !ctx->remaining;
>>> unsigned int qh_aligned, qh_after;
>>> + struct erofs_inode *inode = ctx->fctx->inode;
>>> if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
>>> return false;
>>> - qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
>>> + qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
>>> qh_after = ctx->head - qh_aligned;
>>> memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
>>> ctx->tail -= qh_aligned;
>>> @@ -212,14 +291,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> list_add_tail(&ei->list, &ctx->extents);
>>> ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
>>> - (erofs_blksiz(ctx->inode->sbi) - 1);
>>> -
>>> + (erofs_blksiz(ctx->fctx->inode->sbi) - 1);
>>> }
>>> static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
>>> unsigned int *len)
>>> {
>>> - struct erofs_inode *inode = ctx->inode;
>>> + struct erofs_inode *inode = ctx->fctx->inode;
>>> const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
>>> struct erofs_sb_info *sbi = inode->sbi;
>>> struct z_erofs_extent_item *ei = ctx->pivot;
>>> @@ -318,13 +396,14 @@ out:
>>> static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> unsigned int len, char *dst)
>>> {
>>> - struct erofs_sb_info *sbi = ctx->inode->sbi;
>>> + struct erofs_inode *inode = ctx->fctx->inode;
>>> + struct erofs_sb_info *sbi = inode->sbi;
>>> unsigned int count = min(erofs_blksiz(sbi), len);
>>> unsigned int interlaced_offset, rightpart;
>>> int ret;
>>> /* write interlaced uncompressed data if needed */
>>> - if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
>>> + if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
>>> interlaced_offset = ctx->clusterofs;
>>> else
>>> interlaced_offset = 0;
>>> @@ -335,11 +414,19 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
>>> memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
>>> memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
>>> - erofs_dbg("Writing %u uncompressed data to block %u",
>>> - count, ctx->blkaddr);
>>> - ret = blk_write(sbi, dst, ctx->blkaddr, 1);
>>> - if (ret)
>>> - return ret;
>>> + if (ctx->tmpfile) {
>>> + erofs_dbg("Writing %u uncompressed data to tmpfile", count);
>>> + ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
>>> + if (ret != 1)
>>> + return -EIO;
>>> + fflush(ctx->tmpfile);
>>> + } else {
>>> + erofs_dbg("Writing %u uncompressed data to block %u", count,
>>> + ctx->blkaddr);
>>> + ret = blk_write(sbi, dst, ctx->blkaddr, 1);
>>> + if (ret)
>>> + return ret;
>>> + }
>>> return count;
>>> }
>>> @@ -384,8 +471,8 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
>>> void *in, unsigned int *insize,
>>> void *out, unsigned int *compressedsize)
>>> {
>>> - struct erofs_sb_info *sbi = ctx->inode->sbi;
>>> - static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
>>> + struct erofs_sb_info *sbi = ctx->fctx->inode->sbi;
>>> + char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
>>
>> does tryrecompress_trailing() work? if it doesn't work,
>> let's leave the old code as-is.
>>
>>> unsigned int count;
>>> int ret = *compressedsize;
>>> @@ -409,7 +496,7 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
>>> static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>>> unsigned int len)
>>> {
>>> - struct erofs_inode *inode = ctx->inode;
>>> + struct erofs_inode *inode = ctx->fctx->inode;
>>> struct erofs_sb_info *sbi = inode->sbi;
>>> const unsigned int newsize = ctx->remaining + len;
>>> @@ -417,9 +504,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>>> /* try to fix again if it gets larger (should be rare) */
>>> if (inode->fragment_size < newsize) {
>>> - ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
>>> - roundup(newsize - inode->fragment_size,
>>> - erofs_blksiz(sbi)));
>>> + ctx->fctx->pclustersize =
>>> + min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
>>> + roundup(newsize - inode->fragment_size,
>>> + erofs_blksiz(sbi)));
>>> return false;
>>> }
>>> @@ -439,26 +527,31 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>>> static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
>>> struct z_erofs_inmem_extent *e)
>>> {
>>> - static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
>>> - struct erofs_inode *inode = ctx->inode;
>>> + static char
>>> + global_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
>>> + char *dstbuf = ctx->destbuf ? ctx->destbuf : global_dstbuf;
>>
>> char *dstbuf = ctx->destbuf ? : global_dstbuf;
>>
> I believe this grammar is a GCC extension [1] which is not part of the standard. Will it sacrifice portability ?
>
> [1] https://gcc.gnu.org/onlinedocs/gcc/Conditionals.html
Yes, yet I don't care about MSVC or whatever.
GCC and clang both can work. and this expension is
common in the kernel.
Thanks,
Gao Xiang
More information about the Linux-erofs
mailing list