[PATCH v2 0/7] erofs-utils: mkfs: introduce multi-threaded compression

Yifan Zhao zhaoyifan at sjtu.edu.cn
Tue Feb 20 18:55:18 AEDT 2024


Change log since v1:
- Re-implement workqueue API instead of using xfsprogs' workqueue
- Use per-worker tmpfile for multi-threaded mkfs

Gao Xiang (1):
  erofs-utils: add a helper to get available processors

Yifan Zhao (6):
  erofs-utils: introduce multi-threading framework
  erofs-utils: mkfs: add --worker=# parameter
  erofs-utils: mkfs: optionally print warning in erofs_compressor_init
  erofs-utils: mkfs: introduce inner-file multi-threaded compression
  erofs-utils: mkfs: introduce inter-file multi-threaded compression
  erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs

 configure.ac                |  17 +
 include/erofs/compress.h    |  18 +
 include/erofs/config.h      |   5 +
 include/erofs/internal.h    |   6 +
 include/erofs/list.h        |   8 +
 include/erofs/queue.h       |  22 +
 include/erofs/workqueue.h   |  38 ++
 lib/Makefile.am             |   4 +
 lib/compress.c              | 836 ++++++++++++++++++++++++++++++------
 lib/compressor.c            |   7 +-
 lib/compressor.h            |   5 +-
 lib/compressor_deflate.c    |   4 +-
 lib/compressor_libdeflate.c |   4 +-
 lib/compressor_liblzma.c    |   5 +-
 lib/compressor_lz4.c        |   2 +-
 lib/compressor_lz4hc.c      |   2 +-
 lib/config.c                |  16 +
 lib/inode.c                 | 302 ++++++++++---
 lib/queue.c                 |  64 +++
 lib/workqueue.c             | 132 ++++++
 mkfs/main.c                 |  38 ++
 21 files changed, 1342 insertions(+), 193 deletions(-)
 create mode 100644 include/erofs/queue.h
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/queue.c
 create mode 100644 lib/workqueue.c

Interdiff against v1:
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
index a11a8fc..857947b 100644
--- a/include/erofs/workqueue.h
+++ b/include/erofs/workqueue.h
@@ -1,48 +1,38 @@
-/* SPDX-License-Identifier: GPL-2.0+ */
-/*
- * Copyright (C) 2017 Oracle.  All Rights Reserved.
- * Author: Darrick J. Wong <darrick.wong at oracle.com>
- */
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
 #ifndef __EROFS_WORKQUEUE_H
 #define __EROFS_WORKQUEUE_H
 
 #include "internal.h"
 
-struct erofs_workqueue;
 struct erofs_work;
 
-typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
-				    struct erofs_work *work);
+typedef void erofs_wq_func_t(struct erofs_work *);
 typedef void erofs_wq_priv_fini_t(void *);
 
 struct erofs_work {
-	struct erofs_workqueue	*queue;
+	void (*func)(struct erofs_work *work);
 	struct erofs_work *next;
-	erofs_workqueue_func_t	*function;
-	void 			*private;
+	void *priv;
 };
 
 struct erofs_workqueue {
-	pthread_t		*threads;
-	struct erofs_work	*next_item;
-	struct erofs_work	*last_item;
+	struct erofs_work *head;
+	struct erofs_work *tail;
 	pthread_mutex_t lock;
-	pthread_cond_t		wakeup;
-	unsigned int		item_count;
-	unsigned int		thread_count;
-	bool			terminate;
-	bool			terminated;
-	int			max_queued;
-	pthread_cond_t		queue_full;
-	size_t			private_size;
-	erofs_wq_priv_fini_t	*private_fini;
+	pthread_cond_t cond_empty;
+	pthread_cond_t cond_full;
+	pthread_t *workers;
+	unsigned int nworker;
+	unsigned int max_jobs;
+	unsigned int job_count;
+	bool shutdown;
+	size_t priv_size;
+	erofs_wq_priv_fini_t *priv_fini;
 };
 
-int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
-			   erofs_wq_priv_fini_t *private_fini,
-			   unsigned int nr_workers, unsigned int max_queue);
-int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi);
-int erofs_workqueue_terminate(struct erofs_workqueue *wq);
-void erofs_workqueue_destroy(struct erofs_workqueue *wq);
-
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq);
 #endif
