[PATCH v7 1/5] erofs-utils: introduce multi-threading framework
Gao Xiang
hsiangkao at linux.alibaba.com
Fri Mar 15 12:10:15 AEDT 2024
From: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
Add a workqueue implementation for multi-threading support inspired by
xfsprogs.
Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
Suggested-by: Gao Xiang <hsiangkao at linux.alibaba.com>
Signed-off-by: Gao Xiang <hsiangkao at linux.alibaba.com>
---
configure.ac | 16 +++++
include/erofs/internal.h | 3 +
include/erofs/workqueue.h | 34 +++++++++++
lib/Makefile.am | 4 ++
lib/workqueue.c | 123 ++++++++++++++++++++++++++++++++++++++
5 files changed, 180 insertions(+)
create mode 100644 include/erofs/workqueue.h
create mode 100644 lib/workqueue.c
diff --git a/configure.ac b/configure.ac
index 4b59230..3ccd6bb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+ AS_HELP_STRING([--enable-multithreading],
+ [enable multi-threading support @<:@default=no@:>@]),
+ [enable_multithreading="$enableval"],
+ [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
AC_ARG_ENABLE([debug],
[AS_HELP_STRING([--enable-debug],
[enable debugging mode @<:@default=no@:>@])],
@@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
[erofs_cv_max_block_size=4096]))
], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+ AC_CHECK_HEADERS([pthread.h])
+ AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+ AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
# Configure debug mode
AS_IF([test "x$enable_debug" != "xno"], [], [
dnl Turn off all assert checking.
@@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
# Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 5e968d6..4cd2059 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
#include <sys/types.h> /* for off_t definition */
#include <sys/stat.h> /* for S_ISCHR definition */
#include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
#ifndef PATH_MAX
#define PATH_MAX 4096 /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..36037c3
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,34 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_workqueue;
+
+typedef void *(*erofs_wq_func_t)(struct erofs_workqueue *, void *);
+
+struct erofs_work {
+ struct erofs_work *next;
+ void (*fn)(struct erofs_work *work, void *tlsp);
+};
+
+struct erofs_workqueue {
+ struct erofs_work *head, *tail;
+ pthread_mutex_t lock;
+ pthread_cond_t cond_empty;
+ pthread_cond_t cond_full;
+ pthread_t *workers;
+ unsigned int nworker;
+ unsigned int max_jobs;
+ unsigned int job_count;
+ bool shutdown;
+ erofs_wq_func_t on_start, on_exit;
+};
+
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+ unsigned int max_jobs, erofs_wq_func_t on_start,
+ erofs_wq_func_t on_exit);
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_destroy_workqueue(struct erofs_workqueue *wq);
+#endif
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..b3bea74 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
if ENABLE_LIBDEFLATE
liberofs_la_SOURCES += compressor_libdeflate.c
endif
+if ENABLE_EROFS_MT
+liberofs_la_LDFLAGS = -lpthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..47cec9b
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,123 @@
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
+#include <pthread.h>
+#include <stdlib.h>
+#include "erofs/workqueue.h"
+
+static void *worker_thread(void *arg)
+{
+ struct erofs_workqueue *wq = arg;
+ struct erofs_work *work;
+ void *tlsp = NULL;
+
+ if (wq->on_start)
+ tlsp = (wq->on_start)(wq, NULL);
+
+ while (true) {
+ pthread_mutex_lock(&wq->lock);
+
+ while (wq->job_count == 0 && !wq->shutdown)
+ pthread_cond_wait(&wq->cond_empty, &wq->lock);
+ if (wq->job_count == 0 && wq->shutdown) {
+ pthread_mutex_unlock(&wq->lock);
+ break;
+ }
+
+ work = wq->head;
+ wq->head = work->next;
+ if (!wq->head)
+ wq->tail = NULL;
+ wq->job_count--;
+
+ if (wq->job_count == wq->max_jobs - 1)
+ pthread_cond_broadcast(&wq->cond_full);
+
+ pthread_mutex_unlock(&wq->lock);
+ work->fn(work, tlsp);
+ }
+
+ if (wq->on_exit)
+ (void)(wq->on_exit)(wq, tlsp);
+ return NULL;
+}
+
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+ unsigned int max_jobs, erofs_wq_func_t on_start,
+ erofs_wq_func_t on_exit)
+{
+ unsigned int i;
+ int ret;
+
+ if (!wq || nworker <= 0 || max_jobs <= 0)
+ return -EINVAL;
+
+ wq->head = wq->tail = NULL;
+ wq->nworker = nworker;
+ wq->max_jobs = max_jobs;
+ wq->job_count = 0;
+ wq->shutdown = false;
+ wq->on_start = on_start;
+ wq->on_exit = on_exit;
+ pthread_mutex_init(&wq->lock, NULL);
+ pthread_cond_init(&wq->cond_empty, NULL);
+ pthread_cond_init(&wq->cond_full, NULL);
+
+ wq->workers = malloc(nworker * sizeof(pthread_t));
+ if (!wq->workers)
+ return -ENOMEM;
+
+ for (i = 0; i < nworker; i++) {
+ ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq);
+ if (ret) {
+ while (i)
+ pthread_cancel(wq->workers[--i]);
+ free(wq->workers);
+ return ret;
+ }
+ }
+ return 0;
+}
+
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+ if (!wq || !work)
+ return -EINVAL;
+
+ pthread_mutex_lock(&wq->lock);
+
+ while (wq->job_count == wq->max_jobs)
+ pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+ work->next = NULL;
+ if (!wq->head)
+ wq->head = work;
+ else
+ wq->tail->next = work;
+ wq->tail = work;
+ wq->job_count++;
+
+ pthread_cond_signal(&wq->cond_empty);
+ pthread_mutex_unlock(&wq->lock);
+ return 0;
+}
+
+int erofs_destroy_workqueue(struct erofs_workqueue *wq)
+{
+ unsigned int i;
+
+ if (!wq)
+ return -EINVAL;
+
+ pthread_mutex_lock(&wq->lock);
+ wq->shutdown = true;
+ pthread_cond_broadcast(&wq->cond_empty);
+ pthread_mutex_unlock(&wq->lock);
+
+ for (i = 0; i < wq->nworker; i++)
+ pthread_join(wq->workers[i], NULL);
+
+ free(wq->workers);
+ pthread_mutex_destroy(&wq->lock);
+ pthread_cond_destroy(&wq->cond_empty);
+ pthread_cond_destroy(&wq->cond_full);
+ return 0;
+}
--
2.39.3
More information about the Linux-erofs
mailing list