[Cbe-oss-dev] [RFC 3/4] spufs: add kernel support for spu task (PPU)

Geoff Levand geoffrey.levand at am.sony.com
Wed Aug 1 05:12:34 EST 2007


Hi Sebastian.  Just a few minor comments.

Sebastian Siewior wrote:
> Utilization of SPUs by the kernel, main implementation. 
> Functions that are offloaded to the SPU must be spitted into two parts:
> - SPU part (executing)
> - PPU part (prepare/glue)
> 
> The SPU part expects a buffer and maybe some other parameters and performs
> the work on the buffer. After the work/job is done, it requests the
> transfer back into main memory.
> The PPU part needs to split the information into this kind of job. Every
> job consists of one buffer (16 KiB max) and a few parameters. Once
> everything is prepared, the request is added to a list. There is soft
> limit for the number of requests that fit into this list. Once this limit
> is reached, all request are dropped (unless a flag is passed in order not
> to). The limit makes sure the user is not trying to process faster than
> the SPU is capable of. The "enqueue anyway" flag is necessary because under
> some circumstances the user may not be able to drop the request or try
> again later. Anyway, the user should slow down (and later, maybe kspu fires
> another SPU conntext but this is just theory).
> A separate thread dequeues the request(s) from the list and calls a user
> supplied function in order to enqueue this request in a ring buffer which is
> located on the SPU. This transit stop enables
> - enqueuing items if the ring buffer is full (not the list)
> - enqueuing items from non-blocking context
> After the callback function returns, the SPU starts "immediately" the
> work. Once the SPU performed the work, KSPU invokes another callback to inform
> the user, that his request is complete. 
> The PPU code is responsible for proper alignment & transfer size. 


I would like to see this overview as an inline doc in the source as well, so
that it can be read by someone trying to understand this functionality after
the patch is applied.  Maybe in kspu.c?