\ No newline at end of file
diff --git a/lib/compress.c b/lib/compress.c
index d6c59b0..3fae260 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -8,6 +8,9 @@
 #ifndef _LARGEFILE64_SOURCE
 #define _LARGEFILE64_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -23,6 +26,13 @@
 #ifdef EROFS_MT_ENABLED
 #include "erofs/workqueue.h"
 #endif
+#ifdef HAVE_LINUX_FALLOC_H
+#include <linux/falloc.h>
+#endif
+
+#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
+#define USE_PER_WORKER_TMPFILE 1
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -59,6 +69,7 @@ struct z_erofs_vle_compress_ctx {
 
 	int seg_num, seg_idx;
 	FILE *tmpfile;
+	off_t tmpfile_off;
 };
 
 struct z_erofs_write_index_ctx {
@@ -75,6 +86,7 @@ struct erofs_compress_wq_private {
 	u8 *queue;
 	char *destbuf;
 	struct erofs_compress_cfg *ccfg;
+	FILE* tmpfile;
 };
 
 struct erofs_compress_work {
@@ -402,6 +414,7 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 		ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
 		if (ret != 1)
 			return -EIO;
+		fflush(ctx->tmpfile);
 	} else {
 		erofs_dbg("Writing %u uncompressed data to block %u", count,
 			  ctx->blkaddr);
@@ -1073,6 +1086,7 @@ void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->tof_chksum = tof_chksum;
 	ctx->fd = fd;
 	ctx->tmpfile = NULL;
+	ctx->tmpfile_off = 0;
 	init_list_head(&ctx->extents);
 }
 
@@ -1169,7 +1183,7 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 	struct erofs_compress_cfg *lc;
 	int ret;
 
-	if (!priv->init) {
+	if (unlikely(!priv->init)) {
 		priv->init = true;
 
 		priv->queue = malloc(EROFS_COMPR_QUEUE_SZ);
@@ -1185,6 +1199,16 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
 				    sizeof(struct erofs_compress_cfg));
 		if (!priv->ccfg)
 			return -ENOMEM;
+
+#ifdef USE_PER_WORKER_TMPFILE
+#ifndef HAVE_TMPFILE64
+		priv->tmpfile = tmpfile();
+#else
+		priv->tmpfile = tmpfile64();
+#endif
+		if (!priv->tmpfile)
+			return -errno;
+#endif
 	}
 
 	lc = &priv->ccfg[alg_id];
@@ -1214,15 +1238,18 @@ void z_erofs_mt_private_fini(void *private)
 		free(priv->ccfg);
 		free(priv->destbuf);
 		free(priv->queue);
+#ifdef USE_PER_WORKER_TMPFILE
+		fclose(priv->tmpfile);
+#endif
 		priv->init = false;
 	}
 }
 
-void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
+void z_erofs_mt_work(struct erofs_work *work)
 {
 	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
 	struct z_erofs_vle_compress_ctx *ctx = &cwork->ctx;
-	struct erofs_compress_wq_private *priv = work->private;
+	struct erofs_compress_wq_private *priv = work->priv;
 	struct erofs_compress_file *cfile = cwork->file;
 	erofs_blk_t blkaddr = ctx->blkaddr;
 	u64 offset = ctx->seg_idx * cfg.c_mt_segment_size;
@@ -1237,7 +1264,14 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
 	ctx->queue = priv->queue;
 	ctx->destbuf = priv->destbuf;
 	ctx->chandle = &priv->ccfg[cwork->alg_id].handle;
-
+#ifdef USE_PER_WORKER_TMPFILE
+	ctx->tmpfile = priv->tmpfile;
+	ctx->tmpfile_off = ftell(ctx->tmpfile);
+	if (ctx->tmpfile_off == -1) {
+		ret = -errno;
+		goto out;
+	}
+#else
 #ifdef HAVE_TMPFILE64
 	ctx->tmpfile = tmpfile64();
 #else
@@ -1247,13 +1281,13 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work)
 		ret = -errno;
 		goto out;
 	}
+	ctx->tmpfile_off = 0;
+#endif
 
 	ret = z_erofs_compress_file(ctx, offset, blkaddr);
 	if (ret)
 		goto out;
 
-	fflush(ctx->tmpfile);
-
 	if (ctx->seg_idx == ctx->seg_num - 1)
 		ret = z_erofs_handle_fragments(ctx);
 
