[PATCH 5/7] erofs-utils: extend multi-threading framework for per-thread fields
Yifan Zhao
zhaoyifan at sjtu.edu.cn
Sun Feb 4 21:34:18 AEDT 2024
Currently each worker thread is stateless. This patch allows us to bind
some per-worker fields to each worker thread, which could be reused
across different tasks.
One of the examples is the compressor and buffer used in multi-threaded
compression.
Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
---
include/erofs/workqueue.h | 10 +++++++---
lib/workqueue.c | 19 ++++++++++++++++++-
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
index d0ebc5a..a11a8fc 100644
--- a/include/erofs/workqueue.h
+++ b/include/erofs/workqueue.h
@@ -13,11 +13,13 @@ struct erofs_work;
typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
struct erofs_work *work);
+typedef void erofs_wq_priv_fini_t(void *);
struct erofs_work {
struct erofs_workqueue *queue;
struct erofs_work *next;
erofs_workqueue_func_t *function;
+ void *private;
};
struct erofs_workqueue {
@@ -32,12 +34,14 @@ struct erofs_workqueue {
bool terminated;
int max_queued;
pthread_cond_t queue_full;
+ size_t private_size;
+ erofs_wq_priv_fini_t *private_fini;
};
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+ erofs_wq_priv_fini_t *private_fini,
unsigned int nr_workers, unsigned int max_queue);
-int erofs_workqueue_add(struct erofs_workqueue *wq,
- struct erofs_work *wi);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi);
int erofs_workqueue_terminate(struct erofs_workqueue *wq);
void erofs_workqueue_destroy(struct erofs_workqueue *wq);
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 6573821..01d12d9 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -18,6 +18,12 @@ static void *workqueue_thread(void *arg)
{
struct erofs_workqueue *wq = arg;
struct erofs_work *wi;
+ void *private = NULL;
+
+ if (wq->private_size) {
+ private = calloc(1, wq->private_size);
+ assert(private);
+ }
/*
* Loop pulling work from the passed in work queue.
@@ -56,13 +62,22 @@ static void *workqueue_thread(void *arg)
}
pthread_mutex_unlock(&wq->lock);
+ wi->private = private;
(wi->function)(wq, wi);
}
+
+ if (private) {
+ assert(wq->private_fini);
+ (wq->private_fini)(private);
+ free(private);
+ }
+
return NULL;
}
/* Allocate a work queue and threads. Returns zero or negative error code. */
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+ erofs_wq_priv_fini_t *priv_fini,
unsigned int nr_workers, unsigned int max_queue)
{
unsigned int i;
@@ -79,6 +94,8 @@ int erofs_workqueue_create(struct erofs_workqueue *wq,
if (err)
goto out_cond;
+ wq->private_size = private_size;
+ wq->private_fini = priv_fini;
wq->thread_count = nr_workers;
wq->max_queued = max_queue;
wq->threads = malloc(nr_workers * sizeof(pthread_t));
--
2.43.0
More information about the Linux-erofs
mailing list