[PATCH 5/7] erofs-utils: extend multi-threading framework for per-thread fields

Yifan Zhao zhaoyifan at sjtu.edu.cn
Sun Feb 4 21:34:18 AEDT 2024


Currently each worker thread is stateless. This patch allows us to bind
some per-worker fields to each worker thread, which could be reused
across different tasks.

One of the examples is the compressor and buffer used in multi-threaded
compression.

Signed-off-by: Yifan Zhao <zhaoyifan at sjtu.edu.cn>
---
 include/erofs/workqueue.h | 10 +++++++---
 lib/workqueue.c           | 19 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
index d0ebc5a..a11a8fc 100644
--- a/include/erofs/workqueue.h
+++ b/include/erofs/workqueue.h
@@ -13,11 +13,13 @@ struct erofs_work;
 
 typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
 				    struct erofs_work *work);
+typedef void erofs_wq_priv_fini_t(void *);
 
 struct erofs_work {
 	struct erofs_workqueue	*queue;
 	struct erofs_work	*next;
 	erofs_workqueue_func_t	*function;
+	void 			*private;
 };
 
 struct erofs_workqueue {
@@ -32,12 +34,14 @@ struct erofs_workqueue {
 	bool			terminated;
 	int			max_queued;
 	pthread_cond_t		queue_full;
+	size_t			private_size;
+	erofs_wq_priv_fini_t	*private_fini;
 };
 
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+			   erofs_wq_priv_fini_t *private_fini,
 			   unsigned int nr_workers, unsigned int max_queue);
-int erofs_workqueue_add(struct erofs_workqueue	*wq,
-			struct erofs_work *wi);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi);
 int erofs_workqueue_terminate(struct erofs_workqueue *wq);
 void erofs_workqueue_destroy(struct erofs_workqueue *wq);
 
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 6573821..01d12d9 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -18,6 +18,12 @@ static void *workqueue_thread(void *arg)
 {
 	struct erofs_workqueue		*wq = arg;
 	struct erofs_work		*wi;
+	void				*private = NULL;
+
+	if (wq->private_size) {
+		private = calloc(1, wq->private_size);
+		assert(private);
+	}
 
 	/*
 	 * Loop pulling work from the passed in work queue.
@@ -56,13 +62,22 @@ static void *workqueue_thread(void *arg)
 		}
 		pthread_mutex_unlock(&wq->lock);
 
+		wi->private = private;
 		(wi->function)(wq, wi);
 	}
+
+	if (private) {
+		assert(wq->private_fini);
+		(wq->private_fini)(private);
+		free(private);
+	}
+
 	return NULL;
 }
 
 /* Allocate a work queue and threads.  Returns zero or negative error code. */
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+			   erofs_wq_priv_fini_t *priv_fini,
 			   unsigned int nr_workers, unsigned int max_queue)
 {
 	unsigned int		i;
@@ -79,6 +94,8 @@ int erofs_workqueue_create(struct erofs_workqueue *wq,
 	if (err)
 		goto out_cond;
 
+	wq->private_size = private_size;
+	wq->private_fini = priv_fini;
 	wq->thread_count = nr_workers;
 	wq->max_queued = max_queue;
 	wq->threads = malloc(nr_workers * sizeof(pthread_t));
-- 
2.43.0



More information about the Linux-erofs mailing list