@@ -1273,6 +1307,7 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 	struct erofs_sb_info *sbi = cur->ctx.inode->sbi;
 	struct z_erofs_write_index_ctx *ictx = cfile->ictx;
 	char *memblock = NULL;
+	size_t size = 0;
 	int ret = 0, lret;
 
 	while (cur != NULL) {
@@ -1289,28 +1324,32 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 			goto out;
 		}
 
-		memblock = realloc(memblock,
-				   ctx->compressed_blocks * erofs_blksiz(sbi));
+		size = ctx->compressed_blocks * erofs_blksiz(sbi);
+		memblock = realloc(memblock, size);
 		if (!memblock) {
 			if (!ret)
 				ret = -ENOMEM;
 			goto out;
 		}
 
-		lret = fseek(ctx->tmpfile, 0, SEEK_SET);
-		if (lret) {
+		lret = pread(fileno(ctx->tmpfile), memblock, size,
+			     ctx->tmpfile_off);
+		if (lret != size) {
 			if (!ret)
-				ret = lret;
+				ret = -errno;
 			goto out;
 		}
 
-		lret = fread(memblock, erofs_blksiz(sbi),
-			     ctx->compressed_blocks, ctx->tmpfile);
-		if (lret != ctx->compressed_blocks) {
+#ifdef USE_PER_WORKER_TMPFILE
+		lret = fallocate(fileno(ctx->tmpfile),
+				 FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+				 ctx->tmpfile_off, size);
+		if (lret) {
 			if (!ret)
-				ret = -EIO;
+				ret = -errno;
 			goto out;
 		}
+#endif
 
 		lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks,
 				 ctx->compressed_blocks);
@@ -1322,8 +1361,9 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr,
 		*compressed_blocks += ctx->compressed_blocks;
 
 out:
-		if (ctx->tmpfile)
+#ifndef USE_PER_WORKER_TMPFILE
 		fclose(ctx->tmpfile);
+#endif
 
 		tmp = cur->next;
 		pthread_mutex_lock(&work_mutex);
@@ -1405,7 +1445,7 @@ struct erofs_compress_file *z_erofs_mt_do_compress(
 		work->dict_size = ccfg->handle.dict_size;
 
 		work->file = cfile;
-		work->work.function = z_erofs_mt_work;
+		work->work.func = z_erofs_mt_work;
 
 		erofs_workqueue_add(&wq, &work->work);
 	}
@@ -1749,10 +1789,10 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 	if (cfg.c_mt_worker_num == 1) {
 		mt_enabled = false;
 	} else {
-		ret = erofs_workqueue_create(
-			&wq, sizeof(struct erofs_compress_wq_private),
-			z_erofs_mt_private_fini, cfg.c_mt_worker_num,
-			cfg.c_mt_worker_num << 2);
+		ret = erofs_workqueue_init(
+			&wq, cfg.c_mt_worker_num, cfg.c_mt_worker_num << 2,
+			sizeof(struct erofs_compress_wq_private),
+			z_erofs_mt_private_fini);
 		mt_enabled = !ret;
 	}
 #else
@@ -1777,10 +1817,9 @@ int z_erofs_compress_exit(void)
 
 	if (mt_enabled) {
 #ifdef EROFS_MT_ENABLED
-		ret = erofs_workqueue_terminate(&wq);
+		ret = erofs_workqueue_shutdown(&wq);
 		if (ret)
 			return ret;
-		erofs_workqueue_destroy(&wq);
 		while (work_idle) {
 			struct erofs_compress_work *tmp = work_idle->next;
 			free(work_idle);
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 01d12d9..3ec6142 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -1,222 +1,132 @@
-// SPDX-License-Identifier: GPL-2.0+
-/*
- * Copyright (C) 2017 Oracle.  All Rights Reserved.
- * Author: Darrick J. Wong <darrick.wong at oracle.com>
- */
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
 #include <pthread.h>
-#include <signal.h>
 #include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <errno.h>
-#include <assert.h>
 #include "erofs/workqueue.h"
 
-/* Main processing thread */
-static void *workqueue_thread(void *arg)
+static void *worker_thread(void *arg)
 {
 	struct erofs_workqueue *wq = arg;
-	struct erofs_work		*wi;
-	void				*private = NULL;
+	struct erofs_work *work;
+	void *priv = NULL;
 
-	if (wq->private_size) {
-		private = calloc(1, wq->private_size);
-		assert(private);
+	if (wq->priv_size) {
+		priv = calloc(wq->priv_size, 1);
+		assert(priv);
 	}
 
-	/*
-	 * Loop pulling work from the passed in work queue.
-	 * Check for notification to exit after every chunk of work.
-	 */
-	while (1) {
+	while (true) {
 		pthread_mutex_lock(&wq->lock);
 
-		/*
-		 * Wait for work.
-		 */
-		while (wq->next_item == NULL && !wq->terminate) {
-			assert(wq->item_count == 0);
-			pthread_cond_wait(&wq->wakeup, &wq->lock);
-		}
-		if (wq->next_item == NULL && wq->terminate) {
+		while (wq->job_count == 0 && !wq->shutdown)
+			pthread_cond_wait(&wq->cond_empty, &wq->lock);
+		if (wq->job_count == 0 && wq->shutdown) {
 			pthread_mutex_unlock(&wq->lock);
 			break;
 		}
 
-		/*
-		 *  Dequeue work from the head of the list. If the queue was
-		 *  full then send a wakeup if we're configured to do so.
-		 */
-		assert(wq->item_count > 0);
-		if (wq->max_queued)
-			pthread_cond_broadcast(&wq->queue_full);
-
-		wi = wq->next_item;
-		wq->next_item = wi->next;
-		wq->item_count--;
-
-		if (wq->max_queued && wq->next_item) {
-			/* more work, wake up another worker */
-			pthread_cond_signal(&wq->wakeup);
-		}
+		work = wq->head;
+		wq->head = work->next;
+		if (!wq->head)
+			wq->tail = NULL;
+		wq->job_count--;
+
+		if (wq->job_count == wq->max_jobs - 1)
+			pthread_cond_broadcast(&wq->cond_full);
+
 		pthread_mutex_unlock(&wq->lock);
 
-		wi->private = private;
-		(wi->function)(wq, wi);
+		work->priv = priv;
+		work->func(work);
 	}
 
-	if (private) {
-		assert(wq->private_fini);
-		(wq->private_fini)(private);
-		free(private);
+	if (priv) {
+		assert(wq->priv_fini);
+		(wq->priv_fini)(priv);
+		free(priv);
 	}
 
 	return NULL;
 }
 
-/* Allocate a work queue and threads.  Returns zero or negative error code. */
-int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
-			   erofs_wq_priv_fini_t *priv_fini,
-			   unsigned int nr_workers, unsigned int max_queue)
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini)
 {
 	unsigned int i;
-	int			err = 0;
-
-	memset(wq, 0, sizeof(*wq));
-	err = -pthread_cond_init(&wq->wakeup, NULL);
-	if (err)
-		return err;
-	err = -pthread_cond_init(&wq->queue_full, NULL);
-	if (err)
-		goto out_wake;
-	err = -pthread_mutex_init(&wq->lock, NULL);
-	if (err)
-		goto out_cond;
-
-	wq->private_size = private_size;
-	wq->private_fini = priv_fini;
-	wq->thread_count = nr_workers;
-	wq->max_queued = max_queue;
-	wq->threads = malloc(nr_workers * sizeof(pthread_t));
-	if (!wq->threads) {
-		err = -errno;
-		goto out_mutex;
-	}
-	wq->terminate = false;
-	wq->terminated = false;
 
-	for (i = 0; i < nr_workers; i++) {
-		err = -pthread_create(&wq->threads[i], NULL, workqueue_thread,
-				wq);
-		if (err)
-			break;
-	}
+	if (!wq || nworker <= 0 || max_jobs <= 0)
+		return -EINVAL;
 
-	/*
-	 * If we encounter errors here, we have to signal and then wait for all
-	 * the threads that may have been started running before we can destroy
-	 * the workqueue.
-	 */
-	if (err)
-		erofs_workqueue_destroy(wq);
-	return err;
-out_mutex:
-	pthread_mutex_destroy(&wq->lock);
-out_cond:
-	pthread_cond_destroy(&wq->queue_full);
-out_wake:
-	pthread_cond_destroy(&wq->wakeup);
-	return err;
-}
+	wq->head = wq->tail = NULL;
+	wq->nworker = nworker;
+	wq->max_jobs = max_jobs;
+	wq->job_count = 0;
+	wq->shutdown = false;
+	wq->priv_size = priv_size;
+	wq->priv_fini = priv_fini;
+	pthread_mutex_init(&wq->lock, NULL);
+	pthread_cond_init(&wq->cond_empty, NULL);
+	pthread_cond_init(&wq->cond_full, NULL);
 
-/*
- * Create a work item consisting of a function and some arguments and schedule
- * the work item to be run via the thread pool.  Returns zero or a negative
- * error code.
- */
-int erofs_workqueue_add(struct erofs_workqueue	*wq,
-			struct erofs_work *wi)
-{
-	int	ret;
+	wq->workers = malloc(nworker * sizeof(pthread_t));
+	if (!wq->workers)
+		return -ENOMEM;
 
-	assert(!wq->terminated);
+	for (i = 0; i < nworker; i++) {
+		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
+			while (i--)
+				pthread_cancel(wq->workers[i]);
+			free(wq->workers);
+			return -ENOMEM;
+		}
+	}
 
-	if (wq->thread_count == 0) {
-		(wi->function)(wq, wi);
 	return 0;
 }
 
-	wi->queue = wq;
-	wi->next = NULL;
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+	if (!wq || !work)
+		return -EINVAL;
 
-	/* Now queue the new work structure to the work queue. */
 	pthread_mutex_lock(&wq->lock);
-restart:
-	if (wq->next_item == NULL) {
-		assert(wq->item_count == 0);
-		ret = -pthread_cond_signal(&wq->wakeup);
-		if (ret) {
-			pthread_mutex_unlock(&wq->lock);
-			return ret;
-		}
-		wq->next_item = wi;
-	} else {
-		/* throttle on a full queue if configured */
-		if (wq->max_queued && wq->item_count == wq->max_queued) {
-			pthread_cond_wait(&wq->queue_full, &wq->lock);
-			/*
-			 * Queue might be empty or even still full by the time
-			 * we get the lock back, so restart the lookup so we do
-			 * the right thing with the current state of the queue.
-			 */
-			goto restart;
-		}
-		wq->last_item->next = wi;
-	}
-	wq->last_item = wi;
-	wq->item_count++;
+
+	while (wq->job_count == wq->max_jobs)
+		pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+	work->next = NULL;
+	if (!wq->head)
+		wq->head = work;
+	else
+		wq->tail->next = work;
+	wq->tail = work;
+	wq->job_count++;
+
+	pthread_cond_signal(&wq->cond_empty);
 	pthread_mutex_unlock(&wq->lock);
+
 	return 0;
 }
 
-/*
- * Wait for all pending work items to be processed and tear down the
- * workqueue thread pool.  Returns zero or a negative error code.
- */
-int erofs_workqueue_terminate(struct erofs_workqueue *wq)
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq)
 {
 	unsigned int i;
-	int			ret;
-
-	pthread_mutex_lock(&wq->lock);
-	wq->terminate = true;
-	pthread_mutex_unlock(&wq->lock);
 
-	ret = -pthread_cond_broadcast(&wq->wakeup);
-	if (ret)
-		return ret;
-
-	for (i = 0; i < wq->thread_count; i++) {
-		ret = -pthread_join(wq->threads[i], NULL);
-		if (ret)
-			return ret;
-	}
+	if (!wq)
+		return -EINVAL;
 
 	pthread_mutex_lock(&wq->lock);
-	wq->terminated = true;
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
 	pthread_mutex_unlock(&wq->lock);
-	return 0;
-}
 
-/* Tear down the workqueue. */
-void erofs_workqueue_destroy(struct erofs_workqueue *wq)
-{
-	assert(wq->terminated);
+	for (i = 0; i < wq->nworker; i++)
+		pthread_join(wq->workers[i], NULL);
 
-	free(wq->threads);
+	free(wq->workers);
 	pthread_mutex_destroy(&wq->lock);
-	pthread_cond_destroy(&wq->wakeup);
-	pthread_cond_destroy(&wq->queue_full);
-	memset(wq, 0, sizeof(*wq));
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_cond_destroy(&wq->cond_full);
+
+	return 0;
 }
\ No newline at end of file
-- 
2.43.2



More information about the Linux-erofs mailing list