[PATCH v2 1/7] erofs-utils: introduce multi-threading framework
Gao Xiang
hsiangkao at linux.alibaba.com
Thu Feb 22 13:37:13 AEDT 2024
On 2024/2/20 15:55, Yifan Zhao wrote:
> Add a workqueue implementation for multi-threading support inspired by
> xfsprogs.
>
> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
> Suggested-by: Gao Xiang <hsiangkao at linux.alibaba.com>
> ---
> configure.ac | 16 +++++
> include/erofs/internal.h | 3 +
> include/erofs/workqueue.h | 38 +++++++++++
> lib/Makefile.am | 4 ++
> lib/workqueue.c | 132 ++++++++++++++++++++++++++++++++++++++
> 5 files changed, 193 insertions(+)
> create mode 100644 include/erofs/workqueue.h
> create mode 100644 lib/workqueue.c
>
> diff --git a/configure.ac b/configure.ac
> index bf6e99f..53306c3 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
>
> AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
>
> +AC_MSG_CHECKING([whether to enable multi-threading support])
> +AC_ARG_ENABLE([multithreading],
> + AS_HELP_STRING([--enable-multithreading],
> + [enable multi-threading support @<:@default=no@:>@]),
> + [enable_multithreading="$enableval"],
> + [enable_multithreading="no"])
> +AC_MSG_RESULT([$enable_multithreading])
> +
> AC_ARG_ENABLE([debug],
> [AS_HELP_STRING([--enable-debug],
> [enable debugging mode @<:@default=no@:>@])],
> @@ -288,6 +296,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
> [erofs_cv_max_block_size=4096]))
> ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
>
> +# Configure multi-threading support
> +AS_IF([test "x$enable_multithreading" != "xno"], [
> + AC_CHECK_HEADERS([pthread.h])
> + AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
> + AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
> +], [])
> +
> # Configure debug mode
> AS_IF([test "x$enable_debug" != "xno"], [], [
> dnl Turn off all assert checking.
> @@ -467,6 +482,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
> AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
>
> # Set up needed symbols, conditionals and compiler/linker flags
> +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
> AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
> AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
> AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
> index 82797e1..954aef4 100644
> --- a/include/erofs/internal.h
> +++ b/include/erofs/internal.h
> @@ -22,6 +22,9 @@ typedef unsigned short umode_t;
> #include <sys/types.h> /* for off_t definition */
> #include <sys/stat.h> /* for S_ISCHR definition */
> #include <stdio.h>
> +#ifdef HAVE_PTHREAD_H
> +#include <pthread.h>
> +#endif
>
> #ifndef PATH_MAX
> #define PATH_MAX 4096 /* # chars in a path name including nul */
> diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
> new file mode 100644
> index 0000000..857947b
> --- /dev/null
> +++ b/include/erofs/workqueue.h
> @@ -0,0 +1,38 @@
> +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
> +#ifndef __EROFS_WORKQUEUE_H
> +#define __EROFS_WORKQUEUE_H
> +
> +#include "internal.h"
> +
> +struct erofs_work;
> +
> +typedef void erofs_wq_func_t(struct erofs_work *);
> +typedef void erofs_wq_priv_fini_t(void *);
> +
> +struct erofs_work {
> + void (*func)(struct erofs_work *work);
> + struct erofs_work *next;
> + void *priv;
> +};
> +
> +struct erofs_workqueue {
> + struct erofs_work *head;
> + struct erofs_work *tail;
I'd suggest
struct erofs_work *head, *tail;
since they seem the same functionality.
> + pthread_mutex_t lock;
> + pthread_cond_t cond_empty;
> + pthread_cond_t cond_full;
> + pthread_t *workers;
> + unsigned int nworker;
> + unsigned int max_jobs;
> + unsigned int job_count;
> + bool shutdown;
> + size_t priv_size;
> + erofs_wq_priv_fini_t *priv_fini;
> +};
> +
> +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
> + unsigned int max_jobs, size_t priv_size,
> + erofs_wq_priv_fini_t *priv_fini);
> +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work);
> +int erofs_workqueue_shutdown(struct erofs_workqueue *wq);
> +#endif
> \ No newline at end of file
> diff --git a/lib/Makefile.am b/lib/Makefile.am
> index 54b9c9c..7307f7b 100644
> --- a/lib/Makefile.am
> +++ b/lib/Makefile.am
> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
> if ENABLE_LIBDEFLATE
> liberofs_la_SOURCES += compressor_libdeflate.c
> endif
> +if ENABLE_EROFS_MT
> +liberofs_la_CFLAGS += -lpthread
> +liberofs_la_SOURCES += workqueue.c
> +endif
> diff --git a/lib/workqueue.c b/lib/workqueue.c
> new file mode 100644
> index 0000000..3ec6142
> --- /dev/null
> +++ b/lib/workqueue.c
> @@ -0,0 +1,132 @@
> +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
> +#include <pthread.h>
> +#include <stdlib.h>
> +#include "erofs/workqueue.h"
> +
> +static void *worker_thread(void *arg)
> +{
> + struct erofs_workqueue *wq = arg;
> + struct erofs_work *work;
> + void *priv = NULL;
> +
> + if (wq->priv_size) {
> + priv = calloc(wq->priv_size, 1);
> + assert(priv);
> + }
> +
> + while (true) {
> + pthread_mutex_lock(&wq->lock);
> +
> + while (wq->job_count == 0 && !wq->shutdown)
> + pthread_cond_wait(&wq->cond_empty, &wq->lock);
> + if (wq->job_count == 0 && wq->shutdown) {
> + pthread_mutex_unlock(&wq->lock);
> + break;
> + }
> +
> + work = wq->head;
> + wq->head = work->next;
> + if (!wq->head)
> + wq->tail = NULL;
> + wq->job_count--;
> +
> + if (wq->job_count == wq->max_jobs - 1)
> + pthread_cond_broadcast(&wq->cond_full);
> +
> + pthread_mutex_unlock(&wq->lock);
> +
> + work->priv = priv;
> + work->func(work);
> + }
> +
> + if (priv) {
> + assert(wq->priv_fini);
> + (wq->priv_fini)(priv);
> + free(priv);
> + }
> +
> + return NULL;
> +}
> +
> +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
> + unsigned int max_jobs, size_t priv_size,
> + erofs_wq_priv_fini_t *priv_fini)
Let's match kernel workqueue naming...
erofs_alloc_workqueue...
> +{
> + unsigned int i;
> +
> + if (!wq || nworker <= 0 || max_jobs <= 0)
> + return -EINVAL;
> +
> + wq->head = wq->tail = NULL;
> + wq->nworker = nworker;
> + wq->max_jobs = max_jobs;
> + wq->job_count = 0;
> + wq->shutdown = false;
> + wq->priv_size = priv_size;
> + wq->priv_fini = priv_fini;
> + pthread_mutex_init(&wq->lock, NULL);
> + pthread_cond_init(&wq->cond_empty, NULL);
> + pthread_cond_init(&wq->cond_full, NULL);
> +
> + wq->workers = malloc(nworker * sizeof(pthread_t));
> + if (!wq->workers)
> + return -ENOMEM;
> +
> + for (i = 0; i < nworker; i++) {
> + if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
> + while (i--)
> + pthread_cancel(wq->workers[i]);
> + free(wq->workers);
> + return -ENOMEM;
> + }
> + }
> +
> + return 0;
> +}
> +
> +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)
erofs_queue_work
> +{
> + if (!wq || !work)
> + return -EINVAL;
> +
> + pthread_mutex_lock(&wq->lock);
> +
> + while (wq->job_count == wq->max_jobs)
> + pthread_cond_wait(&wq->cond_full, &wq->lock);
> +
> + work->next = NULL;
> + if (!wq->head)
> + wq->head = work;
> + else
> + wq->tail->next = work;
> + wq->tail = work;
> + wq->job_count++;
> +
> + pthread_cond_signal(&wq->cond_empty);
> + pthread_mutex_unlock(&wq->lock);
> +
> + return 0;
> +}
> +
> +int erofs_workqueue_shutdown(struct erofs_workqueue *wq)
erofs_destroy_workqueue
Thanks,
Gao Xiang
> +{
> + unsigned int i;
> +
> + if (!wq)
> + return -EINVAL;
> +
> + pthread_mutex_lock(&wq->lock);
> + wq->shutdown = true;
> + pthread_cond_broadcast(&wq->cond_empty);
> + pthread_mutex_unlock(&wq->lock);
> +
> + for (i = 0; i < wq->nworker; i++)
> + pthread_join(wq->workers[i], NULL);
> +
> + free(wq->workers);
> + pthread_mutex_destroy(&wq->lock);
> + pthread_cond_destroy(&wq->cond_empty);
> + pthread_cond_destroy(&wq->cond_full);
> +
> + return 0;
> +}
> \ No newline at end of file
More information about the Linux-erofs
mailing list