[Cbe-oss-dev] [PATCH 03/22]MARS/base: Workload queue cleanup

Yuji Mano yuji.mano at am.sony.com
Wed Jan 21 11:27:44 EST 2009


Clean up workload queue implementation on host side

To improve maintainability and performance, this patch cleans up the
workload queue implementation on the host side.

Signed-off-by: Kazunori Asayama <asayama at sm.sony.co.jp>
Signed-off-by: Yuji Mano <yuji.mano at am.sony.com>
---
 base/include/host/mars/workload_queue.h |   32 -
 base/src/host/lib/workload_queue.c      |  745 +++++++++-----------------------
 2 files changed, 248 insertions(+), 529 deletions(-)

--- a/base/include/host/mars/workload_queue.h
+++ b/base/include/host/mars/workload_queue.h
@@ -84,8 +84,8 @@ int mars_workload_queue_exit(struct mars
  * \n	MARS_ERROR_LIMIT	- workload queue is full
  */
 int mars_workload_queue_add_begin(struct mars_context *mars,
-				uint16_t *id,
-				uint64_t *workload_ea);
+				  uint16_t *id,
+				  uint64_t *workload_ea);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -126,7 +126,7 @@ int mars_workload_queue_add_end(struct m
  * \n	MARS_ERROR_STATE	- workload adding not started
  */
 int mars_workload_queue_add_cancel(struct mars_context *mars,
-				uint16_t id);
+				   uint16_t id);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -156,8 +156,8 @@ int mars_workload_queue_add_cancel(struc
  * \n	MARS_ERROR_STATE	- specified workload not added or finished
  */
 int mars_workload_queue_remove_begin(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea);
+				     uint16_t id,
+				     uint64_t *workload_ea);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -177,7 +177,7 @@ int mars_workload_queue_remove_begin(str
  * \n	MARS_ERROR_STATE	- workload removing not started
  */
 int mars_workload_queue_remove_end(struct mars_context *mars,
-				uint16_t id);
+				   uint16_t id);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -196,7 +196,7 @@ int mars_workload_queue_remove_end(struc
  * \n	MARS_ERROR_STATE	- workload removing not started
  */
 int mars_workload_queue_remove_cancel(struct mars_context *mars,
-				uint16_t id);
+				      uint16_t id);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -226,8 +226,8 @@ int mars_workload_queue_remove_cancel(st
  * \n	MARS_ERROR_STATE	- specified workload not added or finished
  */
 int mars_workload_queue_schedule_begin(struct mars_context *mars,
-				uint16_t id, uint8_t priority,
-				uint64_t *workload_ea);
+				       uint16_t id, uint8_t priority,
+				       uint64_t *workload_ea);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -248,7 +248,7 @@ int mars_workload_queue_schedule_begin(s
  * \n	MARS_ERROR_STATE	- workload scheduling not started
  */
 int mars_workload_queue_schedule_end(struct mars_context *mars,
-				uint16_t id);
+				     uint16_t id);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -268,7 +268,7 @@ int mars_workload_queue_schedule_end(str
  * \n	MARS_ERROR_STATE	- workload scheduling not started
  */
 int mars_workload_queue_schedule_cancel(struct mars_context *mars,
-				uint16_t id);
+					uint16_t id);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -288,8 +288,8 @@ int mars_workload_queue_schedule_cancel(
  * \n	MARS_ERROR_STATE	- invalid workload specified
  */
 int mars_workload_queue_wait(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea);
+			     uint16_t id,
+			     uint64_t *workload_ea);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -311,8 +311,8 @@ int mars_workload_queue_wait(struct mars
  * \n	MARS_ERROR_BUSY		- workload has not yet finished
  */
 int mars_workload_queue_try_wait(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea);
+				 uint16_t id,
+				 uint64_t *workload_ea);
 
 /**
  * \ingroup group_mars_workload_queue
@@ -329,7 +329,7 @@ int mars_workload_queue_try_wait(struct 
  * \n	MARS_ERROR_STATE	- invalid workload specified
  */
 int mars_workload_queue_signal_send(struct mars_context *mars,
-				uint16_t id);
+				    uint16_t id);
 
 #if defined(__cplusplus)
 }
--- a/base/src/host/lib/workload_queue.c
+++ b/base/src/host/lib/workload_queue.c
@@ -48,39 +48,51 @@
 #include "kernel_internal_types.h"
 #include "workload_internal_types.h"
 
