[PATCH v3 2/2] erofs-utils: mkfs: introduce inter-file multi-threaded compression
Yifan Zhao
zhaoyifan at sjtu.edu.cn
Sun Mar 24 04:31:57 AEDT 2024
On 2024/3/23 11:46, Huang Jianan wrote:
> Yifan Zhao <zhaoyifan at sjtu.edu.cn> 于2024年3月22日周五 18:25写道:
>> This patch allows parallelizing the compression process of different
>> files in mkfs. Specifically, a traverser thread traverses the files and
>> issues the compression task, which is handled by the workers. Then, the
>> main thread consumes them and writes the compressed data to the device.
>>
>> To this end, the logic of erofs_write_compressed_file() has been
>> modified to split the creation and completion logic of the compression
>> task.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
>> Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
>> ---
>> include/erofs/compress.h | 16 ++
>> include/erofs/inode.h | 17 ++
>> include/erofs/internal.h | 3 +
>> lib/compress.c | 336 +++++++++++++++++++++++++--------------
>> lib/inode.c | 258 ++++++++++++++++++++++++++++--
>> 5 files changed, 503 insertions(+), 127 deletions(-)
>>
>> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
>> index 871db54..8d5a54b 100644
>> --- a/include/erofs/compress.h
>> +++ b/include/erofs/compress.h
>> @@ -17,6 +17,22 @@ extern "C"
>> #define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
>> #define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>>
>> +#ifdef EROFS_MT_ENABLED
>> +struct z_erofs_mt_file {
>> + pthread_mutex_t mutex;
>> + pthread_cond_t cond;
>> + int total;
>> + int nfini;
>> +
>> + int fd;
>> + struct erofs_compress_work *head;
>> +
>> + struct z_erofs_mt_file *next;
>> +};
>> +
>> +int z_erofs_mt_reap(struct z_erofs_mt_file *desc);
>> +#endif
>> +
>> void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
>> int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
>>
>> diff --git a/include/erofs/inode.h b/include/erofs/inode.h
>> index d5a732a..101ff59 100644
>> --- a/include/erofs/inode.h
>> +++ b/include/erofs/inode.h
>> @@ -41,6 +41,23 @@ struct erofs_inode *erofs_new_inode(void);
>> struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path);
>> struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name);
>>
>> +#ifdef EROFS_MT_ENABLED
>> +struct erofs_inode_fifo {
>> + pthread_mutex_t lock;
>> + pthread_cond_t full, empty;
>> +
>> + void *buf;
>> +
>> + size_t size, elem_size;
>> + size_t head, tail;
>> +};
>> +
>> +struct erofs_inode_fifo *erofs_alloc_inode_fifo(size_t size, size_t elem_size);
>> +void erofs_push_inode_fifo(struct erofs_inode_fifo *q, void *elem);
>> +void *erofs_pop_inode_fifo(struct erofs_inode_fifo *q);
>> +void erofs_destroy_inode_fifo(struct erofs_inode_fifo *q);
>> +#endif
>> +
>> #ifdef __cplusplus
>> }
>> #endif
>> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
>> index 4cd2059..2580588 100644
>> --- a/include/erofs/internal.h
>> +++ b/include/erofs/internal.h
>> @@ -250,6 +250,9 @@ struct erofs_inode {
>> #ifdef WITH_ANDROID
>> uint64_t capabilities;
>> #endif
>> +#ifdef EROFS_MT_ENABLED
>> + struct z_erofs_mt_file *mt_desc;
>> +#endif
>> };
>>
>> static inline erofs_off_t erofs_iloc(struct erofs_inode *inode)
>> diff --git a/lib/compress.c b/lib/compress.c
>> index e064293..d89e404 100644
>> --- a/lib/compress.c
>> +++ b/lib/compress.c
>> @@ -85,6 +85,7 @@ struct erofs_compress_work {
>> struct erofs_work work;
>> struct z_erofs_compress_sctx ctx;
>> struct erofs_compress_work *next;
>> + struct z_erofs_mt_file *mtfile_desc;
>>
>> unsigned int alg_id;
>> char *alg_name;
>> @@ -96,14 +97,14 @@ struct erofs_compress_work {
>>
>> static struct {
>> struct erofs_workqueue wq;
>> - struct erofs_compress_work *idle;
>> - pthread_mutex_t mutex;
>> - pthread_cond_t cond;
>> - int nfini;
>> + struct erofs_compress_work *work_idle;
>> + pthread_mutex_t work_mutex;
>> + struct z_erofs_mt_file *file_idle;
>> + pthread_mutex_t file_mutex;
>> } z_erofs_mt_ctrl;
>> #endif
>>
>> -static bool z_erofs_mt_enabled;
>> +bool z_erofs_mt_enabled;
>>
>> #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
>>
>> @@ -1025,6 +1026,90 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
>> return 0;
>> }
>>
>> +int z_erofs_finalize_compression(struct z_erofs_compress_ictx *ictx,
>> + struct erofs_buffer_head *bh,
>> + erofs_blk_t blkaddr,
>> + erofs_blk_t compressed_blocks)
>> +{
>> + struct erofs_inode *inode = ictx->inode;
>> + struct erofs_sb_info *sbi = inode->sbi;
>> + u8 *compressmeta = ictx->metacur - Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> + unsigned int legacymetasize;
>> + int ret = 0;
>> +
>> + /* fall back to no compression mode */
>> + DBG_BUGON(compressed_blocks < !!inode->idata_size);
>> + compressed_blocks -= !!inode->idata_size;
>> +
>> + z_erofs_write_indexes(ictx);
>> + legacymetasize = ictx->metacur - compressmeta;
>> + /* estimate if data compression saves space or not */
>> + if (!inode->fragment_size &&
>> + compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
>> + legacymetasize >= inode->i_size) {
>> + z_erofs_dedupe_commit(true);
>> +
>> + if (inode->idata) {
>> + free(inode->idata);
>> + inode->idata = NULL;
>> + }
>> + erofs_bdrop(bh, true); /* revoke buffer */
>> + free(compressmeta);
>> + inode->compressmeta = NULL;
>> +
>> + return -ENOSPC;
>> + }
>> + z_erofs_dedupe_commit(false);
>> + z_erofs_write_mapheader(inode, compressmeta);
>> +
>> + if (!ictx->fragemitted)
>> + sbi->saved_by_deduplication += inode->fragment_size;
>> +
>> + /* if the entire file is a fragment, a simplified form is used. */
>> + if (inode->i_size <= inode->fragment_size) {
>> + DBG_BUGON(inode->i_size < inode->fragment_size);
>> + DBG_BUGON(inode->fragmentoff >> 63);
>> + *(__le64 *)compressmeta =
>> + cpu_to_le64(inode->fragmentoff | 1ULL << 63);
>> + inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
>> + legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> + }
>> +
>> + if (compressed_blocks) {
>> + ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
>> + DBG_BUGON(ret != erofs_blksiz(sbi));
>> + } else {
>> + if (!cfg.c_fragments && !cfg.c_dedupe)
>> + DBG_BUGON(!inode->idata_size);
>> + }
>> +
>> + erofs_info("compressed %s (%llu bytes) into %u blocks",
>> + inode->i_srcpath, (unsigned long long)inode->i_size,
>> + compressed_blocks);
>> +
>> + if (inode->idata_size) {
>> + bh->op = &erofs_skip_write_bhops;
>> + inode->bh_data = bh;
>> + } else {
>> + erofs_bdrop(bh, false);
>> + }
>> +
>> + inode->u.i_blocks = compressed_blocks;
>> +
>> + if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
>> + inode->extent_isize = legacymetasize;
>> + } else {
>> + ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
>> + legacymetasize,
>> + compressmeta);
>> + DBG_BUGON(ret);
>> + }
>> + inode->compressmeta = compressmeta;
>> + if (!erofs_is_packed_inode(inode))
>> + erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
>> + return 0;
>> +}
>> +
>> #ifdef EROFS_MT_ENABLED
>> void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
>> {
>> @@ -1099,6 +1184,7 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
>> struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
>> struct erofs_compress_wq_tls *tls = tlsp;
>> struct z_erofs_compress_sctx *sctx = &cwork->ctx;
>> + struct z_erofs_mt_file *mtfile_desc = cwork->mtfile_desc;
>> struct erofs_sb_info *sbi = sctx->ictx->inode->sbi;
>> int ret = 0;
>>
>> @@ -1124,10 +1210,10 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
>>
>> out:
>> cwork->errcode = ret;
>> - pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
>> - ++z_erofs_mt_ctrl.nfini;
>> - pthread_cond_signal(&z_erofs_mt_ctrl.cond);
>> - pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
>> + pthread_mutex_lock(&mtfile_desc->mutex);
>> + ++mtfile_desc->nfini;
>> + pthread_cond_signal(&mtfile_desc->cond);
>> + pthread_mutex_unlock(&mtfile_desc->mutex);
>> }
>>
>> int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
>> @@ -1161,27 +1247,60 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
>> }
>>
>> int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>> - struct erofs_compress_cfg *ccfg,
>> - erofs_blk_t blkaddr,
>> - erofs_blk_t *compressed_blocks)
>> + struct erofs_compress_cfg *ccfg)
>> {
>> struct erofs_compress_work *cur, *head = NULL, **last = &head;
>> struct erofs_inode *inode = ictx->inode;
>> + struct z_erofs_mt_file *mtfile_desc = NULL;
>> int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
>> - int ret, i;
>> + int i;
>>
>> - z_erofs_mt_ctrl.nfini = 0;
>> + pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex);
>> + if (z_erofs_mt_ctrl.file_idle) {
>> + mtfile_desc = z_erofs_mt_ctrl.file_idle;
>> + z_erofs_mt_ctrl.file_idle = mtfile_desc->next;
>> + mtfile_desc->next = NULL;
>> + }
>> + pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex);
>> + if (!mtfile_desc) {
>> + mtfile_desc = calloc(1, sizeof(*mtfile_desc));
>> + if (!mtfile_desc) {
>> + return -ENOMEM;
>> + }
>> + }
>> + inode->mt_desc = mtfile_desc;
>> +
>> + mtfile_desc->fd = ictx->fd;
>> + mtfile_desc->total = nsegs;
>> + mtfile_desc->nfini = 0;
>> + pthread_mutex_init(&mtfile_desc->mutex, NULL);
>> + pthread_cond_init(&mtfile_desc->cond, NULL);
>>
>> for (i = 0; i < nsegs; i++) {
>> - if (z_erofs_mt_ctrl.idle) {
>> - cur = z_erofs_mt_ctrl.idle;
>> - z_erofs_mt_ctrl.idle = cur->next;
>> + cur = NULL;
>> +
>> + pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex);
>> + if (z_erofs_mt_ctrl.work_idle) {
>> + cur = z_erofs_mt_ctrl.work_idle;
>> + z_erofs_mt_ctrl.work_idle = cur->next;
>> cur->next = NULL;
>> - } else {
>> + }
>> + pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex);
>> + if (!cur) {
>> cur = calloc(1, sizeof(*cur));
>> - if (!cur)
>> + if (!cur) {
>> + while (head) {
>> + cur = head;
>> + head = cur->next;
>> + free(cur);
>> + }
>> + free(mtfile_desc);
>> return -ENOMEM;
>> + }
>> }
>> +
>> + if (i == 0)
>> + mtfile_desc->head = cur;
>> *last = cur;
>> last = &cur->next;
>>
>> @@ -1205,21 +1324,29 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>> cur->comp_level = ccfg->handle.compression_level;
>> cur->dict_size = ccfg->handle.dict_size;
>>
>> + cur->mtfile_desc = mtfile_desc;
>> cur->work.fn = z_erofs_mt_workfn;
>> erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
>> }
>>
>> - pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
>> - while (z_erofs_mt_ctrl.nfini != nsegs)
>> - pthread_cond_wait(&z_erofs_mt_ctrl.cond,
>> - &z_erofs_mt_ctrl.mutex);
>> - pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
>> + return 0;
>> +}
>>
>> - ret = 0;
>> - while (head) {
>> - cur = head;
>> - head = cur->next;
>> +int z_erofs_mt_reap(struct z_erofs_mt_file *desc) {
>> + struct erofs_buffer_head *bh = NULL;
>> + struct erofs_compress_work *cur = desc->head, *tmp;
>> + struct z_erofs_compress_ictx *ictx = cur->ctx.ictx;
>> + erofs_blk_t blkaddr, compressed_blocks = 0;
>> + int ret = 0;
>>
>> + bh = erofs_balloc(DATA, 0, 0, 0);
>> + if (IS_ERR(bh)) {
>> + ret = PTR_ERR(bh);
>> + goto out;
>> + }
>> + blkaddr = erofs_mapbh(bh->block);
>> +
>> + while (cur) {
>> if (cur->errcode) {
>> ret = cur->errcode;
>> } else {
>> @@ -1230,13 +1357,30 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>> if (ret2)
>> ret = ret2;
>>
>> - *compressed_blocks += cur->ctx.blkaddr - blkaddr;
>> + compressed_blocks += cur->ctx.blkaddr - blkaddr;
>> blkaddr = cur->ctx.blkaddr;
>> }
>>
>> - cur->next = z_erofs_mt_ctrl.idle;
>> - z_erofs_mt_ctrl.idle = cur;
>> + tmp = cur->next;
>> + pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex);
>> + cur->next = z_erofs_mt_ctrl.work_idle;
>> + z_erofs_mt_ctrl.work_idle = cur;
>> + pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex);
>> + cur = tmp;
>> }
>> + if (ret)
>> + goto out;
>> +
>> + ret = z_erofs_finalize_compression(
>> + ictx, bh, blkaddr - compressed_blocks, compressed_blocks);
>> +
>> +out:
>> + free(ictx);
>> + pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex);
>> + desc->next = z_erofs_mt_ctrl.file_idle;
>> + z_erofs_mt_ctrl.file_idle = desc;
>> + pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex);
>> +
>> return ret;
>> }
>> #endif
>> @@ -1249,9 +1393,7 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> static struct z_erofs_compress_sctx sctx;
>> struct erofs_compress_cfg *ccfg;
>> erofs_blk_t blkaddr, compressed_blocks = 0;
>> - unsigned int legacymetasize;
>> int ret;
>> - bool ismt = false;
>> struct erofs_sb_info *sbi = inode->sbi;
>> u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
>> sizeof(struct z_erofs_lcluster_index) +
>> @@ -1260,11 +1402,17 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> if (!compressmeta)
>> return -ENOMEM;
>>
>> - /* allocate main data buffer */
>> - bh = erofs_balloc(DATA, 0, 0, 0);
>> - if (IS_ERR(bh)) {
>> - ret = PTR_ERR(bh);
>> - goto err_free_meta;
>> + if (!z_erofs_mt_enabled) {
>> + /* allocate main data buffer */
>> + bh = erofs_balloc(DATA, 0, 0, 0);
>> + if (IS_ERR(bh)) {
>> + ret = PTR_ERR(bh);
>> + goto err_free_meta;
>> + }
>> + blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
>> + } else {
>> + bh = NULL;
>> + blkaddr = EROFS_NULL_ADDR;
>> }
>>
>> /* initialize per-file compression setting */
>> @@ -1313,7 +1461,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> goto err_bdrop;
>> }
>>
>> - blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
>> ctx.inode = inode;
>> ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
>> ctx.fd = fd;
>> @@ -1331,11 +1478,24 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> if (ret)
>> goto err_free_idata;
>> #ifdef EROFS_MT_ENABLED
>> - } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
>> - ismt = true;
>> - ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
>> - if (ret)
>> + } else if (z_erofs_mt_enabled) {
>> + struct z_erofs_compress_ictx *l_ictx;
>> +
>> + l_ictx = malloc(sizeof(*l_ictx));
>> + if (!l_ictx) {
>> + ret = -ENOMEM;
>> goto err_free_idata;
>> + }
>> +
>> + memcpy(l_ictx, &ctx, sizeof(*l_ictx));
>> + init_list_head(&l_ictx->extents);
>> +
>> + ret = z_erofs_mt_compress(l_ictx, ccfg);
>> + if (ret) {
>> + free(l_ictx);
>> + goto err_free_idata;
>> + }
>> + return 0;
>> #endif
>> } else {
>> sctx.queue = g_queue;
>> @@ -1352,10 +1512,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> compressed_blocks = sctx.blkaddr - blkaddr;
>> }
>>
>> - /* fall back to no compression mode */
>> - DBG_BUGON(compressed_blocks < !!inode->idata_size);
>> - compressed_blocks -= !!inode->idata_size;
>> -
>> /* generate an extent for the deduplicated fragment */
>> if (inode->fragment_size && !ctx.fragemitted) {
>> struct z_erofs_extent_item *ei;
>> @@ -1377,69 +1533,10 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>> z_erofs_commit_extent(&sctx, ei);
>> }
>> z_erofs_fragments_commit(inode);
>> + list_splice_tail(&sctx.extents, &ctx.extents);
>>
>> - if (!ismt)
>> - list_splice_tail(&sctx.extents, &ctx.extents);
>> -
>> - z_erofs_write_indexes(&ctx);
>> - legacymetasize = ctx.metacur - compressmeta;
>> - /* estimate if data compression saves space or not */
>> - if (!inode->fragment_size &&
>> - compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
>> - legacymetasize >= inode->i_size) {
>> - z_erofs_dedupe_commit(true);
>> - ret = -ENOSPC;
>> - goto err_free_idata;
>> - }
>> - z_erofs_dedupe_commit(false);
>> - z_erofs_write_mapheader(inode, compressmeta);
>> -
>> - if (!ctx.fragemitted)
>> - sbi->saved_by_deduplication += inode->fragment_size;
>> -
>> - /* if the entire file is a fragment, a simplified form is used. */
>> - if (inode->i_size <= inode->fragment_size) {
>> - DBG_BUGON(inode->i_size < inode->fragment_size);
>> - DBG_BUGON(inode->fragmentoff >> 63);
>> - *(__le64 *)compressmeta =
>> - cpu_to_le64(inode->fragmentoff | 1ULL << 63);
>> - inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
>> - legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> - }
>> -
>> - if (compressed_blocks) {
>> - ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
>> - DBG_BUGON(ret != erofs_blksiz(sbi));
>> - } else {
>> - if (!cfg.c_fragments && !cfg.c_dedupe)
>> - DBG_BUGON(!inode->idata_size);
>> - }
>> -
>> - erofs_info("compressed %s (%llu bytes) into %u blocks",
>> - inode->i_srcpath, (unsigned long long)inode->i_size,
>> - compressed_blocks);
>> -
>> - if (inode->idata_size) {
>> - bh->op = &erofs_skip_write_bhops;
>> - inode->bh_data = bh;
>> - } else {
>> - erofs_bdrop(bh, false);
>> - }
>> -
>> - inode->u.i_blocks = compressed_blocks;
>> -
>> - if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
>> - inode->extent_isize = legacymetasize;
>> - } else {
>> - ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
>> - legacymetasize,
>> - compressmeta);
>> - DBG_BUGON(ret);
>> - }
>> - inode->compressmeta = compressmeta;
>> - if (!erofs_is_packed_inode(inode))
>> - erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
>> - return 0;
>> + return z_erofs_finalize_compression(&ctx, bh, blkaddr,
>> + compressed_blocks);
>>
>> err_free_idata:
>> if (inode->idata) {
>> @@ -1447,7 +1544,8 @@ err_free_idata:
>> inode->idata = NULL;
>> }
>> err_bdrop:
>> - erofs_bdrop(bh, true); /* revoke buffer */
>> + if (bh)
>> + erofs_bdrop(bh, true); /* revoke buffer */
>> err_free_meta:
>> free(compressmeta);
>> inode->compressmeta = NULL;
>> @@ -1598,8 +1696,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
>> z_erofs_mt_enabled = false;
>> #ifdef EROFS_MT_ENABLED
>> if (cfg.c_mt_workers > 1) {
>> - pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
>> - pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
>> + pthread_mutex_init(&z_erofs_mt_ctrl.file_mutex, NULL);
>> + pthread_mutex_init(&z_erofs_mt_ctrl.work_mutex, NULL);
>> ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
>> cfg.c_mt_workers,
>> cfg.c_mt_workers << 2,
>> @@ -1626,11 +1724,17 @@ int z_erofs_compress_exit(void)
>> ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
>> if (ret)
>> return ret;
>> - while (z_erofs_mt_ctrl.idle) {
>> + while (z_erofs_mt_ctrl.work_idle) {
>> struct erofs_compress_work *tmp =
>> - z_erofs_mt_ctrl.idle->next;
>> - free(z_erofs_mt_ctrl.idle);
>> - z_erofs_mt_ctrl.idle = tmp;
>> + z_erofs_mt_ctrl.work_idle->next;
>> + free(z_erofs_mt_ctrl.work_idle);
>> + z_erofs_mt_ctrl.work_idle = tmp;
>> + }
>> + while (z_erofs_mt_ctrl.file_idle) {
>> + struct z_erofs_mt_file *tmp =
>> + z_erofs_mt_ctrl.file_idle->next;
>> + free(z_erofs_mt_ctrl.file_idle);
>> + z_erofs_mt_ctrl.file_idle = tmp;
>> }
>> #endif
>> }
>> diff --git a/lib/inode.c b/lib/inode.c
>> index 7dfb021..6d1faae 100644
>> --- a/lib/inode.c
>> +++ b/lib/inode.c
>> @@ -29,6 +29,8 @@
>> #include "erofs/fragments.h"
>> #include "liberofs_private.h"
>>
>> +extern bool z_erofs_mt_enabled;
>> +
>> #define S_SHIFT 12
>> static unsigned char erofs_ftype_by_mode[S_IFMT >> S_SHIFT] = {
>> [S_IFREG >> S_SHIFT] = EROFS_FT_REG_FILE,
>> @@ -1036,6 +1038,9 @@ struct erofs_inode *erofs_new_inode(void)
>> inode->i_ino[0] = sbi.inos++; /* inode serial number */
>> inode->i_count = 1;
>> inode->datalayout = EROFS_INODE_FLAT_PLAIN;
>> +#ifdef EROFS_MT_ENABLED
>> + inode->mt_desc = NULL;
>> +#endif
>>
>> init_list_head(&inode->i_hash);
>> init_list_head(&inode->i_subdirs);
>> @@ -1100,6 +1105,10 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir)
>> rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS;
>> }
>>
>> +#ifdef EROFS_MT_ENABLED
>> +#define EROFS_MT_QUEUE_SIZE 256
>> +struct erofs_inode_fifo *z_erofs_mt_queue;
>> +#endif
>>
>> static int erofs_mkfs_handle_symlink(struct erofs_inode *inode)
>> {
>> @@ -1143,6 +1152,21 @@ static int erofs_mkfs_handle_file(struct erofs_inode *inode)
>> return 0;
>> }
>>
>> +static int erofs_mkfs_issue_compress(struct erofs_inode *inode)
>> +{
>> + if (!inode->i_size || S_ISLNK(inode->i_mode))
>> + return 0;
>> +
>> + if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
>> + int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
>> + if (fd < 0)
>> + return -errno;
>> + return erofs_write_compressed_file(inode, fd, 0);
>> + }
>> +
>> + return 0;
>> +}
>> +
>> static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>> struct list_head *dirs)
>> {
>> @@ -1152,6 +1176,14 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>> struct erofs_dentry *d;
>> unsigned int nr_subdirs = 0, i_nlink;
>>
>> + ret = erofs_scan_file_xattrs(dir);
>> + if (ret < 0)
>> + return ret;
>> +
>> + ret = erofs_prepare_xattr_ibody(dir);
>> + if (ret < 0)
>> + return ret;
>> +
>> _dir = opendir(dir->i_srcpath);
>> if (!_dir) {
>> erofs_err("failed to opendir at %s: %s",
>> @@ -1159,7 +1191,6 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>> return -errno;
>> }
>>
>> - nr_subdirs = 0;
>> while (1) {
>> /*
>> * set errno to 0 before calling readdir() in order to
>> @@ -1195,13 +1226,15 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>> if (ret)
>> return ret;
>>
>> - ret = erofs_prepare_inode_buffer(dir);
>> - if (ret)
>> - return ret;
>> - dir->bh->op = &erofs_skip_write_bhops;
>> + if (!z_erofs_mt_enabled) {
>> + ret = erofs_prepare_inode_buffer(dir);
>> + if (ret)
>> + return ret;
>> + dir->bh->op = &erofs_skip_write_bhops;
>>
>> - if (IS_ROOT(dir))
>> - erofs_fixup_meta_blkaddr(dir);
>> + if (IS_ROOT(dir))
>> + erofs_fixup_meta_blkaddr(dir);
>> + }
>>
>> i_nlink = 0;
>> list_for_each_entry(d, &dir->i_subdirs, d_child) {
>> @@ -1300,11 +1333,13 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir,
>>
>> if (S_ISDIR(dir->i_mode))
>> return erofs_mkfs_handle_dir(dir, dirs);
>> + else if (z_erofs_mt_enabled)
>> + return erofs_mkfs_issue_compress(dir);
>> else
>> return erofs_mkfs_handle_file(dir);
>> }
>>
>> -struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> +struct erofs_inode *__erofs_mkfs_build_tree_from_path(const char *path)
>> {
>> LIST_HEAD(dirs);
>> struct erofs_inode *inode, *root, *dumpdir;
>> @@ -1325,7 +1360,8 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> list_del(&inode->i_subdirs);
>> init_list_head(&inode->i_subdirs);
>>
>> - erofs_mkfs_print_progressinfo(inode);
>> + if (!z_erofs_mt_enabled)
>> + erofs_mkfs_print_progressinfo(inode);
>>
>> err = erofs_mkfs_build_tree(inode, &dirs);
>> if (err) {
>> @@ -1333,15 +1369,215 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> break;
>> }
>>
>> + if (!z_erofs_mt_enabled) {
>> + if (S_ISDIR(inode->i_mode)) {
>> + inode->next_dirwrite = dumpdir;
>> + dumpdir = inode;
>> + } else {
>> + erofs_iput(inode);
>> + }
>> +#ifdef EROFS_MT_ENABLED
> Missing the changes we discussed in v1 here ?
Sorry that I missed it. Fixed.
>> + } else {
>> + erofs_push_inode_fifo(z_erofs_mt_queue, &inode);
>> +#endif
>> + }
>> + } while (!list_empty(&dirs));
>> +
>> + if (!z_erofs_mt_enabled)
>> + erofs_mkfs_dumpdir(dumpdir);
>> +#ifdef EROFS_MT_ENABLED
>> + else
>> + erofs_push_inode_fifo(z_erofs_mt_queue, &dumpdir);
>> +#endif
>> + return root;
>> +}
>> +
>> +#ifdef EROFS_MT_ENABLED
>> +pthread_t z_erofs_mt_traverser;
>> +
>> +void *z_erofs_mt_traverse_task(void *path)
>> +{
>> + pthread_exit((void *)__erofs_mkfs_build_tree_from_path(path));
>> +}
>> +
>> +static int z_erofs_mt_reap_compressed(struct erofs_inode *inode)
>> +{
>> + struct z_erofs_mt_file *desc = inode->mt_desc;
>> + int fd = desc->fd;
>> + int ret = 0;
>> +
>> + pthread_mutex_lock(&desc->mutex);
>> + while (desc->nfini != desc->total)
>> + pthread_cond_wait(&desc->cond, &desc->mutex);
>> + pthread_mutex_unlock(&desc->mutex);
>> +
>> + ret = z_erofs_mt_reap(desc);
>> + if (ret == -ENOSPC) {
>> + ret = lseek(fd, 0, SEEK_SET);
>> + if (ret < 0)
>> + return -errno;
>> +
>> + ret = write_uncompressed_file_from_fd(inode, fd);
>> + }
>> +
>> + close(fd);
>> + return ret;
>> +}
>> +
>> +static int z_erofs_mt_reap_inodes()
>> +{
>> + struct erofs_inode *inode, *dumpdir;
>> + int ret = 0;
>> +
>> + dumpdir = NULL;
>> + while (true) {
>> + inode = *(struct erofs_inode **)erofs_pop_inode_fifo(
>> + z_erofs_mt_queue);
>> + if (!inode)
>> + break;
>> +
>> + erofs_mkfs_print_progressinfo(inode);
>> +
>> if (S_ISDIR(inode->i_mode)) {
>> + ret = erofs_prepare_inode_buffer(inode);
>> + if (ret)
>> + goto out;
>> + inode->bh->op = &erofs_skip_write_bhops;
>> +
>> + if (IS_ROOT(inode))
>> + erofs_fixup_meta_blkaddr(inode);
>> +
>> inode->next_dirwrite = dumpdir;
>> dumpdir = inode;
>> + continue;
>> + }
>> +
>> + if (inode->mt_desc) {
>> + ret = z_erofs_mt_reap_compressed(inode);
>> + } else if (S_ISLNK(inode->i_mode)) {
>> + ret = erofs_mkfs_handle_symlink(inode);
>> + } else if (!inode->i_size) {
>> + ret = 0;
>> } else {
>> - erofs_iput(inode);
>> + int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
>> + if (fd < 0)
>> + return -errno;
>> +
>> + if (cfg.c_chunkbits)
>> + ret = erofs_write_chunked_file(inode, fd, 0);
>> + else
>> + ret = write_uncompressed_file_from_fd(inode,
>> + fd);
>> + close(fd);
>> }
>> - } while (!list_empty(&dirs));
>> + if (ret)
>> + goto out;
>> +
>> + erofs_prepare_inode_buffer(inode);
>> + erofs_write_tail_end(inode);
>> + erofs_iput(inode);
>> + }
>>
>> erofs_mkfs_dumpdir(dumpdir);
>> +
>> +out:
>> + return ret;
>> +}
>> +
>> +struct erofs_inode_fifo *erofs_alloc_inode_fifo(size_t size, size_t elem_size)
>> +{
>> + struct erofs_inode_fifo *q = malloc(sizeof(*q));
>> +
>> + pthread_mutex_init(&q->lock, NULL);
>> + pthread_cond_init(&q->empty, NULL);
>> + pthread_cond_init(&q->full, NULL);
>> +
>> + q->size = size;
>> + q->elem_size = elem_size;
>> + q->head = 0;
>> + q->tail = 0;
>> + q->buf = calloc(size, elem_size);
>> + if (!q->buf)
>> + return ERR_PTR(-ENOMEM);
>> +
>> + return q;
>> +}
>> +
>> +void erofs_push_inode_fifo(struct erofs_inode_fifo *q, void *elem)
>> +{
>> + pthread_mutex_lock(&q->lock);
>> +
>> + while ((q->tail + 1) % q->size == q->head)
>> + pthread_cond_wait(&q->full, &q->lock);
>> +
>> + memcpy(q->buf + q->tail * q->elem_size, elem, q->elem_size);
>> + q->tail = (q->tail + 1) % q->size;
>> +
>> + pthread_cond_signal(&q->empty);
>> + pthread_mutex_unlock(&q->lock);
>> +}
>> +
>> +void *erofs_pop_inode_fifo(struct erofs_inode_fifo *q)
>> +{
>> + void *elem;
>> +
>> + pthread_mutex_lock(&q->lock);
>> +
>> + while (q->head == q->tail)
>> + pthread_cond_wait(&q->empty, &q->lock);
>> +
>> + elem = q->buf + q->head * q->elem_size;
>> + q->head = (q->head + 1) % q->size;
>> +
>> + pthread_cond_signal(&q->full);
>> + pthread_mutex_unlock(&q->lock);
>> +
>> + return elem;
>> +}
>> +
>> +void erofs_destroy_inode_fifo(struct erofs_inode_fifo *q)
>> +{
>> + pthread_mutex_destroy(&q->lock);
>> + pthread_cond_destroy(&q->empty);
>> + pthread_cond_destroy(&q->full);
>> + free(q->buf);
>> + free(q);
>> +}
>> +
>> +#endif
>> +
>> +struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> +{
>> +#ifdef EROFS_MT_ENABLED
>> + int err;
>> +#endif
>> + struct erofs_inode *root = NULL;
>> +
>> + if (!z_erofs_mt_enabled)
>> + return __erofs_mkfs_build_tree_from_path(path);
>> +
>> +#ifdef EROFS_MT_ENABLED
>> + z_erofs_mt_queue = erofs_alloc_inode_fifo(EROFS_MT_QUEUE_SIZE,
>> + sizeof(struct erofs_inode *));
> Nit:
> z_erofs_mt_fifo or z_erofs_mt_inode_fifo ?
Fixed.
> In addition,
> the element in the fifo is struct erofs_inode **, so better to use
> sizeof(struct erofs_inode **),
> although they are both pointers ...
>
>> + if (IS_ERR(z_erofs_mt_queue))
>> + return ERR_CAST(z_erofs_mt_queue);
>> +
>> + err = pthread_create(&z_erofs_mt_traverser, NULL,
>> + z_erofs_mt_traverse_task, (void *)path);
>> + if (err)
>> + return ERR_PTR(err);
>> +
>> + err = z_erofs_mt_reap_inodes();
>> + if (err)
>> + return ERR_PTR(err);
>> +
>> + err = pthread_join(z_erofs_mt_traverser, (void *)&root);
>> + if (err)
>> + return ERR_PTR(err);
>> +
>> + erofs_destroy_inode_fifo(z_erofs_mt_queue);
>> +#endif
>> +
>> return root;
>> }
>>
>> --
>> 2.44.0
>>
> Thanks,
> Jianan
More information about the Linux-erofs
mailing list