[PATCH v4 1/5] erofs-utils: introduce multi-threading framework
Yifan Zhao
zhaoyifan at sjtu.edu.cn
Thu Feb 29 23:09:00 AEDT 2024
On 2/29/24 17:43, Gao Xiang wrote:
> Hi Yifan,
>
> On 2024/2/29 00:16, Yifan Zhao wrote:
>> Add a workqueue implementation for multi-threading support inspired by
>> xfsprogs.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
>> Suggested-by: Gao Xiang <hsiangkao at linux.alibaba.com>
>> ---
>> configure.ac | 16 +++++
>> include/erofs/internal.h | 3 +
>> include/erofs/workqueue.h | 37 +++++++++++
>> lib/Makefile.am | 4 ++
>> lib/workqueue.c | 132 ++++++++++++++++++++++++++++++++++++++
>> 5 files changed, 192 insertions(+)
>> create mode 100644 include/erofs/workqueue.h
>> create mode 100644 lib/workqueue.c
>>
>> diff --git a/configure.ac b/configure.ac
>> index 4b59230..3ccd6bb 100644
>> --- a/configure.ac
>> +++ b/configure.ac
>> @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
>> AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which
>> erofs-utils supports])
>> +AC_MSG_CHECKING([whether to enable multi-threading support])
>> +AC_ARG_ENABLE([multithreading],
>> + AS_HELP_STRING([--enable-multithreading],
>> + [enable multi-threading support
>> @<:@default=no@:>@]),
>> + [enable_multithreading="$enableval"],
>> + [enable_multithreading="no"])
>> +AC_MSG_RESULT([$enable_multithreading])
>> +
>> AC_ARG_ENABLE([debug],
>> [AS_HELP_STRING([--enable-debug],
>> [enable debugging mode @<:@default=no@:>@])],
>> @@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
>> [erofs_cv_max_block_size=4096]))
>> ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
>> +# Configure multi-threading support
>> +AS_IF([test "x$enable_multithreading" != "xno"], [
>> + AC_CHECK_HEADERS([pthread.h])
>> + AC_CHECK_LIB([pthread], [pthread_mutex_lock], [],
>> AC_MSG_ERROR([libpthread is required for multi-threaded build]))
>> + AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
>> +], [])
>> +
>> # Configure debug mode
>> AS_IF([test "x$enable_debug" != "xno"], [], [
>> dnl Turn off all assert checking.
>> @@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
>> AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
>> # Set up needed symbols, conditionals and compiler/linker flags
>> +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}"
>> != "xno"])
>> AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
>> AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
>> AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
>> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
>> index 82797e1..954aef4 100644
>> --- a/include/erofs/internal.h
>> +++ b/include/erofs/internal.h
>> @@ -22,6 +22,9 @@ typedef unsigned short umode_t;
>> #include <sys/types.h> /* for off_t definition */
>> #include <sys/stat.h> /* for S_ISCHR definition */
>> #include <stdio.h>
>> +#ifdef HAVE_PTHREAD_H
>> +#include <pthread.h>
>> +#endif
>> #ifndef PATH_MAX
>> #define PATH_MAX 4096 /* # chars in a path name including
>> nul */
>> diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
>> new file mode 100644
>> index 0000000..b4b3901
>> --- /dev/null
>> +++ b/include/erofs/workqueue.h
>> @@ -0,0 +1,37 @@
>> +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
>> +#ifndef __EROFS_WORKQUEUE_H
>> +#define __EROFS_WORKQUEUE_H
>> +
>> +#include "internal.h"
>> +
>> +struct erofs_work;
>> +
>> +typedef void erofs_wq_func_t(struct erofs_work *);
>> +typedef void erofs_wq_priv_fini_t(void *);
>> +
>> +struct erofs_work {
>> + void (*func)(struct erofs_work *work);
>> + struct erofs_work *next;
>> + void *priv;
>> +};
>> +
>> +struct erofs_workqueue {
>> + struct erofs_work *head, *tail;
>> + pthread_mutex_t lock;
>> + pthread_cond_t cond_empty;
>> + pthread_cond_t cond_full;
>> + pthread_t *workers;
>> + unsigned int nworker;
>> + unsigned int max_jobs;
>> + unsigned int job_count;
>> + bool shutdown;
>> + size_t priv_size;
>
> I don't like this way honestly, how about
> ..
> erofs_wq_func_t on_start, on_exit;
> void *private;
> ..
>
> much like:
> https://www.gnu.org/software/libc/manual/html_node/Cleanups-on-Exit.html
>
I believe `private` is a per-worker field and could not appear here?
And per-worker private data is initialized on demand now (we don't know
if a certain compressor is needed in the worker thread), so I don't
think it could be replaced with `on_start` which tries to initialize it
during worker thread creation.
Thanks,
Yifan Zhao
>> + erofs_wq_priv_fini_t *priv_fini;
>> +};
>> +
>> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int
>> nworker,
>> + unsigned int max_jobs, size_t priv_size,
>> + erofs_wq_priv_fini_t *priv_fini);
>> +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work
>> *work);
>> +int erofs_destroy_workqueue(struct erofs_workqueue *wq);
>> +#endif
>> \ No newline at end of file
>> diff --git a/lib/Makefile.am b/lib/Makefile.am
>> index 54b9c9c..7307f7b 100644
>> --- a/lib/Makefile.am
>> +++ b/lib/Makefile.am
>> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c
>> compressor_deflate.c
>> if ENABLE_LIBDEFLATE
>> liberofs_la_SOURCES += compressor_libdeflate.c
>> endif
>> +if ENABLE_EROFS_MT
>> +liberofs_la_CFLAGS += -lpthread
>> +liberofs_la_SOURCES += workqueue.c
>> +endif
>> diff --git a/lib/workqueue.c b/lib/workqueue.c
>> new file mode 100644
>> index 0000000..138afd5
>> --- /dev/null
>> +++ b/lib/workqueue.c
>> @@ -0,0 +1,132 @@
>> +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
>> +#include <pthread.h>
>> +#include <stdlib.h>
>> +#include "erofs/workqueue.h"
>> +
>> +static void *worker_thread(void *arg)
>> +{
>> + struct erofs_workqueue *wq = arg;
>> + struct erofs_work *work;
>> + void *priv = NULL;
>> +
>> + if (wq->priv_size) {
>> + priv = calloc(wq->priv_size, 1);
>> + assert(priv);
>> + }
>
> if (wq->on_start)
> wq->on_start(wq);
>
>> +
>> + while (true) {
>> + pthread_mutex_lock(&wq->lock);
>> +
>> + while (wq->job_count == 0 && !wq->shutdown)
>> + pthread_cond_wait(&wq->cond_empty, &wq->lock);
>> + if (wq->job_count == 0 && wq->shutdown) {
>> + pthread_mutex_unlock(&wq->lock);
>> + break;
>> + }
>> +
>> + work = wq->head;
>> + wq->head = work->next;
>> + if (!wq->head)
>> + wq->tail = NULL;
>> + wq->job_count--;
>> +
>> + if (wq->job_count == wq->max_jobs - 1)
>> + pthread_cond_broadcast(&wq->cond_full);
>> +
>> + pthread_mutex_unlock(&wq->lock);
>> +
>> + work->priv = priv;
>> + work->func(work);
>> + }
>> +
>> + if (priv) {
>> + assert(wq->priv_fini);
>> + (wq->priv_fini)(priv);
>> + free(priv);
>> + }
>
> if (wq->on_exit)
> wq->on_exit(wq);
>
>> +
>> + return NULL;
>> +}
>> +
>> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int
>> nworker,
>> + unsigned int max_jobs, size_t priv_size,
>> + erofs_wq_priv_fini_t *priv_fini)
>> +{
>> + unsigned int i;
>> +
>> + if (!wq || nworker <= 0 || max_jobs <= 0)
>> + return -EINVAL;
>> +
>> + wq->head = wq->tail = NULL;
>> + wq->nworker = nworker;
>> + wq->max_jobs = max_jobs;
>> + wq->job_count = 0;
>> + wq->shutdown = false;
>> + wq->priv_size = priv_size;
>> + wq->priv_fini = priv_fini;
>> + pthread_mutex_init(&wq->lock, NULL);
>> + pthread_cond_init(&wq->cond_empty, NULL);
>> + pthread_cond_init(&wq->cond_full, NULL);
>> +
>> + wq->workers = malloc(nworker * sizeof(pthread_t));
>> + if (!wq->workers)
>> + return -ENOMEM;
>> +
>> + for (i = 0; i < nworker; i++) {
>> + if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
>> + while (i--)
>> + pthread_cancel(wq->workers[i]);
>
> How about
> while (i)
> pthread_cancel(wq->workers[--i]);
>
> I preferred this since i won't be < 0.
>
> Thanks,
> Gao Xiang
More information about the Linux-erofs
mailing list