[PATCH 3/3] erofs-utils: mkfs: use libcurl multiplexing api to optimize S3 interaction

Yifan Zhao zhaoyifan28 at huawei.com
Fri Sep 19 01:12:45 AEST 2025


From: zhaoyifan <zhaoyifan28 at huawei.com>

The current S3 remote implementation talks to the backend through the
`curl_easy*` API family, so every request is issued serially. Let's migrate
to the `curl_multi*` APIs to leverage libcurl's multiplexing capabilities.

Now, after each `ListObjects` request, the `GetObject` requests for the
listed objects are executed concurrently, with a sliding window that keeps
at most S3EROFS_MAX_GETOBJECT_CONCUR (16) transfers in flight.

Signed-off-by: Yifan Zhao <zhaoyifan28 at huawei.com>
---
 lib/remotes/s3.c | 400 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 281 insertions(+), 119 deletions(-)

diff --git a/lib/remotes/s3.c b/lib/remotes/s3.c
index 0296ef4..3c900e0 100644
--- a/lib/remotes/s3.c
+++ b/lib/remotes/s3.c
@@ -25,6 +25,9 @@
 #define S3EROFS_URL_LEN			8192
 #define S3EROFS_CANONICAL_QUERY_LEN	2048
 
+/* maximum number of GetObject transfers kept in flight at the same time */
+#define S3EROFS_MAX_GETOBJECT_CONCUR	16
+
 #define BASE64_ENCODE_LEN(len)	(((len + 2) / 3) * 4)
 
 struct s3erofs_query_params {
@@ -101,8 +103,6 @@ static int s3erofs_prepare_url(struct s3erofs_curl_request *req,
 	return 0;
 }
 
-static char *get_canonical_headers(const struct curl_slist *list) { return ""; }
-
 // See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTAuthentication.html#ConstructingTheAuthenticationHeader
 static char *s3erofs_sigv2_header(const struct curl_slist *headers,
 		const char *method, const char *content_md5,
@@ -112,7 +112,6 @@ static char *s3erofs_sigv2_header(const struct curl_slist *headers,
 	u8 hmac_signature[EVP_MAX_MD_SIZE];
 	char *str, *output = NULL;
 	unsigned int len, pos, output_len;
-	const char *canonical_headers = get_canonical_headers(headers);
 	const char *prefix = "Authorization: AWS ";
 
 	if (!method || !date || !ak || !sk)
@@ -126,7 +125,9 @@ static char *s3erofs_sigv2_header(const struct curl_slist *headers,
 		canonical_query = "/";
 
-	pos = asprintf(&str, "%s\n%s\n%s\n%s\n%s%s", method, content_md5,
-		       content_type, date, canonical_headers, canonical_query);
-	if (pos < 0)
+	/* CanonicalizedAmzHeaders are not produced, so that part is empty */
+	pos = asprintf(&str, "%s\n%s\n%s\n%s\n%s", method, content_md5,
+		       content_type, date, canonical_query);
+	/* pos is unsigned, so test asprintf()'s -1 through a signed cast */
+	if ((int)pos < 0)
 		return ERR_PTR(-ENOMEM);
 
@@ -164,6 +163,15 @@ struct s3erofs_curl_response {
 	size_t size;
 };
 
+/*
+ * Write state of one GetObject transfer: the body is written to @vf within
+ * [@pos, @end), with @pos advancing as data arrives.
+ */
+struct s3erofs_curl_getobject_resp {
+	struct erofs_vfile vf;
+	erofs_off_t pos, end;
+};
+
 static size_t s3erofs_request_write_memory_cb(void *contents, size_t size,
 					      size_t nmemb, void *userp)
 {
@@ -256,11 +260,56 @@ err_header:
 	return ret;
 }
 
+/*
+ * Allocate an easy handle carrying the options shared by every S3 request.
+ * Failures are reported as ERR_PTR(-ENOMEM) or ERR_PTR(-EFAULT), never as
+ * NULL, so callers must check the result with IS_ERR().
+ */
+static CURL *s3erofs_curl_easy_init(void)
+{
+	CURL *curl;
+
+	curl = curl_easy_init();
+	if (!curl)
+		return ERR_PTR(-ENOMEM);
+
+	if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK)
+		goto out_cleanup;
+
+	if (curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 30L) != CURLE_OK)
+		goto out_cleanup;
+
+	if (curl_easy_setopt(curl, CURLOPT_USERAGENT,
+			     "s3erofs/" PACKAGE_VERSION) != CURLE_OK)
+		goto out_cleanup;
+
+	return curl;
+
+out_cleanup:
+	curl_easy_cleanup(curl);
+	return ERR_PTR(-EFAULT);
+}
+
 struct s3erofs_object_info {
 	char *key;
 	u64 size;
 	time_t mtime;
 	u32 mtime_ns;
+
+	/* set once the GetObject transfer of this object has completed */
+	bool downloaded;
+	/* downloaded contents; handed over to the inode and then reset to NULL */
+	struct erofs_diskbuf *diskbuf;
+};
+
+/* one slot of the sliding GetObject window */
+struct s3erofs_getobject_task {
+	CURL *easy_handle;
+	struct curl_slist *request_headers;
+	struct s3erofs_curl_request req;
+	struct s3erofs_curl_getobject_resp resp;
+	struct s3erofs_object_info *obj;
+	int index;	/* position of this slot in getobj_tasks[] */
 };
 
 struct s3erofs_object_iterator {
@@ -273,8 +314,129 @@ struct s3erofs_object_iterator {
 
 	char *next_marker;
 	bool is_truncated;
+
+	/* fields for getobject */
+	CURLM *multi_handle;
+	struct s3erofs_getobject_task *getobj_tasks;
+	int getobj_idx;
 };
 
+/*
+ * CURLOPT_WRITEFUNCTION callback for GetObject: write the received chunk into
+ * the object's diskbuf at resp->pos.  Returning 0 makes libcurl abort the
+ * transfer with CURLE_WRITE_ERROR; that happens when the body would overrun
+ * the size reported by ListObjects or when the write itself fails.
+ */
+static size_t s3erofs_remote_getobject_cb(void *contents, size_t size, size_t nmemb,
+					  void *userp)
+{
+	struct s3erofs_curl_getobject_resp *resp = userp;
+	size_t realsize = size * nmemb;
+
+	if (resp->pos + realsize > resp->end ||
+	    erofs_io_pwrite(&resp->vf, contents, resp->pos, realsize) != realsize)
+		return 0;
+
+	resp->pos += realsize;
+	return realsize;
+}
+
+/*
+ * Queue a GetObject transfer for @obj on slot @task_index of the sliding
+ * window.  The slot's easy handle is created on first use and reused for
+ * later objects.  The object body is streamed into a newly reserved diskbuf
+ * which is stored in @obj->diskbuf.
+ *
+ * Returns 0 on success or a negative errno.  On failure @obj->diskbuf is
+ * released and reset to NULL, so the iterator teardown won't free it again.
+ */
+static int s3erofs_add_getobject_task(struct s3erofs_object_iterator *it,
+				     struct s3erofs_object_info *obj,
+				     int task_index)
+{
+	struct s3erofs_query_params params = {0};
+	struct s3erofs_getobject_task *task;
+	CURLMcode mret;
+	CURL *curl;
+	int ret;
+
+	task = &it->getobj_tasks[task_index];
+	if (!task->easy_handle) {
+		/* s3erofs_curl_easy_init() reports errors with ERR_PTR(), not NULL */
+		curl = s3erofs_curl_easy_init();
+		if (IS_ERR(curl))
+			return PTR_ERR(curl);
+
+		if (curl_easy_setopt(curl, CURLOPT_PRIVATE, task) != CURLE_OK) {
+			curl_easy_cleanup(curl);
+			return -EFAULT;
+		}
+		/* only publish the handle once it is fully set up */
+		task->easy_handle = curl;
+	}
+
+	task->index = task_index;
+	task->obj = obj;
+
+	obj->diskbuf = calloc(1, sizeof(struct erofs_diskbuf));
+	if (!obj->diskbuf)
+		return -ENOMEM;
+	task->resp.vf.fd = erofs_diskbuf_reserve(obj->diskbuf, 0, &task->resp.pos);
+	if (task->resp.vf.fd < 0) {
+		ret = -EBADF;
+		goto err_free;
+	}
+	erofs_diskbuf_commit(obj->diskbuf, obj->size);
+	task->resp.end = task->resp.pos + obj->size;
+
+	task->req.method = "GET";
+	ret = s3erofs_prepare_url(&task->req, it->s3->endpoint, it->bucket, obj->key,
+				  &params, it->s3->url_style);
+	if (ret < 0)
+		goto err_close;
+
+	curl_easy_setopt(task->easy_handle, CURLOPT_URL, task->req.url);
+	curl_easy_setopt(task->easy_handle, CURLOPT_WRITEFUNCTION,
+			 s3erofs_remote_getobject_cb);
+	curl_easy_setopt(task->easy_handle, CURLOPT_WRITEDATA, &task->resp);
+
+	/* Add authentication headers */
+	if (it->s3->access_key[0]) {
+		if (task->request_headers) {
+			curl_slist_free_all(task->request_headers);
+			task->request_headers = NULL;
+		}
+
+		ret = s3erofs_request_insert_auth(&task->request_headers, task->req.method,
+						  task->req.canonical_query,
+						  it->s3->access_key, it->s3->secret_key);
+		if (ret < 0) {
+			erofs_err("failed to insert auth headers");
+			goto err_close;
+		}
+		curl_easy_setopt(task->easy_handle, CURLOPT_HTTPHEADER, task->request_headers);
+	}
+
+	/* Add to multi handle */
+	mret = curl_multi_add_handle(it->multi_handle, task->easy_handle);
+	if (mret != CURLM_OK) {
+		erofs_err("failed to add getobject task to multi handle: %s",
+			  curl_multi_strerror(mret));
+		ret = -EIO;
+		goto err_close;
+	}
+
+	return 0;
+
+err_close:
+	/* the diskbuf was committed above, so close it before freeing */
+	erofs_diskbuf_close(obj->diskbuf);
+err_free:
+	free(obj->diskbuf);
+	obj->diskbuf = NULL;
+	return ret;
+}
+
 static int s3erofs_parse_list_objects_one(xmlNodePtr node,
 					  struct s3erofs_object_info *info)
 {
@@ -312,6 +451,8 @@ static int s3erofs_parse_list_objects_one(xmlNodePtr node,
 			xmlFree(str);
 		}
 	}
+
+	info->downloaded = false;
 	return 0;
 }
 
@@ -319,7 +460,7 @@ static int s3erofs_parse_list_objects_result(const char *data, int len,
 					     struct s3erofs_object_iterator *it)
 {
 	xmlNodePtr root = NULL, node, next;
-	int ret, i, contents_count;
+	int ret, i, j, contents_count;
 	xmlDocPtr doc = NULL;
 	xmlChar *str;
 	void *tmp;
@@ -398,6 +539,7 @@ static int s3erofs_parse_list_objects_result(const char *data, int len,
 		it->objects[0].key = NULL;
 	}
 	it->cur = 0;
+	it->getobj_idx = 0;
 
 	ret = 0;
 	for (i = 0, node = root->children; node; node = node->next) {
@@ -428,6 +570,26 @@ static int s3erofs_parse_list_objects_result(const char *data, int len,
 			ret = -ENOMEM;
 	}
 
+	if (!ret && i && it->multi_handle) {
+		j = 0;
+		while (it->getobj_idx < i && j < S3EROFS_MAX_GETOBJECT_CONCUR) {
+			if (it->objects[it->getobj_idx].size == 0) {
+				it->getobj_idx++;
+				continue;
+			}
+
+			ret = s3erofs_add_getobject_task(it, it->objects + it->getobj_idx,
+							 j++);
+			if (ret < 0) {
+				erofs_err("failed to add download task for object %s: %s",
+					  it->objects[it->getobj_idx].key,
+					  erofs_strerror(ret));
+				goto out;
+			}
+			it->getobj_idx++;
+		}
+	}
+
 	if (!ret)
 		ret = i;
 out:
@@ -490,10 +652,11 @@ static int s3erofs_list_objects(struct s3erofs_object_iterator *it)
 
 static struct s3erofs_object_iterator *
 s3erofs_create_object_iterator(struct erofs_s3 *s3, const char *path,
-			       const char *delimiter)
+			       const char *delimiter, bool fillzero)
 {
 	struct s3erofs_object_iterator *iter;
 	char *prefix;
+	int ret = 0;
 
 	iter = calloc(1, sizeof(struct s3erofs_object_iterator));
 	if (!iter)
@@ -501,8 +664,10 @@ s3erofs_create_object_iterator(struct erofs_s3 *s3, const char *path,
 	iter->s3 = s3;
 	prefix = strchr(path, '/');
 	if (prefix) {
-		if (++prefix - path > S3EROFS_PATH_MAX)
-			return ERR_PTR(-EINVAL);
+		if (++prefix - path > S3EROFS_PATH_MAX) {
+			ret = -EINVAL;
+			goto err_iter;
+		}
 		iter->bucket = strndup(path, prefix - path);
 		iter->prefix = strdup(prefix);
 	} else {
@@ -511,18 +676,60 @@ s3erofs_create_object_iterator(struct erofs_s3 *s3, const char *path,
 	}
 	iter->delimiter = delimiter;
 	iter->is_truncated = true;
+	if (!fillzero) {
+		iter->getobj_idx = 0;
+		iter->getobj_tasks = calloc(S3EROFS_MAX_GETOBJECT_CONCUR,
+					    sizeof(struct s3erofs_getobject_task));
+		if (!iter->getobj_tasks) {
+			ret = -ENOMEM;
+			goto err_prefix;
+		}
+		iter->multi_handle = curl_multi_init();
+		if (!iter->multi_handle) {
+			ret = -EIO;
+			free(iter->getobj_tasks);
+			goto err_prefix;
+		}
+	}
 	return iter;
+
+err_prefix:
+	if (iter->bucket)
+		free(iter->bucket);
+	if (iter->prefix)
+		free(iter->prefix);
+err_iter:
+	free(iter);
+	return ERR_PTR(ret);
 }
 
 static void s3erofs_destroy_object_iterator(struct s3erofs_object_iterator *it)
 {
+	struct s3erofs_getobject_task *task;
 	int i;
 
+	if (it->getobj_tasks) {
+		for (i = 0; i < S3EROFS_MAX_GETOBJECT_CONCUR; i++) {
+			task = &it->getobj_tasks[i];
+			if (task->easy_handle)
+				curl_easy_cleanup(task->easy_handle);
+			if (task->request_headers)
+				curl_slist_free_all(task->request_headers);
+		}
+		free(it->getobj_tasks);
+	}
+	if (it->multi_handle)
+		curl_multi_cleanup(it->multi_handle);
 	if (it->next_marker)
 		free(it->next_marker);
 	if (it->objects) {
-		for (i = 0; it->objects[i].key; ++i)
+		for (i = 0; it->objects[i].key; ++i) {
 			free(it->objects[i].key);
+			if (it->objects[i].diskbuf) {
+				erofs_diskbuf_close(it->objects[i].diskbuf);
+				free(it->objects[i].diskbuf);
+			}
+		}
 		free(it->objects);
 	}
 	free(it->prefix);
@@ -547,125 +754,120 @@ s3erofs_get_next_object(struct s3erofs_object_iterator *it)
 	return NULL;
 }
 
-static int s3erofs_curl_easy_init(struct erofs_s3 *s3)
-{
-	CURL *curl;
-
-	curl = curl_easy_init();
-	if (!curl)
-		return -ENOMEM;
-
-	if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK)
-		goto out_cleanup;
-
-	if (curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 30L) != CURLE_OK)
-		goto out_cleanup;
-
-	if (curl_easy_setopt(curl, CURLOPT_USERAGENT,
-			     "s3erofs/" PACKAGE_VERSION) != CURLE_OK)
-		goto out_cleanup;
-
-	s3->easy_curl = curl;
-	return 0;
-out_cleanup:
-	curl_easy_cleanup(curl);
-	return -EFAULT;
-}
-
-static void s3erofs_curl_easy_exit(struct erofs_s3 *s3)
+/* Move ownership of @obj's downloaded diskbuf to @inode. */
+static void s3erofs_handover_object_data(struct erofs_inode *inode,
+					 struct s3erofs_object_info *obj)
 {
-	if (!s3->easy_curl)
-		return;
-	curl_easy_cleanup(s3->easy_curl);
-	s3->easy_curl = NULL;
+	inode->datasource = EROFS_INODE_DATA_SOURCE_DISKBUF;
+	inode->i_diskbuf = obj->diskbuf;
+	obj->diskbuf = NULL;
 }
 
-struct s3erofs_curl_getobject_resp {
-	struct erofs_vfile *vf;
-	erofs_off_t pos, end;
-};
-
-static size_t s3erofs_remote_getobject_cb(void *contents, size_t size,
-					  size_t nmemb, void *userp)
+/*
+ * Wait until the GetObject transfer for @obj has finished and hand its data
+ * over to @inode.  While waiting, every other transfer that completes is
+ * validated and marked as downloaded (its data stays in the object's diskbuf
+ * until its own inode asks for it), and its window slot is refilled with the
+ * next non-empty object in it->objects.
+ *
+ * Returns 0 on success or a negative errno.
+ */
+static int s3erofs_remote_getobject(struct s3erofs_object_iterator *it,
+				    struct s3erofs_object_info *obj,
+				    struct erofs_inode *inode)
 {
-	struct s3erofs_curl_getobject_resp *resp = userp;
-	size_t realsize = size * nmemb;
-
-	if (resp->pos + realsize > resp->end ||
-	    erofs_io_pwrite(resp->vf, contents, resp->pos, realsize) != realsize)
-		return 0;
-
-	resp->pos += realsize;
-	return realsize;
-}
-
-static int s3erofs_remote_getobject(struct erofs_importer *im,
-				    struct erofs_s3 *s3,
-				    struct erofs_inode *inode,
-				    const char *bucket, const char *key)
-{
-	struct erofs_sb_info *sbi = inode->sbi;
-	struct s3erofs_curl_request req = {};
-	struct s3erofs_curl_getobject_resp resp;
-	struct s3erofs_query_params params;
-	struct erofs_vfile vf;
-	int ret;
-
-	params.num = 0;
-	req.method = "GET";
-	ret = s3erofs_prepare_url(&req, s3->endpoint, bucket, key,
-				  &params, s3->url_style);
-	if (ret < 0)
-		return ret;
-
-	if (curl_easy_setopt(s3->easy_curl, CURLOPT_WRITEFUNCTION,
-			     s3erofs_remote_getobject_cb) != CURLE_OK)
-		return -EIO;
-
-	resp.pos = 0;
-	if (!cfg.c_compr_opts[0].alg && im->params->no_datainline) {
-		inode->datalayout = EROFS_INODE_FLAT_PLAIN;
-		inode->idata_size = 0;
-		ret = erofs_allocate_inode_bh_data(inode,
-				DIV_ROUND_UP(inode->i_size, 1U << sbi->blkszbits));
-		if (ret)
-			return ret;
-		resp.vf = &sbi->bdev;
-		resp.pos = erofs_pos(inode->sbi, inode->u.i_blkaddr);
-		inode->datasource = EROFS_INODE_DATA_SOURCE_NONE;
-	} else {
-		u64 off;
-
-		if (!inode->i_diskbuf) {
-			inode->i_diskbuf = calloc(1, sizeof(*inode->i_diskbuf));
-			if (!inode->i_diskbuf)
-				return -ENOSPC;
-		} else {
-			erofs_diskbuf_close(inode->i_diskbuf);
-		}
-
-		vf = (struct erofs_vfile) {.fd =
-			erofs_diskbuf_reserve(inode->i_diskbuf, 0, &off)};
-		if (vf.fd < 0)
-			return -EBADF;
-		resp.pos = off;
-		resp.vf = &vf;
-		inode->datasource = EROFS_INODE_DATA_SOURCE_DISKBUF;
-	}
-	resp.end = resp.pos + inode->i_size;
-
-	ret = s3erofs_request_perform(s3, &req, &resp);
-	if (resp.vf == &vf) {
-		erofs_diskbuf_commit(inode->i_diskbuf, resp.end - resp.pos);
-		if (ret) {
-			erofs_diskbuf_close(inode->i_diskbuf);
-			inode->i_diskbuf = NULL;
-			inode->datasource = EROFS_INODE_DATA_SOURCE_NONE;
-		}
-	}
-	if (ret)
-		return ret;
-	return resp.pos != resp.end ? -EIO : 0;
+	struct s3erofs_getobject_task *task;
+	struct s3erofs_object_info *next;
+	int running_handles, msgs_in_queue, finished, ret;
+	CURL *easy_handle;
+	CURLMcode mret;
+	CURLcode cret;
+	CURLMsg *msg;
+	long http_code;
+	char *priv;
+
+	while (!obj->downloaded) {
+		mret = curl_multi_perform(it->multi_handle, &running_handles);
+		if (mret != CURLM_OK) {
+			erofs_err("curl_multi_perform() failed: %s",
+				  curl_multi_strerror(mret));
+			return -EIO;
+		}
+
+		finished = 0;
+		while ((msg = curl_multi_info_read(it->multi_handle, &msgs_in_queue))) {
+			if (msg->msg != CURLMSG_DONE)
+				continue;
+
+			/* @msg must not be used after curl_multi_remove_handle() */
+			easy_handle = msg->easy_handle;
+			cret = msg->data.result;
+			curl_multi_remove_handle(it->multi_handle, easy_handle);
+			++finished;
+
+			priv = NULL;
+			curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &priv);
+			task = (struct s3erofs_getobject_task *)priv;
+			if (!task) {
+				erofs_err("failed to get private data from curl handle");
+				return -EIO;
+			}
+
+			if (cret != CURLE_OK) {
+				erofs_err("getobject failed for object %s: %s",
+					  task->obj->key, curl_easy_strerror(cret));
+				return -EIO;
+			}
+
+			http_code = 0;
+			curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &http_code);
+			if (http_code < 200 || http_code >= 300) {
+				erofs_err("getobject failed for object %s: HTTP status %ld",
+					  task->obj->key, http_code);
+				return -EIO;
+			}
+
+			/* a short body would leave the tail of the diskbuf unwritten */
+			if (task->resp.pos != task->resp.end) {
+				erofs_err("getobject returned a truncated body for object %s",
+					  task->obj->key);
+				return -EIO;
+			}
+			task->obj->downloaded = true;
+
+			/* refill this slot with the next non-empty object, if any */
+			next = &it->objects[it->getobj_idx];
+			while (next->key && !next->size)
+				next = &it->objects[++it->getobj_idx];
+			if (next->key) {
+				ret = s3erofs_add_getobject_task(it, next, task->index);
+				if (ret < 0) {
+					erofs_err("failed to add download task for object %s: %s",
+						  next->key, erofs_strerror(ret));
+					return ret;
+				}
+				it->getobj_idx++;
+			}
+		}
+
+		if (obj->downloaded)
+			break;
+		/* nothing in flight and nothing finished: @obj was never queued */
+		if (!running_handles && !finished) {
+			erofs_err("no pending download for object %s", obj->key);
+			return -EIO;
+		}
+
+		mret = curl_multi_wait(it->multi_handle, NULL, 0, 1000, NULL);
+		if (mret != CURLM_OK) {
+			erofs_err("curl_multi_wait() failed: %s",
+				  curl_multi_strerror(mret));
+			return -EIO;
+		}
+	}
+
+	s3erofs_handover_object_data(inode, obj);
+	return 0;
 }
 
 int s3erofs_build_trees(struct erofs_importer *im, struct erofs_s3 *s3,
@@ -685,13 +846,16 @@ int s3erofs_build_trees(struct erofs_importer *im, struct erofs_s3 *s3,
 	st.st_uid = root->i_uid;
 	st.st_gid = root->i_gid;
 
-	ret = s3erofs_curl_easy_init(s3);
-	if (ret) {
+	/* s3erofs_curl_easy_init() returns ERR_PTR() on failure, never NULL */
+	s3->easy_curl = s3erofs_curl_easy_init();
+	if (IS_ERR(s3->easy_curl)) {
+		ret = PTR_ERR(s3->easy_curl);
+		s3->easy_curl = NULL;
 		erofs_err("failed to initialize s3erofs: %s", erofs_strerror(ret));
 		return ret;
 	}
 
-	iter = s3erofs_create_object_iterator(s3, path, NULL);
+	iter = s3erofs_create_object_iterator(s3, path, NULL, fillzero);
 	if (IS_ERR(iter)) {
 		erofs_err("failed to create object iterator");
 		ret = PTR_ERR(iter);
@@ -747,11 +909,10 @@ int s3erofs_build_trees(struct erofs_importer *im, struct erofs_s3 *s3,
 		ret = __erofs_fill_inode(im, inode, &st, obj->key);
 		if (!ret && S_ISREG(inode->i_mode)) {
 			inode->i_size = obj->size;
-			if (fillzero)
+			if (fillzero || !inode->i_size)
 				ret = erofs_write_zero_inode(inode);
 			else
-				ret = s3erofs_remote_getobject(im, s3, inode,
-						iter->bucket, obj->key);
+				ret = s3erofs_remote_getobject(iter, obj, inode);
 		}
 		if (ret)
 			goto err_iter;
@@ -760,6 +921,7 @@ int s3erofs_build_trees(struct erofs_importer *im, struct erofs_s3 *s3,
 err_iter:
 	s3erofs_destroy_object_iterator(iter);
 err_global:
-	s3erofs_curl_easy_exit(s3);
+	curl_easy_cleanup(s3->easy_curl);
+	s3->easy_curl = NULL;
 	return ret;
 }
-- 
2.46.0



More information about the Linux-erofs mailing list