-static inline uint64_t workload_queue_block_ea(uint64_t queue_ea, int block)
+static inline uint64_t get_workload_ea(uint64_t queue_ea, int workload_id)
 {
-	return queue_ea +
-		offsetof(struct mars_workload_queue, block) +
-		sizeof(struct mars_workload_queue_block) * block;
+	uint64_t context_ea =
+		mars_ea_get_uint64(queue_ea +
+				   offsetof(struct mars_workload_queue_header,
+				   context_ea));
+
+	return context_ea + sizeof(struct mars_workload_context) * workload_id;
 }
 
-static inline uint64_t workload_queue_workload_ea(uint64_t queue_ea,
-						  int workload_id)
+static inline uint64_t get_block_ea(uint64_t queue_ea, int block)
 {
-	uint64_t context_ea =
-		mars_ea_get_uint64(
-			queue_ea +
-			offsetof(struct mars_workload_queue_header,
-				 context_ea));
-	return context_ea +
-		sizeof(struct mars_workload_context) * workload_id;
+	return queue_ea +
+	       offsetof(struct mars_workload_queue, block) +
+	       sizeof(struct mars_workload_queue_block) * block;
 }
 
-static inline uint64_t workload_queue_block_bits_ea(uint64_t block_ea,
-						    int index)
+static inline uint64_t get_block_bits_ea(uint64_t block_ea, int index)
 {
 	return block_ea +
-		offsetof(struct mars_workload_queue_block, bits) +
-		sizeof(uint64_t) * index;
+	       offsetof(struct mars_workload_queue_block, bits) +
+	       sizeof(uint64_t) * index;
+}
+
+static void init_block(uint64_t block_ea, uint64_t initial_bits)
+{
+	int index;
+	struct mars_workload_queue_block *block =
+		mars_ea_work_area_get(block_ea,
+				      MARS_WORKLOAD_QUEUE_BLOCK_ALIGN,
+				      sizeof(struct mars_workload_queue_block));
+
+	for (index = 0; index < MARS_WORKLOAD_PER_BLOCK; index++)
+		block->bits[index] = initial_bits;
+
+	mars_ea_put(block_ea, block, sizeof(struct mars_workload_queue_block));
+	mars_mutex_reset(block_ea);
 }
 
 int mars_workload_queue_create(struct mars_context *mars)
 {
-	struct mars_workload_queue *queue;
 	int block;
-	int index;
 	uint64_t queue_ea;
+	uint64_t bits;
+	struct mars_workload_queue *queue;
 
 	/* check function params */
 	if (!mars)
@@ -109,30 +121,15 @@ int mars_workload_queue_create(struct ma
 	/* update queue header on EA */
 	mars_ea_put(queue_ea, queue, sizeof(struct mars_workload_queue_header));
 
+	/* create initial bit pattern of workload queue entries */
+	bits = 0;
+	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_NONE);
+	/* other bits are set by mars_workload_queue_schedule_begin properly */
+
 	/* initialize workload queue blocks */
 	for (block = 0; block < MARS_WORKLOAD_NUM_BLOCKS; block++) {
-		uint64_t block_ea = workload_queue_block_ea(queue_ea, block);
-
-		for (index = 0; index < MARS_WORKLOAD_PER_BLOCK; index++) {
-			uint64_t bits_ea =
-				workload_queue_block_bits_ea(block_ea, index);
-			uint64_t bits = 0;
-
-			MARS_BITS_SET(&bits, STATE,
-				MARS_WORKLOAD_STATE_NONE);
-			MARS_BITS_SET(&bits, PRIORITY,
-				MARS_WORKLOAD_PRIORITY_MIN);
-			MARS_BITS_SET(&bits, COUNTER,
-				MARS_WORKLOAD_COUNTER_MIN);
-			MARS_BITS_SET(&bits, SIGNAL,
-				MARS_WORKLOAD_SIGNAL_OFF);
-			MARS_BITS_SET(&bits, WAIT_ID,
-				MARS_WORKLOAD_ID_NONE);
-
-			mars_ea_put_uint64(bits_ea, bits);
-		}
-
-		mars_mutex_reset(block_ea);
+		uint64_t block_ea = get_block_ea(queue_ea, block);
+		init_block(block_ea, bits);
 	}
 
 	/* sync EA */
@@ -144,12 +141,31 @@ int mars_workload_queue_create(struct ma
 	return MARS_SUCCESS;
 }
 
+static int is_block_empty(uint64_t block_ea)
+{
+	int index;
+	struct mars_workload_queue_block *block =
+		mars_ea_work_area_get(block_ea,
+				      MARS_WORKLOAD_QUEUE_BLOCK_ALIGN,
+				      sizeof(struct mars_workload_queue_block));
+
+	/* get the workload queue block from shared memory */
+	mars_ea_get(block_ea, block, sizeof(struct mars_workload_queue_block));
+
+	/* check status */
+	for (index = 0; index < MARS_WORKLOAD_PER_BLOCK; index++) {
+		if (MARS_BITS_GET(&block->bits[index], STATE) !=
+		    MARS_WORKLOAD_STATE_NONE)
+			return MARS_ERROR_STATE;
+	}
+
+	return MARS_SUCCESS;
+}
+
 int mars_workload_queue_destroy(struct mars_context *mars)
 {
-	uint64_t queue_ea;
 	int block;
-	int index;
-	uint16_t id = 0;
+	uint64_t queue_ea;
 
 	/* check function params */
 	if (!mars)
@@ -160,22 +176,11 @@ int mars_workload_queue_destroy(struct m
 	queue_ea = mars->workload_queue_ea;
 
 	/* check for any workloads left in workload queue */
-	while (id < MARS_WORKLOAD_MAX) {
-		uint64_t block_ea, bits_ea;
-		uint64_t bits;
-
-		block = id / MARS_WORKLOAD_PER_BLOCK;
-		index = id % MARS_WORKLOAD_PER_BLOCK;
-
-		/* get bits from EA */
-		block_ea = workload_queue_block_ea(queue_ea, block);
-		bits_ea = workload_queue_block_bits_ea(block_ea, index);
-		bits = mars_ea_get_uint64(bits_ea);
-
-		if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_NONE)
-			return MARS_ERROR_STATE;
-
-		id++;
+	for (block = 0; block < MARS_WORKLOAD_NUM_BLOCKS; block++) {
+		uint64_t block_ea = get_block_ea(queue_ea, block);
+		int ret = is_block_empty(block_ea);
+		if (ret != MARS_SUCCESS)
+			return ret;
 	}
 
 	/* free workload queue instance */
@@ -206,175 +211,90 @@ int mars_workload_queue_exit(struct mars
 	return MARS_SUCCESS;
 }
 
-int mars_workload_queue_add_begin(struct mars_context *mars,
-				uint16_t *id,
-				uint64_t *workload_ea)
+static int alloc_block(uint64_t block_ea)
 {
-	uint64_t queue_ea;
-	int block = 0;
-	int index = 0;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (!id)
-		return MARS_ERROR_NULL;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
+	int ret = -1;
+	int index;
+	struct mars_workload_queue_block *block =
+		mars_ea_work_area_get(block_ea,
+				      MARS_WORKLOAD_QUEUE_BLOCK_ALIGN,
+				      sizeof(struct mars_workload_queue_block));
 
 	mars_mutex_lock(block_ea);
 
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* initialize return id to 0 */
-	*id = 0;
-
-	while (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_NONE) {
-		(*id)++;
-		index++;
-		if (index == MARS_WORKLOAD_PER_BLOCK) {
-			index = 0;
+	/* get the workload queue block from shared memory */
+	mars_ea_get(block_ea, block, sizeof(struct mars_workload_queue_block));
 
-			mars_mutex_unlock(block_ea);
-
-			if (++block == MARS_WORKLOAD_NUM_BLOCKS)
-				return MARS_ERROR_LIMIT;
-
-			block_ea = workload_queue_block_ea(queue_ea, block);
-			mars_mutex_lock(block_ea);
+	/* check status */
+	for (index = 0; index < MARS_WORKLOAD_PER_BLOCK; index++) {
+		uint64_t bits = block->bits[index];
+		if (MARS_BITS_GET(&bits, STATE) == MARS_WORKLOAD_STATE_NONE) {
+			MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_ADDING);
+			mars_ea_put_uint64(get_block_bits_ea(block_ea, index),
+					   bits);
+			ret = index;
+			break;
 		}
-		bits_ea = workload_queue_block_bits_ea(block_ea, index);
-		bits = mars_ea_get_uint64(bits_ea);
 	}
 
-	/* set state to adding */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_ADDING);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
 	mars_mutex_unlock(block_ea);
 
-	/* if requested set workload context pointer to return */
-	if (workload_ea)
-		*workload_ea = workload_queue_workload_ea(queue_ea, *id);
-
-	return MARS_SUCCESS;
+	return ret;
 }
 
-int mars_workload_queue_add_end(struct mars_context *mars,
-				uint16_t id)
+int mars_workload_queue_add_begin(struct mars_context *mars,
+				  uint16_t *id,
+				  uint64_t *workload_ea)
 {
-	uint64_t queue_ea;
 	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_ADDING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* reset workload queue bits and set state to finished state */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_FINISHED);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	return MARS_SUCCESS;
-}
-
-int mars_workload_queue_add_cancel(struct mars_context *mars,
-				uint16_t id)
-{
+	int index = 0;
 	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
 
 	/* check function params */
 	if (!mars)
 		return MARS_ERROR_NULL;
 	if (!mars->workload_queue_ea)
 		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
+	if (!id)
+		return MARS_ERROR_NULL;
 
 	queue_ea = mars->workload_queue_ea;
 
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
+	/* find free workload queue entry */
+	for (block = 0; block < MARS_WORKLOAD_NUM_BLOCKS; block++) {
+		uint64_t block_ea = get_block_ea(queue_ea, block);
 
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_ADDING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
+		index = alloc_block(block_ea);
+		if (index >= 0)
+			break;
 	}
 
-	/* set state back to none state */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_NONE);
+	if (block >= MARS_WORKLOAD_NUM_BLOCKS)
+		return MARS_ERROR_LIMIT;
 
-	mars_ea_put_uint64(bits_ea, bits);
+	*id = block * MARS_WORKLOAD_PER_BLOCK + index;
 
-	mars_mutex_unlock(block_ea);
+	/* if requested set workload context pointer to return */
+	if (workload_ea)
+		*workload_ea = get_workload_ea(queue_ea, *id);
 
 	return MARS_SUCCESS;
 }
 
-int mars_workload_queue_remove_begin(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea)
+static int change_bits(struct mars_context *mars,
+		       uint16_t id,
+		       uint64_t *workload_ea,
+		       int (*check_bits)(uint64_t bits, uint64_t param),
+		       uint64_t check_bits_param,
+		       uint64_t (*set_bits)(uint64_t bits, uint64_t param),
+		       uint64_t set_bits_param)
 {
-	uint64_t queue_ea;
 	int block;
 	int index;
+	uint64_t queue_ea;
+	uint64_t block_ea;
+	uint64_t bits_ea;
 	uint64_t bits;
-	uint64_t block_ea, bits_ea;
 
 	/* check function params */
 	if (!mars)
@@ -391,290 +311,142 @@ int mars_workload_queue_remove_begin(str
 	index = id % MARS_WORKLOAD_PER_BLOCK;
 
 	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
+	block_ea = get_block_ea(queue_ea, block);
 
 	mars_mutex_lock(block_ea);
 
 	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
+	bits_ea = get_block_bits_ea(block_ea, index);
 	bits = mars_ea_get_uint64(bits_ea);
 
 	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_FINISHED) {
+	if (!(*check_bits)(bits, check_bits_param)) {
 		mars_mutex_unlock(block_ea);
 		return MARS_ERROR_STATE;
 	}
 
-	/* set state to removing */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_REMOVING);
+	/* compute the new workload queue bits via the set_bits callback */
+	bits = (*set_bits)(bits, set_bits_param);
 
+	/* store new bits into queue block */
 	mars_ea_put_uint64(bits_ea, bits);
 
 	mars_mutex_unlock(block_ea);
 
 	/* if requested set workload context pointer to return */
 	if (workload_ea)
-		*workload_ea = workload_queue_workload_ea(queue_ea, id);
+		*workload_ea = get_workload_ea(queue_ea, id);
 
 	return MARS_SUCCESS;
 }
 
-int mars_workload_queue_remove_end(struct mars_context *mars,
-				uint16_t id)
+static int check_state_bits(uint64_t bits, uint64_t state)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_REMOVING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* set state to none */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_NONE);
+	return (MARS_BITS_GET(&bits, STATE) == state);
+}
 