> --- /dev/null
> +++ b/arch/powerpc/platforms/cell/spufs/kspu.c
> @@ -0,0 +1,571 @@
> +/*
> + * Interface for accessing SPUs from the kernel.
> + *
> + * Author: Sebastian Siewior <sebastian at breakpoint.cc>
> + * License: GPLv2
> + */
> +
> +#include <asm/spu_priv1.h>
> +#include <asm/kspu/kspu.h>
> +#include <asm/kspu/merged_code.h>
> +#include <linux/kthread.h>
> +#include <linux/module.h>
> +#include <linux/init_task.h>
> +#include <linux/hardirq.h>
> +#include <linux/kernel.h>
> +#include "spufs.h"
> +#include "kspu_util.h"
> +#include "spu_kspu_dump.h"
> +
> +static struct kspu_code single_spu_code = {
> +	.code = spu_kspu_code,
> +	.code_len = sizeof(spu_kspu_code),
> +	.kspu_data_offset = KERNEL_SPU_DATA_OFFSET,
> +	.queue_mask = RB_SLOTS-1,
> +	.queue_entr_size = sizeof(struct kspu_job),
> +};
> +
> +static void free_kspu_context(struct kspu_context *kctx)
> +{
> +	struct spu_context *spu_ctx = kctx->spu_ctx;
> +	int ret;
> +
> +	if (spu_ctx->owner)
> +		spu_forget(spu_ctx);
> +	ret = put_spu_context(spu_ctx);
> +	WARN_ON(!ret);
> +	kfree(kctx->notify_cb_info);
> +	kfree(kctx);
> +}
> +
> +static void setup_stack(struct kspu_context *kctx)
> +{
> +	struct spu_context *ctx = kctx->spu_ctx;
> +	u8 *ls;
> +	u32 *u32p;
> +
> +	spu_acquire_saved(ctx);
> +	ls = ctx->ops->get_ls(ctx);
> +
> +#define BACKCHAIN (kctx->spu_code->kspu_data_offset - 16)
> +#define STACK_GAP 176
> +#define INITIAL_STACK (BACKCHAIN - STACK_GAP)


These could (should?) be inline functions.


> +
> +	BUG_ON(INITIAL_STACK > KSPU_LS_SIZE);
> +
> +	u32p = (u32 *) &ls[BACKCHAIN];
> +	u32p[0] = 0;
> +	u32p[1] = 0;
> +	u32p[2] = 0;
> +	u32p[3] = 0;
> +
> +	u32p = (u32 *) &ls[INITIAL_STACK];
> +	u32p[0] = BACKCHAIN;
> +	u32p[1] = 0;
> +	u32p[2] = 0;
> +	u32p[3] = 0;
> +
> +	ctx->csa.lscsa->gprs[1].slot[0] = INITIAL_STACK;
> +	spu_release(ctx);
> +	pr_debug("SPU's stack ready 0x%04x\n", INITIAL_STACK);
> +}
> +
> +static struct kspu_context *__init kcreate_spu_context(int flags,
> +		struct kspu_code *spu_code)
> +{
> +	struct kspu_context *kctx;
> +	struct spu_context *ctx;
> +	unsigned int ret;
> +	u8 *ls;
> +
> +	flags |= SPU_CREATE_EVENTS_ENABLED;
> +	ret = -EINVAL;
> +
> +	if (flags & (~SPU_CREATE_FLAG_ALL))
> +		goto err;
> +	/*
> +	 * it must be a multiple of 16 because this value is used to calculate
> +	 * the initial stack frame which must be 16byte aligned
> +	 */
> +	if (spu_code->kspu_data_offset & 15)
> +		goto err;
> +
> +	pr_debug("SPU's queue: %d elemets, %d bytes each (%d bytes total)\n",
> +			spu_code->queue_mask+1, spu_code->queue_entr_size,
> +			(spu_code->queue_mask+1) * spu_code->queue_entr_size);
> +
> +	ret = -EFBIG;
> +	if (spu_code->code_len > KSPU_LS_SIZE)
> +		goto err;
> +
> +	ret = -ENOMEM;
> +	kctx = kzalloc(sizeof *kctx, GFP_KERNEL);
> +	if (!kctx)
> +		goto err;
> +
> +	kctx->qlen = 0;
> +	kctx->spu_code = spu_code;
> +	init_waitqueue_head(&kctx->newitem_wq);
> +	spin_lock_init(&kctx->queue_lock);
> +	INIT_LIST_HEAD(&kctx->work_queue);
> +	kctx->notify_cb_info = kzalloc(sizeof(*kctx->notify_cb_info) *
> +			(kctx->spu_code->queue_mask+1), GFP_KERNEL);
> +	if (!kctx->notify_cb_info)
> +		goto err_notify;
> +
> +	ctx = kspu_alloc_context();
> +	if (!ctx)
> +		goto err_spu_ctx;
> +
> +	kctx->spu_ctx = ctx;
> +	ctx->flags = flags;
> +
> +	spu_acquire(ctx);
> +	ls = ctx->ops->get_ls(ctx);
> +	memcpy(ls, spu_code->code, spu_code->code_len);
> +	spu_release(ctx);
> +	setup_stack(kctx);
> +
> +	return kctx;
> +err_spu_ctx:
> +	kfree(kctx->notify_cb_info);
> +
> +err_notify:
> +	kfree(kctx);
> +err:
> +	return ERR_PTR(ret);
> +}
> +
> +/**
> + * kspu_get_rb_slot - get a free slot to queue a work request on the SPU.
> + * @kctx:	kspu context, where the free slot is required
> + *
> + * Returns a free slot where a request may be queued on. Repeated calls will
> + * return the same slot until it is marked as taken (by
> + * kspu_mark_rb_slot_ready()).
> + */
> +struct kspu_job *kspu_get_rb_slot(struct kspu_context *kctx)
> +{
> +	struct kspu_ring_data *ring_data;
> +	unsigned char *ls;
> +	unsigned int consumed;
> +	unsigned int outstanding;
> +	unsigned int queue_mask;
> +
> +	ls = kctx->spu_ctx->ops->get_ls(kctx->spu_ctx);
> +	ls += kctx->spu_code->kspu_data_offset;
> +	ring_data = (struct kspu_ring_data *) ls;
> +
> +	queue_mask = kctx->spu_code->queue_mask;
> +	barrier();
> +	consumed = ring_data->consumed;
> +	outstanding = ring_data->outstanding;
> +
> +	outstanding++;
> +
> +	if (outstanding == consumed)
> +		return NULL;
> +
> +	outstanding = ring_data->outstanding;
> +
> +	ls += sizeof(struct kspu_ring_data);
> +	/* ls points now to the first queue slot */
> +	ls += kctx->spu_code->queue_entr_size * (outstanding & queue_mask);
> +
> +	pr_debug("Return slot %d, at %p\n", (outstanding&queue_mask), ls);
> +	return (struct kspu_job *) ls;
> +


Seems like you have a misplaced line-feed here.


> +}
> +EXPORT_SYMBOL_GPL(kspu_get_rb_slot);
> +
> +/*
> + * kspu_mark_rb_slot_ready - mark a request valid.
> + * @kctx:	kspu context that the request belongs to
> + * @work:	work item that is used for notification. May be NULL.
> + *
> + * The slot will be marked as valid not returned kspu_get_rb_slot() until
> + * the request is processed. If @work is not NULL, work->notify will be
> + * called to notify the user, that his request is done.
> + */
> +void kspu_mark_rb_slot_ready(struct kspu_context *kctx,
> +		struct kspu_work_item *work)
> +{
> +	struct kspu_ring_data *ring_data;
> +	unsigned char *ls;
> +	unsigned int outstanding;
> +	unsigned int queue_mask;
> +
> +	ls = kctx->spu_ctx->ops->get_ls(kctx->spu_ctx);
> +	ls += kctx->spu_code->kspu_data_offset;
> +	ring_data = (struct kspu_ring_data *) ls;
> +
> +	queue_mask = kctx->spu_code->queue_mask;
> +	barrier();
> +	outstanding = ring_data->outstanding;
> +	kctx->notify_cb_info[outstanding & queue_mask] = work;
> +	pr_debug("item ready: outs %d, notification data %p\n",
> +			outstanding &queue_mask, work);
> +	outstanding++;
> +	BUG_ON(outstanding == ring_data->consumed);
> +	ring_data->outstanding = outstanding;
> +}
> +EXPORT_SYMBOL_GPL(kspu_mark_rb_slot_ready);
> +
> +static int notify_done_reqs(struct kspu_context *kctx)
> +{
> +	struct kspu_ring_data *ring_data;
> +	struct kspu_work_item *kspu_work;
> +	unsigned char *kjob;
> +	unsigned char *ls;
> +	unsigned int current_notify;
> +	unsigned int queue_mask;
> +	unsigned ret = 0;
> +
> +	ls = kctx->spu_ctx->ops->get_ls(kctx->spu_ctx);
> +	ls += kctx->spu_code->kspu_data_offset;
> +	ring_data = (struct kspu_ring_data *) ls;
> +	ls += sizeof(struct kspu_ring_data);
> +
> +	current_notify = kctx->last_notified;
> +	queue_mask = kctx->spu_code->queue_mask;
> +	pr_debug("notify| %d | %d\n", current_notify & queue_mask,
> +			ring_data->consumed & queue_mask);
> +
> +	while (ring_data->consumed != current_notify) {
> +
> +		pr_debug("do notify %d\n", current_notify);
> +
> +		kspu_work = kctx->notify_cb_info[current_notify & queue_mask];
> +		if (likely(kspu_work)) {
> +			kjob = (unsigned char *) ls +
> +				kctx->spu_code->queue_entr_size *
> +				(current_notify & queue_mask);
> +			kspu_work->notify(kspu_work, (struct kspu_job *) kjob);
> +		}
> +
> +		current_notify++;
> +		ret = 1;
> +		barrier();
> +	}
> +
> +	kctx->last_notified = current_notify;
> +	pr_debug("notify done\n");
> +	return ret;
> +}
> +
> +static int queue_requests(struct kspu_context *kctx)
> +{
> +	int ret;
> +	int empty;
> +	int queued = 0;
> +	struct kspu_work_item *work;
> +
> +	WARN_ON(in_irq());
> +
> +	do {
> +		if (!kspu_get_rb_slot(kctx))
> +			break;
> +
> +		spin_lock_bh(&kctx->queue_lock);
> +		empty = list_empty(&kctx->work_queue);
> +		if (unlikely(empty)) {
> +			work = NULL;
> +		} else {
> +			work = list_first_entry(&kctx->work_queue,
> +					struct kspu_work_item, list);
> +			list_del(&work->list);
> +			kctx->qlen--;
> +		}
> +		spin_unlock_bh(&kctx->queue_lock);
> +
> +		if (!work)
> +			break;
> +
> +		pr_debug("Adding item %p to queue\n", work);
> +		ret = work->enqueue(work);
> +		if (unlikely(ret == 0)) {
> +			pr_debug("Adding item %p again to list.\n", work);
> +			spin_lock_bh(&kctx->queue_lock);
> +			list_add(&work->list, &kctx->work_queue);
> +			kctx->qlen++;
> +			spin_unlock_bh(&kctx->queue_lock);
> +			break;
> +		}
> +
> +		queued = 1;
> +	} while (1);


In other places use use the construction while(1){}, but here you use do{}while(1)


> +	pr_debug("Queue requests done. => %d\n", queued);
> +	return queued;
> +}
> +
> +/**
> + * kspu_enqueue_work_item - Enqueue a request that supposed to be queued on the
> + * SPU.
> + * @kctx:	kspu context that should be used.
> + * @work:	Work item that should be placed on the SPU
> + *
> + * The functions puts the work item in a list belonging to the kctx. If the
> + * queue is full (KSPU_MAX_QUEUE_LENGTH limit) the request will be discarded
> + * unless the KSPU_MUST_BACKLOG flag has been specified. The flag should be
> + * specified if the user can't drop the requeuest or try again later (softirq).
> + * Once a SPU slot is available, the user supplied enqueue function
> + * (work->enqueue) will be called from a kthread context. The user may then
> + * enqueue the request on the SPU. This function may be called from softirq.
> + *
> + * Returns: -EINPROGRESS if the work item is enqueued,
> + * -EBUSY if the queue is full and the user should slowdown. Packet is
> + *  discarded unless KSPU_MUST_BACKLOG has been passed.
> + */
> +int kspu_enqueue_work_item(struct kspu_context *kctx,
> +		struct kspu_work_item *work, unsigned int flags)
> +{
> +	int ret = -EINPROGRESS;
> +
> +	spin_lock_bh(&kctx->queue_lock);
> +	if (unlikely(kctx->qlen > KSPU_MAX_QUEUE_LENGTH)) {
> +
> +		ret = -EBUSY;
> +		if (flags != KSPU_MUST_BACKLOG) {
> +			spin_unlock_bh(&kctx->queue_lock);
> +			return ret;
> +		}
> +	}
> +
> +	kctx->qlen++;
> +	list_add_tail(&work->list, &kctx->work_queue);
> +
> +	spin_unlock_bh(&kctx->queue_lock);
> +	wake_up_all(&kctx->newitem_wq);
> +	return ret;
> +}
> +EXPORT_SYMBOL_GPL(kspu_enqueue_work_item);
> +
> +static int pending_spu_work(struct kspu_context *kctx)
> +{
> +	struct kspu_ring_data *ring_data;
> +	unsigned char *ls;
> +
> +	ls = kctx->spu_ctx->ops->get_ls(kctx->spu_ctx);
> +	ls += kctx->spu_code->kspu_data_offset;
> +	ring_data = (struct kspu_ring_data *) ls;
> +
> +	pr_debug("pending spu work status: %u == %u ?\n",
> +			ring_data->consumed,
> +			ring_data->outstanding);
> +	barrier();
> +	if (ring_data->consumed == ring_data->outstanding)
> +		return 0;
> +
> +	return 1;
> +}
> +
> +/*
> + * Fill dummy requests in the ring buffer. Dummy requests are required
> + * to let MFC "transfer" data if there are not enough real requests.
> + * Transfers with a size of 0 bytes are nops for the MFC
> + */
> +static void kspu_fill_dummy_reqs(struct kspu_context *kctx)
> +{
> +
> +	struct kspu_ring_data *ring_data;
> +	unsigned char *ls;
> +	unsigned int requests;
> +	struct kspu_job *kjob;
> +	int i;
> +
> +	ls = kctx->spu_ctx->ops->get_ls(kctx->spu_ctx);
> +	ls += kctx->spu_code->kspu_data_offset;
> +	ring_data = (struct kspu_ring_data *) ls;
> +
> +	barrier();
> +	requests = ring_data->outstanding - ring_data->consumed;
> +
> +	if (requests >= DMA_BUFFERS *2)
> +		return;
> +
> +	for (i = requests; i < (DMA_BUFFERS*2); i++) {


This needs spaces, as in '(DMA_BUFFERS * 2)'.


> +		kjob = kspu_get_rb_slot(kctx);
> +		kjob->in_size = 0;
> +		kspu_mark_rb_slot_ready(kctx, NULL);
> +	}
> +}
> +
> +/*
> + * based on run.c spufs_run_spu
> + */
> +static int spufs_run_kernel_spu(void *priv)
> +{
> +	struct kspu_context *kctx = (struct kspu_context *) priv;
> +	struct spu_context *ctx = kctx->spu_ctx;
> +	int ret;
> +	u32 status;
> +	unsigned int npc = 0;
> +	int fastpath;
> +	DEFINE_WAIT(wait_for_stop);
> +	DEFINE_WAIT(wait_for_ibox);
> +	DEFINE_WAIT(wait_for_newitem);
> +
> +	spu_enable_spu(ctx);
> +	ctx->event_return = 0;
> +
> +	spu_acquire(ctx);
> +	if (ctx->state == SPU_STATE_SAVED) {
> +		__spu_update_sched_info(ctx);
> +
> +		ret = spu_activate(ctx, 0);
> +		if (ret) {
> +			spu_release(ctx);
> +			printk(KERN_ERR "could not obtain runnable spu: %d\n",
> +					ret);
> +			BUG();
> +		}
> +	} else {
> +		/*
> +		 * We have to update the scheduling priority under active_mutex
> +		 * to protect against find_victim().
> +		 */
> +		spu_update_sched_info(ctx);
> +	}
> +
> +


Too many line-feeds?


> +	spu_run_init(ctx, &npc);
> +
...




More information about the cbe-oss-dev mailing list