[PATCH v3 2/2] erofs-utils: mkfs: introduce inter-file multi-threaded compression

Yifan Zhao zhaoyifan at sjtu.edu.cn
Sun Mar 24 04:31:57 AEDT 2024


On 2024/3/23 11:46, Huang Jianan wrote:
> Yifan Zhao <zhaoyifan at sjtu.edu.cn> 于2024年3月22日周五 18:25写道:
>> This patch allows parallelizing the compression process of different
>> files in mkfs. Specifically, a traverser thread traverses the files and
>> issues compression tasks, which are handled by the workers. The main
>> thread then consumes the finished tasks and writes the compressed data
>> to the device.
>>
>> To this end, the logic of erofs_write_compressed_file() has been
>> modified to split the creation and completion logic of the compression
>> task.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
>> Co-authored-by: Tong Xin <xin_tong at sjtu.edu.cn>
>> ---
>>   include/erofs/compress.h |  16 ++
>>   include/erofs/inode.h    |  17 ++
>>   include/erofs/internal.h |   3 +
>>   lib/compress.c           | 336 +++++++++++++++++++++++++--------------
>>   lib/inode.c              | 258 ++++++++++++++++++++++++++++--
>>   5 files changed, 503 insertions(+), 127 deletions(-)
>>
>> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
>> index 871db54..8d5a54b 100644
>> --- a/include/erofs/compress.h
>> +++ b/include/erofs/compress.h
>> @@ -17,6 +17,22 @@ extern "C"
>>   #define EROFS_CONFIG_COMPR_MAX_SZ      (4000 * 1024)
>>   #define Z_EROFS_COMPR_QUEUE_SZ         (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>>
>> +#ifdef EROFS_MT_ENABLED
>> +struct z_erofs_mt_file {
>> +       pthread_mutex_t mutex;
>> +       pthread_cond_t cond;
>> +       int total;
>> +       int nfini;
>> +
>> +       int fd;
>> +       struct erofs_compress_work *head;
>> +
>> +       struct z_erofs_mt_file *next;
>> +};
>> +
>> +int z_erofs_mt_reap(struct z_erofs_mt_file *desc);
>> +#endif
>> +
>>   void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
>>   int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
>>
>> diff --git a/include/erofs/inode.h b/include/erofs/inode.h
>> index d5a732a..101ff59 100644
>> --- a/include/erofs/inode.h
>> +++ b/include/erofs/inode.h
>> @@ -41,6 +41,23 @@ struct erofs_inode *erofs_new_inode(void);
>>   struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path);
>>   struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name);
>>
>> +#ifdef EROFS_MT_ENABLED
>> +struct erofs_inode_fifo {
>> +       pthread_mutex_t lock;
>> +       pthread_cond_t full, empty;
>> +
>> +       void *buf;
>> +
>> +       size_t size, elem_size;
>> +       size_t head, tail;
>> +};
>> +
>> +struct erofs_inode_fifo *erofs_alloc_inode_fifo(size_t size, size_t elem_size);
>> +void erofs_push_inode_fifo(struct erofs_inode_fifo *q, void *elem);
>> +void *erofs_pop_inode_fifo(struct erofs_inode_fifo *q);
>> +void erofs_destroy_inode_fifo(struct erofs_inode_fifo *q);
>> +#endif
>> +
>>   #ifdef __cplusplus
>>   }
>>   #endif
>> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
>> index 4cd2059..2580588 100644
>> --- a/include/erofs/internal.h
>> +++ b/include/erofs/internal.h
>> @@ -250,6 +250,9 @@ struct erofs_inode {
>>   #ifdef WITH_ANDROID
>>          uint64_t capabilities;
>>   #endif
>> +#ifdef EROFS_MT_ENABLED
>> +       struct z_erofs_mt_file *mt_desc;
>> +#endif
>>   };
>>
>>   static inline erofs_off_t erofs_iloc(struct erofs_inode *inode)
>> diff --git a/lib/compress.c b/lib/compress.c
>> index e064293..d89e404 100644
>> --- a/lib/compress.c
>> +++ b/lib/compress.c
>> @@ -85,6 +85,7 @@ struct erofs_compress_work {
>>          struct erofs_work work;
>>          struct z_erofs_compress_sctx ctx;
>>          struct erofs_compress_work *next;
>> +       struct z_erofs_mt_file *mtfile_desc;
>>
>>          unsigned int alg_id;
>>          char *alg_name;
>> @@ -96,14 +97,14 @@ struct erofs_compress_work {
>>
>>   static struct {
>>          struct erofs_workqueue wq;
>> -       struct erofs_compress_work *idle;
>> -       pthread_mutex_t mutex;
>> -       pthread_cond_t cond;
>> -       int nfini;
>> +       struct erofs_compress_work *work_idle;
>> +       pthread_mutex_t work_mutex;
>> +       struct z_erofs_mt_file *file_idle;
>> +       pthread_mutex_t file_mutex;
>>   } z_erofs_mt_ctrl;
>>   #endif
>>
>> -static bool z_erofs_mt_enabled;
>> +bool z_erofs_mt_enabled;
>>
>>   #define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
>>
>> @@ -1025,6 +1026,90 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
>>          return 0;
>>   }
>>
>> +int z_erofs_finalize_compression(struct z_erofs_compress_ictx *ictx,
>> +                                struct erofs_buffer_head *bh,
>> +                                erofs_blk_t blkaddr,
>> +                                erofs_blk_t compressed_blocks)
>> +{
>> +       struct erofs_inode *inode = ictx->inode;
>> +       struct erofs_sb_info *sbi = inode->sbi;
>> +       u8 *compressmeta = ictx->metacur - Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> +       unsigned int legacymetasize;
>> +       int ret = 0;
>> +
>> +       /* fall back to no compression mode */
>> +       DBG_BUGON(compressed_blocks < !!inode->idata_size);
>> +       compressed_blocks -= !!inode->idata_size;
>> +
>> +       z_erofs_write_indexes(ictx);
>> +       legacymetasize = ictx->metacur - compressmeta;
>> +       /* estimate if data compression saves space or not */
>> +       if (!inode->fragment_size &&
>> +           compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
>> +           legacymetasize >= inode->i_size) {
>> +               z_erofs_dedupe_commit(true);
>> +
>> +               if (inode->idata) {
>> +                       free(inode->idata);
>> +                       inode->idata = NULL;
>> +               }
>> +               erofs_bdrop(bh, true); /* revoke buffer */
>> +               free(compressmeta);
>> +               inode->compressmeta = NULL;
>> +
>> +               return -ENOSPC;
>> +       }
>> +       z_erofs_dedupe_commit(false);
>> +       z_erofs_write_mapheader(inode, compressmeta);
>> +
>> +       if (!ictx->fragemitted)
>> +               sbi->saved_by_deduplication += inode->fragment_size;
>> +
>> +       /* if the entire file is a fragment, a simplified form is used. */
>> +       if (inode->i_size <= inode->fragment_size) {
>> +               DBG_BUGON(inode->i_size < inode->fragment_size);
>> +               DBG_BUGON(inode->fragmentoff >> 63);
>> +               *(__le64 *)compressmeta =
>> +                       cpu_to_le64(inode->fragmentoff | 1ULL << 63);
>> +               inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
>> +               legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> +       }
>> +
>> +       if (compressed_blocks) {
>> +               ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
>> +               DBG_BUGON(ret != erofs_blksiz(sbi));
>> +       } else {
>> +               if (!cfg.c_fragments && !cfg.c_dedupe)
>> +                       DBG_BUGON(!inode->idata_size);
>> +       }
>> +
>> +       erofs_info("compressed %s (%llu bytes) into %u blocks",
>> +                  inode->i_srcpath, (unsigned long long)inode->i_size,
>> +                  compressed_blocks);
>> +
>> +       if (inode->idata_size) {
>> +               bh->op = &erofs_skip_write_bhops;
>> +               inode->bh_data = bh;
>> +       } else {
>> +               erofs_bdrop(bh, false);
>> +       }
>> +
>> +       inode->u.i_blocks = compressed_blocks;
>> +
>> +       if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
>> +               inode->extent_isize = legacymetasize;
>> +       } else {
>> +               ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
>> +                                                         legacymetasize,
>> +                                                         compressmeta);
>> +               DBG_BUGON(ret);
>> +       }
>> +       inode->compressmeta = compressmeta;
>> +       if (!erofs_is_packed_inode(inode))
>> +               erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
>> +       return 0;
>> +}
>> +
>>   #ifdef EROFS_MT_ENABLED
>>   void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
>>   {
>> @@ -1099,6 +1184,7 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
>>          struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
>>          struct erofs_compress_wq_tls *tls = tlsp;
>>          struct z_erofs_compress_sctx *sctx = &cwork->ctx;
>> +       struct z_erofs_mt_file *mtfile_desc = cwork->mtfile_desc;
>>          struct erofs_sb_info *sbi = sctx->ictx->inode->sbi;
>>          int ret = 0;
>>
>> @@ -1124,10 +1210,10 @@ void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
>>
>>   out:
>>          cwork->errcode = ret;
>> -       pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
>> -       ++z_erofs_mt_ctrl.nfini;
>> -       pthread_cond_signal(&z_erofs_mt_ctrl.cond);
>> -       pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
>> +       pthread_mutex_lock(&mtfile_desc->mutex);
>> +       ++mtfile_desc->nfini;
>> +       pthread_cond_signal(&mtfile_desc->cond);
>> +       pthread_mutex_unlock(&mtfile_desc->mutex);
>>   }
>>
>>   int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
>> @@ -1161,27 +1247,60 @@ int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
>>   }
>>
>>   int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>> -                       struct erofs_compress_cfg *ccfg,
>> -                       erofs_blk_t blkaddr,
>> -                       erofs_blk_t *compressed_blocks)
>> +                       struct erofs_compress_cfg *ccfg)
>>   {
>>          struct erofs_compress_work *cur, *head = NULL, **last = &head;
>>          struct erofs_inode *inode = ictx->inode;
>> +       struct z_erofs_mt_file *mtfile_desc = NULL;
>>          int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
>> -       int ret, i;
>> +       int i;
>>
>> -       z_erofs_mt_ctrl.nfini = 0;
>> +       pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex);
>> +       if (z_erofs_mt_ctrl.file_idle) {
>> +               mtfile_desc = z_erofs_mt_ctrl.file_idle;
>> +               z_erofs_mt_ctrl.file_idle = mtfile_desc->next;
>> +               mtfile_desc->next = NULL;
>> +       }
>> +       pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex);
>> +       if (!mtfile_desc) {
>> +               mtfile_desc = calloc(1, sizeof(*mtfile_desc));
>> +               if (!mtfile_desc) {
>> +                       return -ENOMEM;
>> +               }
>> +       }
>> +       inode->mt_desc = mtfile_desc;
>> +
>> +       mtfile_desc->fd = ictx->fd;
>> +       mtfile_desc->total = nsegs;
>> +       mtfile_desc->nfini = 0;
>> +       pthread_mutex_init(&mtfile_desc->mutex, NULL);
>> +       pthread_cond_init(&mtfile_desc->cond, NULL);
>>
>>          for (i = 0; i < nsegs; i++) {
>> -               if (z_erofs_mt_ctrl.idle) {
>> -                       cur = z_erofs_mt_ctrl.idle;
>> -                       z_erofs_mt_ctrl.idle = cur->next;
>> +               cur = NULL;
>> +
>> +               pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex);
>> +               if (z_erofs_mt_ctrl.work_idle) {
>> +                       cur = z_erofs_mt_ctrl.work_idle;
>> +                       z_erofs_mt_ctrl.work_idle = cur->next;
>>                          cur->next = NULL;
>> -               } else {
>> +               }
>> +               pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex);
>> +               if (!cur) {
>>                          cur = calloc(1, sizeof(*cur));
>> -                       if (!cur)
>> +                       if (!cur) {
>> +                               while (head) {
>> +                                       cur = head;
>> +                                       head = cur->next;
>> +                                       free(cur);
>> +                               }
>> +                               free(mtfile_desc);
>>                                  return -ENOMEM;
>> +                       }
>>                  }
>> +
>> +               if (i == 0)
>> +                       mtfile_desc->head = cur;
>>                  *last = cur;
>>                  last = &cur->next;
>>
>> @@ -1205,21 +1324,29 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>>                  cur->comp_level = ccfg->handle.compression_level;
>>                  cur->dict_size = ccfg->handle.dict_size;
>>
>> +               cur->mtfile_desc = mtfile_desc;
>>                  cur->work.fn = z_erofs_mt_workfn;
>>                  erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
>>          }
>>
>> -       pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
>> -       while (z_erofs_mt_ctrl.nfini != nsegs)
>> -               pthread_cond_wait(&z_erofs_mt_ctrl.cond,
>> -                                 &z_erofs_mt_ctrl.mutex);
>> -       pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
>> +       return 0;
>> +}
>>
>> -       ret = 0;
>> -       while (head) {
>> -               cur = head;
>> -               head = cur->next;
>> +int z_erofs_mt_reap(struct z_erofs_mt_file *desc) {
>> +       struct erofs_buffer_head *bh = NULL;
>> +       struct erofs_compress_work *cur = desc->head, *tmp;
>> +       struct z_erofs_compress_ictx *ictx = cur->ctx.ictx;
>> +       erofs_blk_t blkaddr, compressed_blocks = 0;
>> +       int ret = 0;
>>
>> +       bh = erofs_balloc(DATA, 0, 0, 0);
>> +       if (IS_ERR(bh)) {
>> +               ret = PTR_ERR(bh);
>> +               goto out;
>> +       }
>> +       blkaddr = erofs_mapbh(bh->block);
>> +
>> +       while (cur) {
>>                  if (cur->errcode) {
>>                          ret = cur->errcode;
>>                  } else {
>> @@ -1230,13 +1357,30 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx,
>>                          if (ret2)
>>                                  ret = ret2;
>>
>> -                       *compressed_blocks += cur->ctx.blkaddr - blkaddr;
>> +                       compressed_blocks += cur->ctx.blkaddr - blkaddr;
>>                          blkaddr = cur->ctx.blkaddr;
>>                  }
>>
>> -               cur->next = z_erofs_mt_ctrl.idle;
>> -               z_erofs_mt_ctrl.idle = cur;
>> +               tmp = cur->next;
>> +               pthread_mutex_lock(&z_erofs_mt_ctrl.work_mutex);
>> +               cur->next = z_erofs_mt_ctrl.work_idle;
>> +               z_erofs_mt_ctrl.work_idle = cur;
>> +               pthread_mutex_unlock(&z_erofs_mt_ctrl.work_mutex);
>> +               cur = tmp;
>>          }
>> +       if (ret)
>> +               goto out;
>> +
>> +       ret = z_erofs_finalize_compression(
>> +               ictx, bh, blkaddr - compressed_blocks, compressed_blocks);
>> +
>> +out:
>> +       free(ictx);
>> +       pthread_mutex_lock(&z_erofs_mt_ctrl.file_mutex);
>> +       desc->next = z_erofs_mt_ctrl.file_idle;
>> +       z_erofs_mt_ctrl.file_idle = desc;
>> +       pthread_mutex_unlock(&z_erofs_mt_ctrl.file_mutex);
>> +
>>          return ret;
>>   }
>>   #endif
>> @@ -1249,9 +1393,7 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>          static struct z_erofs_compress_sctx sctx;
>>          struct erofs_compress_cfg *ccfg;
>>          erofs_blk_t blkaddr, compressed_blocks = 0;
>> -       unsigned int legacymetasize;
>>          int ret;
>> -       bool ismt = false;
>>          struct erofs_sb_info *sbi = inode->sbi;
>>          u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
>>                                    sizeof(struct z_erofs_lcluster_index) +
>> @@ -1260,11 +1402,17 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>          if (!compressmeta)
>>                  return -ENOMEM;
>>
>> -       /* allocate main data buffer */
>> -       bh = erofs_balloc(DATA, 0, 0, 0);
>> -       if (IS_ERR(bh)) {
>> -               ret = PTR_ERR(bh);
>> -               goto err_free_meta;
>> +       if (!z_erofs_mt_enabled) {
>> +               /* allocate main data buffer */
>> +               bh = erofs_balloc(DATA, 0, 0, 0);
>> +               if (IS_ERR(bh)) {
>> +                       ret = PTR_ERR(bh);
>> +                       goto err_free_meta;
>> +               }
>> +               blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
>> +       } else {
>> +               bh = NULL;
>> +               blkaddr = EROFS_NULL_ADDR;
>>          }
>>
>>          /* initialize per-file compression setting */
>> @@ -1313,7 +1461,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>                          goto err_bdrop;
>>          }
>>
>> -       blkaddr = erofs_mapbh(bh->block);       /* start_blkaddr */
>>          ctx.inode = inode;
>>          ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
>>          ctx.fd = fd;
>> @@ -1331,11 +1478,24 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>                  if (ret)
>>                          goto err_free_idata;
>>   #ifdef EROFS_MT_ENABLED
>> -       } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
>> -               ismt = true;
>> -               ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
>> -               if (ret)
>> +       } else if (z_erofs_mt_enabled) {
>> +               struct z_erofs_compress_ictx *l_ictx;
>> +
>> +               l_ictx = malloc(sizeof(*l_ictx));
>> +               if (!l_ictx) {
>> +                       ret = -ENOMEM;
>>                          goto err_free_idata;
>> +               }
>> +
>> +               memcpy(l_ictx, &ctx, sizeof(*l_ictx));
>> +               init_list_head(&l_ictx->extents);
>> +
>> +               ret = z_erofs_mt_compress(l_ictx, ccfg);
>> +               if (ret) {
>> +                       free(l_ictx);
>> +                       goto err_free_idata;
>> +               }
>> +               return 0;
>>   #endif
>>          } else {
>>                  sctx.queue = g_queue;
>> @@ -1352,10 +1512,6 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>                  compressed_blocks = sctx.blkaddr - blkaddr;
>>          }
>>
>> -       /* fall back to no compression mode */
>> -       DBG_BUGON(compressed_blocks < !!inode->idata_size);
>> -       compressed_blocks -= !!inode->idata_size;
>> -
>>          /* generate an extent for the deduplicated fragment */
>>          if (inode->fragment_size && !ctx.fragemitted) {
>>                  struct z_erofs_extent_item *ei;
>> @@ -1377,69 +1533,10 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
>>                  z_erofs_commit_extent(&sctx, ei);
>>          }
>>          z_erofs_fragments_commit(inode);
>> +       list_splice_tail(&sctx.extents, &ctx.extents);
>>
>> -       if (!ismt)
>> -               list_splice_tail(&sctx.extents, &ctx.extents);
>> -
>> -       z_erofs_write_indexes(&ctx);
>> -       legacymetasize = ctx.metacur - compressmeta;
>> -       /* estimate if data compression saves space or not */
>> -       if (!inode->fragment_size &&
>> -           compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
>> -           legacymetasize >= inode->i_size) {
>> -               z_erofs_dedupe_commit(true);
>> -               ret = -ENOSPC;
>> -               goto err_free_idata;
>> -       }
>> -       z_erofs_dedupe_commit(false);
>> -       z_erofs_write_mapheader(inode, compressmeta);
>> -
>> -       if (!ctx.fragemitted)
>> -               sbi->saved_by_deduplication += inode->fragment_size;
>> -
>> -       /* if the entire file is a fragment, a simplified form is used. */
>> -       if (inode->i_size <= inode->fragment_size) {
>> -               DBG_BUGON(inode->i_size < inode->fragment_size);
>> -               DBG_BUGON(inode->fragmentoff >> 63);
>> -               *(__le64 *)compressmeta =
>> -                       cpu_to_le64(inode->fragmentoff | 1ULL << 63);
>> -               inode->datalayout = EROFS_INODE_COMPRESSED_FULL;
>> -               legacymetasize = Z_EROFS_LEGACY_MAP_HEADER_SIZE;
>> -       }
>> -
>> -       if (compressed_blocks) {
>> -               ret = erofs_bh_balloon(bh, erofs_pos(sbi, compressed_blocks));
>> -               DBG_BUGON(ret != erofs_blksiz(sbi));
>> -       } else {
>> -               if (!cfg.c_fragments && !cfg.c_dedupe)
>> -                       DBG_BUGON(!inode->idata_size);
>> -       }
>> -
>> -       erofs_info("compressed %s (%llu bytes) into %u blocks",
>> -                  inode->i_srcpath, (unsigned long long)inode->i_size,
>> -                  compressed_blocks);
>> -
>> -       if (inode->idata_size) {
>> -               bh->op = &erofs_skip_write_bhops;
>> -               inode->bh_data = bh;
>> -       } else {
>> -               erofs_bdrop(bh, false);
>> -       }
>> -
>> -       inode->u.i_blocks = compressed_blocks;
>> -
>> -       if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL) {
>> -               inode->extent_isize = legacymetasize;
>> -       } else {
>> -               ret = z_erofs_convert_to_compacted_format(inode, blkaddr,
>> -                                                         legacymetasize,
>> -                                                         compressmeta);
>> -               DBG_BUGON(ret);
>> -       }
>> -       inode->compressmeta = compressmeta;
>> -       if (!erofs_is_packed_inode(inode))
>> -               erofs_droid_blocklist_write(inode, blkaddr, compressed_blocks);
>> -       return 0;
>> +       return z_erofs_finalize_compression(&ctx, bh, blkaddr,
>> +                                           compressed_blocks);
>>
>>   err_free_idata:
>>          if (inode->idata) {
>> @@ -1447,7 +1544,8 @@ err_free_idata:
>>                  inode->idata = NULL;
>>          }
>>   err_bdrop:
>> -       erofs_bdrop(bh, true);  /* revoke buffer */
>> +       if (bh)
>> +               erofs_bdrop(bh, true);  /* revoke buffer */
>>   err_free_meta:
>>          free(compressmeta);
>>          inode->compressmeta = NULL;
>> @@ -1598,8 +1696,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
>>          z_erofs_mt_enabled = false;
>>   #ifdef EROFS_MT_ENABLED
>>          if (cfg.c_mt_workers > 1) {
>> -               pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
>> -               pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
>> +               pthread_mutex_init(&z_erofs_mt_ctrl.file_mutex, NULL);
>> +               pthread_mutex_init(&z_erofs_mt_ctrl.work_mutex, NULL);
>>                  ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
>>                                              cfg.c_mt_workers,
>>                                              cfg.c_mt_workers << 2,
>> @@ -1626,11 +1724,17 @@ int z_erofs_compress_exit(void)
>>                  ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
>>                  if (ret)
>>                          return ret;
>> -               while (z_erofs_mt_ctrl.idle) {
>> +               while (z_erofs_mt_ctrl.work_idle) {
>>                          struct erofs_compress_work *tmp =
>> -                               z_erofs_mt_ctrl.idle->next;
>> -                       free(z_erofs_mt_ctrl.idle);
>> -                       z_erofs_mt_ctrl.idle = tmp;
>> +                               z_erofs_mt_ctrl.work_idle->next;
>> +                       free(z_erofs_mt_ctrl.work_idle);
>> +                       z_erofs_mt_ctrl.work_idle = tmp;
>> +               }
>> +               while (z_erofs_mt_ctrl.file_idle) {
>> +                       struct z_erofs_mt_file *tmp =
>> +                               z_erofs_mt_ctrl.file_idle->next;
>> +                       free(z_erofs_mt_ctrl.file_idle);
>> +                       z_erofs_mt_ctrl.file_idle = tmp;
>>                  }
>>   #endif
>>          }
>> diff --git a/lib/inode.c b/lib/inode.c
>> index 7dfb021..6d1faae 100644
>> --- a/lib/inode.c
>> +++ b/lib/inode.c
>> @@ -29,6 +29,8 @@
>>   #include "erofs/fragments.h"
>>   #include "liberofs_private.h"
>>
>> +extern bool z_erofs_mt_enabled;
>> +
>>   #define S_SHIFT                 12
>>   static unsigned char erofs_ftype_by_mode[S_IFMT >> S_SHIFT] = {
>>          [S_IFREG >> S_SHIFT]  = EROFS_FT_REG_FILE,
>> @@ -1036,6 +1038,9 @@ struct erofs_inode *erofs_new_inode(void)
>>          inode->i_ino[0] = sbi.inos++;   /* inode serial number */
>>          inode->i_count = 1;
>>          inode->datalayout = EROFS_INODE_FLAT_PLAIN;
>> +#ifdef EROFS_MT_ENABLED
>> +       inode->mt_desc = NULL;
>> +#endif
>>
>>          init_list_head(&inode->i_hash);
>>          init_list_head(&inode->i_subdirs);
>> @@ -1100,6 +1105,10 @@ static void erofs_fixup_meta_blkaddr(struct erofs_inode *rootdir)
>>          rootdir->nid = (off - meta_offset) >> EROFS_ISLOTBITS;
>>   }
>>
>> +#ifdef EROFS_MT_ENABLED
>> +#define EROFS_MT_QUEUE_SIZE 256
>> +struct erofs_inode_fifo *z_erofs_mt_queue;
>> +#endif
>>
>>   static int erofs_mkfs_handle_symlink(struct erofs_inode *inode)
>>   {
>> @@ -1143,6 +1152,21 @@ static int erofs_mkfs_handle_file(struct erofs_inode *inode)
>>          return 0;
>>   }
>>
>> +static int erofs_mkfs_issue_compress(struct erofs_inode *inode)
>> +{
>> +       if (!inode->i_size || S_ISLNK(inode->i_mode))
>> +               return 0;
>> +
>> +       if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
>> +               int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
>> +               if (fd < 0)
>> +                       return -errno;
>> +               return erofs_write_compressed_file(inode, fd, 0);
>> +       }
>> +
>> +       return 0;
>> +}
>> +
>>   static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>>                                   struct list_head *dirs)
>>   {
>> @@ -1152,6 +1176,14 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>>          struct erofs_dentry *d;
>>          unsigned int nr_subdirs = 0, i_nlink;
>>
>> +       ret = erofs_scan_file_xattrs(dir);
>> +       if (ret < 0)
>> +               return ret;
>> +
>> +       ret = erofs_prepare_xattr_ibody(dir);
>> +       if (ret < 0)
>> +               return ret;
>> +
>>          _dir = opendir(dir->i_srcpath);
>>          if (!_dir) {
>>                  erofs_err("failed to opendir at %s: %s",
>> @@ -1159,7 +1191,6 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>>                  return -errno;
>>          }
>>
>> -       nr_subdirs = 0;
>>          while (1) {
>>                  /*
>>                   * set errno to 0 before calling readdir() in order to
>> @@ -1195,13 +1226,15 @@ static int erofs_mkfs_handle_dir(struct erofs_inode *dir,
>>          if (ret)
>>                  return ret;
>>
>> -       ret = erofs_prepare_inode_buffer(dir);
>> -       if (ret)
>> -               return ret;
>> -       dir->bh->op = &erofs_skip_write_bhops;
>> +       if (!z_erofs_mt_enabled) {
>> +               ret = erofs_prepare_inode_buffer(dir);
>> +               if (ret)
>> +                       return ret;
>> +               dir->bh->op = &erofs_skip_write_bhops;
>>
>> -       if (IS_ROOT(dir))
>> -               erofs_fixup_meta_blkaddr(dir);
>> +               if (IS_ROOT(dir))
>> +                       erofs_fixup_meta_blkaddr(dir);
>> +       }
>>
>>          i_nlink = 0;
>>          list_for_each_entry(d, &dir->i_subdirs, d_child) {
>> @@ -1300,11 +1333,13 @@ static int erofs_mkfs_build_tree(struct erofs_inode *dir,
>>
>>          if (S_ISDIR(dir->i_mode))
>>                  return erofs_mkfs_handle_dir(dir, dirs);
>> +       else if (z_erofs_mt_enabled)
>> +               return erofs_mkfs_issue_compress(dir);
>>          else
>>                  return erofs_mkfs_handle_file(dir);
>>   }
>>
>> -struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> +struct erofs_inode *__erofs_mkfs_build_tree_from_path(const char *path)
>>   {
>>          LIST_HEAD(dirs);
>>          struct erofs_inode *inode, *root, *dumpdir;
>> @@ -1325,7 +1360,8 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>>                  list_del(&inode->i_subdirs);
>>                  init_list_head(&inode->i_subdirs);
>>
>> -               erofs_mkfs_print_progressinfo(inode);
>> +               if (!z_erofs_mt_enabled)
>> +                       erofs_mkfs_print_progressinfo(inode);
>>
>>                  err = erofs_mkfs_build_tree(inode, &dirs);
>>                  if (err) {
>> @@ -1333,15 +1369,215 @@ struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>>                          break;
>>                  }
>>
>> +               if (!z_erofs_mt_enabled) {
>> +                       if (S_ISDIR(inode->i_mode)) {
>> +                               inode->next_dirwrite = dumpdir;
>> +                               dumpdir = inode;
>> +                       } else {
>> +                               erofs_iput(inode);
>> +                       }
>> +#ifdef EROFS_MT_ENABLED
> Are the changes we discussed in v1 missing here?
Sorry that I missed it. Fixed.
>> +               } else {
>> +                       erofs_push_inode_fifo(z_erofs_mt_queue, &inode);
>> +#endif
>> +               }
>> +       } while (!list_empty(&dirs));
>> +
>> +       if (!z_erofs_mt_enabled)
>> +               erofs_mkfs_dumpdir(dumpdir);
>> +#ifdef EROFS_MT_ENABLED
>> +       else
>> +               erofs_push_inode_fifo(z_erofs_mt_queue, &dumpdir);
>> +#endif
>> +       return root;
>> +}
>> +
>> +#ifdef EROFS_MT_ENABLED
>> +pthread_t z_erofs_mt_traverser;
>> +
>> +void *z_erofs_mt_traverse_task(void *path)
>> +{
>> +       pthread_exit((void *)__erofs_mkfs_build_tree_from_path(path));
>> +}
>> +
>> +static int z_erofs_mt_reap_compressed(struct erofs_inode *inode)
>> +{
>> +       struct z_erofs_mt_file *desc = inode->mt_desc;
>> +       int fd = desc->fd;
>> +       int ret = 0;
>> +
>> +       pthread_mutex_lock(&desc->mutex);
>> +       while (desc->nfini != desc->total)
>> +               pthread_cond_wait(&desc->cond, &desc->mutex);
>> +       pthread_mutex_unlock(&desc->mutex);
>> +
>> +       ret = z_erofs_mt_reap(desc);
>> +       if (ret == -ENOSPC) {
>> +               ret = lseek(fd, 0, SEEK_SET);
>> +               if (ret < 0)
>> +                       return -errno;
>> +
>> +               ret = write_uncompressed_file_from_fd(inode, fd);
>> +       }
>> +
>> +       close(fd);
>> +       return ret;
>> +}
>> +
>> +static int z_erofs_mt_reap_inodes()
>> +{
>> +       struct erofs_inode *inode, *dumpdir;
>> +       int ret = 0;
>> +
>> +       dumpdir = NULL;
>> +       while (true) {
>> +               inode = *(struct erofs_inode **)erofs_pop_inode_fifo(
>> +                       z_erofs_mt_queue);
>> +               if (!inode)
>> +                       break;
>> +
>> +               erofs_mkfs_print_progressinfo(inode);
>> +
>>                  if (S_ISDIR(inode->i_mode)) {
>> +                       ret = erofs_prepare_inode_buffer(inode);
>> +                       if (ret)
>> +                               goto out;
>> +                       inode->bh->op = &erofs_skip_write_bhops;
>> +
>> +                       if (IS_ROOT(inode))
>> +                               erofs_fixup_meta_blkaddr(inode);
>> +
>>                          inode->next_dirwrite = dumpdir;
>>                          dumpdir = inode;
>> +                       continue;
>> +               }
>> +
>> +               if (inode->mt_desc) {
>> +                       ret = z_erofs_mt_reap_compressed(inode);
>> +               } else if (S_ISLNK(inode->i_mode)) {
>> +                       ret = erofs_mkfs_handle_symlink(inode);
>> +               } else if (!inode->i_size) {
>> +                       ret = 0;
>>                  } else {
>> -                       erofs_iput(inode);
>> +                       int fd = open(inode->i_srcpath, O_RDONLY | O_BINARY);
>> +                       if (fd < 0)
>> +                               return -errno;
>> +
>> +                       if (cfg.c_chunkbits)
>> +                               ret = erofs_write_chunked_file(inode, fd, 0);
>> +                       else
>> +                               ret = write_uncompressed_file_from_fd(inode,
>> +                                                                     fd);
>> +                       close(fd);
>>                  }
>> -       } while (!list_empty(&dirs));
>> +               if (ret)
>> +                       goto out;
>> +
>> +               erofs_prepare_inode_buffer(inode);
>> +               erofs_write_tail_end(inode);
>> +               erofs_iput(inode);
>> +       }
>>
>>          erofs_mkfs_dumpdir(dumpdir);
>> +
>> +out:
>> +       return ret;
>> +}
>> +
>> +struct erofs_inode_fifo *erofs_alloc_inode_fifo(size_t size, size_t elem_size)
>> +{
>> +       struct erofs_inode_fifo *q = malloc(sizeof(*q));
>> +
>> +       pthread_mutex_init(&q->lock, NULL);
>> +       pthread_cond_init(&q->empty, NULL);
>> +       pthread_cond_init(&q->full, NULL);
>> +
>> +       q->size = size;
>> +       q->elem_size = elem_size;
>> +       q->head = 0;
>> +       q->tail = 0;
>> +       q->buf = calloc(size, elem_size);
>> +       if (!q->buf)
>> +               return ERR_PTR(-ENOMEM);
>> +
>> +       return q;
>> +}
>> +
>> +void erofs_push_inode_fifo(struct erofs_inode_fifo *q, void *elem)
>> +{
>> +       pthread_mutex_lock(&q->lock);
>> +
>> +       while ((q->tail + 1) % q->size == q->head)
>> +               pthread_cond_wait(&q->full, &q->lock);
>> +
>> +       memcpy(q->buf + q->tail * q->elem_size, elem, q->elem_size);
>> +       q->tail = (q->tail + 1) % q->size;
>> +
>> +       pthread_cond_signal(&q->empty);
>> +       pthread_mutex_unlock(&q->lock);
>> +}
>> +
>> +void *erofs_pop_inode_fifo(struct erofs_inode_fifo *q)
>> +{
>> +       void *elem;
>> +
>> +       pthread_mutex_lock(&q->lock);
>> +
>> +       while (q->head == q->tail)
>> +               pthread_cond_wait(&q->empty, &q->lock);
>> +
>> +       elem = q->buf + q->head * q->elem_size;
>> +       q->head = (q->head + 1) % q->size;
>> +
>> +       pthread_cond_signal(&q->full);
>> +       pthread_mutex_unlock(&q->lock);
>> +
>> +       return elem;
>> +}
>> +
>> +void erofs_destroy_inode_fifo(struct erofs_inode_fifo *q)
>> +{
>> +       pthread_mutex_destroy(&q->lock);
>> +       pthread_cond_destroy(&q->empty);
>> +       pthread_cond_destroy(&q->full);
>> +       free(q->buf);
>> +       free(q);
>> +}
>> +
>> +#endif
>> +
>> +struct erofs_inode *erofs_mkfs_build_tree_from_path(const char *path)
>> +{
>> +#ifdef EROFS_MT_ENABLED
>> +       int err;
>> +#endif
>> +       struct erofs_inode *root = NULL;
>> +
>> +       if (!z_erofs_mt_enabled)
>> +               return __erofs_mkfs_build_tree_from_path(path);
>> +
>> +#ifdef EROFS_MT_ENABLED
>> +       z_erofs_mt_queue = erofs_alloc_inode_fifo(EROFS_MT_QUEUE_SIZE,
>> +                                            sizeof(struct erofs_inode *));
> Nit: how about naming this z_erofs_mt_fifo or z_erofs_mt_inode_fifo?
Fixed.
> In addition, the element in the fifo is struct erofs_inode **, so it
> would be better to use sizeof(struct erofs_inode **), even though both
> are pointer types.
> although they are both pointers ...
>
>> +       if (IS_ERR(z_erofs_mt_queue))
>> +               return ERR_CAST(z_erofs_mt_queue);
>> +
>> +       err = pthread_create(&z_erofs_mt_traverser, NULL,
>> +                            z_erofs_mt_traverse_task, (void *)path);
>> +       if (err)
>> +               return ERR_PTR(err);
>> +
>> +       err = z_erofs_mt_reap_inodes();
>> +       if (err)
>> +               return ERR_PTR(err);
>> +
>> +       err = pthread_join(z_erofs_mt_traverser, (void *)&root);
>> +       if (err)
>> +               return ERR_PTR(err);
>> +
>> +       erofs_destroy_inode_fifo(z_erofs_mt_queue);
>> +#endif
>> +
>>          return root;
>>   }
>>
>> --
>> 2.44.0
>>
> Thanks,
> Jianan


More information about the Linux-erofs mailing list