-	mars_ea_put_uint64(bits_ea, bits);
+static uint64_t set_state_bits(uint64_t bits, uint64_t state)
+{
+	MARS_BITS_SET(&bits, STATE, state);
 
-	mars_mutex_unlock(block_ea);
+	return bits;
+}
 
-	return MARS_SUCCESS;
+static int change_state(struct mars_context *mars,
+			uint16_t id,
+			uint64_t *workload_ea,
+			unsigned int old_state,
+			unsigned int new_state)
+{
+	return change_bits(mars, id, workload_ea,
+			   check_state_bits, old_state,
+			   set_state_bits, new_state);
 }
 
-int mars_workload_queue_remove_cancel(struct mars_context *mars,
+int mars_workload_queue_add_end(struct mars_context *mars,
 				uint16_t id)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_REMOVING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* set state back to finished */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_FINISHED);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	return MARS_SUCCESS;
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_ADDING,
+			    MARS_WORKLOAD_STATE_FINISHED);
 }
 
-int mars_workload_queue_schedule_begin(struct mars_context *mars,
-				uint16_t id, uint8_t priority,
-				uint64_t *workload_ea)
+int mars_workload_queue_add_cancel(struct mars_context *mars,
+				   uint16_t id)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_ADDING,
+			    MARS_WORKLOAD_STATE_NONE);
+}
 
-	mars_mutex_lock(block_ea);
+int mars_workload_queue_remove_begin(struct mars_context *mars,
+				     uint16_t id,
+				     uint64_t *workload_ea)
+{
+	return change_state(mars, id, workload_ea,
+			    MARS_WORKLOAD_STATE_FINISHED,
+			    MARS_WORKLOAD_STATE_REMOVING);
+}
 
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
+int mars_workload_queue_remove_end(struct mars_context *mars,
+				   uint16_t id)
+{
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_REMOVING,
+			    MARS_WORKLOAD_STATE_NONE);
+}
 
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_FINISHED) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
+int mars_workload_queue_remove_cancel(struct mars_context *mars,
+				      uint16_t id)
+{
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_REMOVING,
+			    MARS_WORKLOAD_STATE_FINISHED);
+}
 
-	/* reset workload queue bits and set state to scheduling */
+static uint64_t set_schedule_bits(uint64_t bits, uint64_t priority)
+{
 	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_SCHEDULING);
 	MARS_BITS_SET(&bits, PRIORITY, priority);
 	MARS_BITS_SET(&bits, COUNTER, MARS_WORKLOAD_COUNTER_MIN);
 	MARS_BITS_SET(&bits, SIGNAL, MARS_WORKLOAD_SIGNAL_OFF);
 	MARS_BITS_SET(&bits, WAIT_ID, MARS_WORKLOAD_ID_NONE);
 
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	/* if requested set workload context pointer to return */
-	if (workload_ea)
-		*workload_ea = workload_queue_workload_ea(queue_ea, id);
+	return bits;
+}
 
-	return MARS_SUCCESS;
+int mars_workload_queue_schedule_begin(struct mars_context *mars,
+				       uint16_t id, uint8_t priority,
+				       uint64_t *workload_ea)
+{
+	return change_bits(mars, id, workload_ea,
+			   check_state_bits, MARS_WORKLOAD_STATE_FINISHED,
+			   set_schedule_bits, priority);
 }
 
 int mars_workload_queue_schedule_end(struct mars_context *mars,
-				uint16_t id)
+				     uint16_t id)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_SCHEDULING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* set state to ready */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_READY);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	return MARS_SUCCESS;
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_SCHEDULING,
+			    MARS_WORKLOAD_STATE_READY);
 }
 
 int mars_workload_queue_schedule_cancel(struct mars_context *mars,
-				uint16_t id)
+					uint16_t id)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_SCHEDULING) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* set state back to finished */
-	MARS_BITS_SET(&bits, STATE, MARS_WORKLOAD_STATE_FINISHED);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	return MARS_SUCCESS;
+	return change_state(mars, id, NULL,
+			    MARS_WORKLOAD_STATE_SCHEDULING,
+			    MARS_WORKLOAD_STATE_FINISHED);
 }
 
-static int test_workload_state_finished(uint32_t upper, void *param)
+static int is_workload_finished(uint32_t upper, void *param)
 {
+	(void)param;
+
 	/* this function assumes 'STATE' is stored in upper 32bits */
 	uint64_t bits = (uint64_t)upper << 32;
 
-	(void)param;
-
 	switch (MARS_BITS_GET(&bits, STATE)) {
 	case MARS_WORKLOAD_STATE_FINISHED:
 		return MARS_SUCCESS;
@@ -685,16 +457,17 @@ static int test_workload_state_finished(
 	}
 }
 
-int mars_workload_queue_wait(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea)
+static int workload_queue_wait(struct mars_context *mars,
+			       uint16_t id,
+			       int try,
+			       uint64_t *workload_ea)
 {
 	int ret;
-	uint64_t queue_ea;
 	int block;
 	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
+	uint64_t queue_ea;
+	uint64_t block_ea;
+	uint64_t bits_ea;
 
 	/* check function params */
 	if (!mars)
@@ -711,113 +484,59 @@ int mars_workload_queue_wait(struct mars
 	index = id % MARS_WORKLOAD_PER_BLOCK;
 
 	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
+	block_ea = get_block_ea(queue_ea, block);
+	bits_ea = get_block_bits_ea(block_ea, index);
 
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
+	if (try) {
+		ret = is_workload_finished(mars_ea_get_uint32(bits_ea), NULL);
+		if (ret < 0)
+			ret = MARS_ERROR_BUSY;
+	} else {
+		ret = mars_ea_cond_wait(bits_ea, is_workload_finished, NULL);
+	}
 
-	ret = mars_ea_cond_wait(bits_ea, test_workload_state_finished, NULL);
-	if (ret)
+	if (ret != MARS_SUCCESS)
 		return ret;
 
 	/* if requested set workload context pointer to return */
 	if (workload_ea)
-		*workload_ea = workload_queue_workload_ea(queue_ea, id);
+		*workload_ea = get_workload_ea(queue_ea, id);
 
 	return MARS_SUCCESS;
 }
 
-int mars_workload_queue_try_wait(struct mars_context *mars,
-				uint16_t id,
-				uint64_t *workload_ea)
+int mars_workload_queue_wait(struct mars_context *mars,
+			     uint16_t id,
+			     uint64_t *workload_ea)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
+	return workload_queue_wait(mars, id, 0, workload_ea);
+}
 
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
+int mars_workload_queue_try_wait(struct mars_context *mars,
+				 uint16_t id,
+				 uint64_t *workload_ea)
+{
+	return workload_queue_wait(mars, id, 1, workload_ea);
+}
 
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) == MARS_WORKLOAD_STATE_NONE)
-		return MARS_ERROR_STATE;
+static int check_signal_bits(uint64_t bits, uint64_t params)
+{
+	(void)params;
 
-	/* check if workload is finished */
-	if (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_FINISHED)
-		return MARS_ERROR_BUSY;
+	return (MARS_BITS_GET(&bits, STATE) != MARS_WORKLOAD_STATE_NONE);
+}
 
-	/* if requested set workload context pointer to return */
-	if (workload_ea)
-		*workload_ea = workload_queue_workload_ea(queue_ea, id);
+static uint64_t set_signal_bits(uint64_t bits, uint64_t value)
+{
+	MARS_BITS_SET(&bits, SIGNAL, value);
 
-	return MARS_SUCCESS;
+	return bits;
 }
 
 int mars_workload_queue_signal_send(struct mars_context *mars,
-				uint16_t id)
+				    uint16_t id)
 {
-	uint64_t queue_ea;
-	int block;
-	int index;
-	uint64_t bits;
-	uint64_t block_ea, bits_ea;
-
-	/* check function params */
-	if (!mars)
-		return MARS_ERROR_NULL;
-	if (!mars->workload_queue_ea)
-		return MARS_ERROR_PARAMS;
-	if (id >= MARS_WORKLOAD_MAX)
-		return MARS_ERROR_PARAMS;
-
-	queue_ea = mars->workload_queue_ea;
-
-	/* calculate block/index from id */
-	block = id / MARS_WORKLOAD_PER_BLOCK;
-	index = id % MARS_WORKLOAD_PER_BLOCK;
-
-	/* prepare work area for queue block */
-	block_ea = workload_queue_block_ea(queue_ea, block);
-
-	mars_mutex_lock(block_ea);
-
-	/* get bits from workload queue block */
-	bits_ea = workload_queue_block_bits_ea(block_ea, index);
-	bits = mars_ea_get_uint64(bits_ea);
-
-	/* check for valid state */
-	if (MARS_BITS_GET(&bits, STATE) == MARS_WORKLOAD_STATE_NONE) {
-		mars_mutex_unlock(block_ea);
-		return MARS_ERROR_STATE;
-	}
-
-	/* set signal bit on */
-	MARS_BITS_SET(&bits, SIGNAL, MARS_WORKLOAD_SIGNAL_ON);
-
-	mars_ea_put_uint64(bits_ea, bits);
-
-	mars_mutex_unlock(block_ea);
-
-	return MARS_SUCCESS;
+	return change_bits(mars, id, NULL,
+			   check_signal_bits, 0,
+			   set_signal_bits, MARS_WORKLOAD_SIGNAL_ON);
 }






More information about the cbe-oss-dev mailing list