[Cbe-oss-dev] [RFC/PATCH] libspe2: use mapped mailbox registers to speed up mailbox communication

Arnd Bergmann arnd at arndb.de
Fri May 18 22:12:35 EST 2007


On Friday 18 May 2007, stenzel at de.ibm.com wrote:
>  
> +static __inline__ int _base_spe_out_mbox_read_ps(spe_context_ptr_t spectx,
> +                        unsigned int mbox_data[], 
> +                        int count)
> +{
> +	int rc;
> +	int entries;
> +
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
> +
> +	_base_spe_context_lock(spectx, FD_MBOX);
> +	rc = 0;
> +	while (rc < count) {
> +		entries = cntl_area->SPU_Mbox_Stat & 0xFF;
> +		if (entries) {
> +			mbox_data[rc] = cntl_area->SPU_Out_Mbox;
> +			rc ++;
> +		} else {
> +			break;
> +		}
> +	}
> +	_base_spe_context_unlock(spectx, FD_MBOX);
> +	return rc;
> +}

I think you really should have synchronizing accessors for the registers
like the kernel has. Simply dereferencing a pointer to an mmio area
is usually not a good thing to do. Declaring the pointer volatile here
helps a bit, but is probably not exactly correct.

Also, it should be more efficient not to read the number of entries
in every loop iteration -- mmio read is rather slow, especially
when going across NUMA nodes.

It may also be good to give up the lock occasionally, when you are
transferring large amounts of data through the mailbox, with the
spu constantly adding new data, you lock out other threads that might
want to access some other data structure.

How about something like this:

	do {
		lock();
		avail = mmio_read_uint32(&cntl_area->SPU_Mbox_Stat);
		if (!avail) {
			unlock();
			break;
		}
		while (rc < count && avail) {
			mbox_data[rc] = mmio_read_uint32(
						&cntl_area->SPU_Out_Mbox);
			avail--;
			rc++;
		}
		unlock();
	} while (rc < count);


> +static __inline__ int _base_spe_in_mbox_write_ps(spe_context_ptr_t spectx,
> +                        unsigned int *mbox_data, 
> +                        int count)
> +{
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
> +	int total = 0;
> +	unsigned int *aux;
> +
> +	_base_spe_context_lock(spectx, FD_WBOX);
> +	aux = mbox_data;
> +	while (total < count) {
> +		int space = (cntl_area->SPU_Mbox_Stat >> 8) & 0xFF;
> +		if (space) {
> +			cntl_area->SPU_In_Mbox = *aux++;
> +			total++;
> +		} else {
> +			break;
> +		}
> +	}
> +	_base_spe_context_unlock(spectx, FD_WBOX);
> +
> +	return total;
> +}

Same here, except that converting the loop to read "space" only once
is even more important because then you only need to do cheap mmio writes.

> +
> +		// now write the remaining via spufs
> +		while (total < count) {
> +			aux = mbox_data + total;
>  			rc = write(open_if_closed(spectx,FD_WBOX, 0),
> -					(const char *)mbox_data + total, 4*count - total);
> +				  aux, 4*(count - total));
>  			if (rc == -1) {
>  				break;
>  			}
> -			total += rc;
> +			total += (rc/4);
>  		}
>  		break;

You need to hold the lock around the "write" system call, since it modifies the
same registers that you are trying to protect with the lock.

>  	case  SPE_MBOX_ANY_BLOCKING: // write at least one, even if blocking
> -		total = rc = write(open_if_closed(spectx,FD_WBOX, 0), mbox_data, 4*count);
> +		total = rc = 0;
> +		// first try to write directly if we map the PS
> +		if (spectx->base_private->flags & SPE_MAP_PS) 
> +			total = _base_spe_in_mbox_write_ps(spectx, mbox_data, count);
> +		// if none was written write via spufs
> +		if (total == 0) {
> +			rc = write(open_if_closed(spectx,FD_WBOX, 0), mbox_data, 4*count);
> +			total = rc/4;
> +		}
>  		break;

same here

>  	case  SPE_MBOX_ANY_NONBLOCKING: // only write, if non blocking
> -		rc = write(open_if_closed(spectx,FD_WBOX_NB, 0), mbox_data, 4*count);
> -		if (rc == -1 && errno == EAGAIN) {
> -			rc = 0;
> -			errno = 0;
> +		total = rc = 0;
> +		// write directly if we map the PS else write via spufs
> +		if (spectx->base_private->flags & SPE_MAP_PS) {
> +			total = _base_spe_in_mbox_write_ps(spectx, mbox_data, count);
> +		} else { 
> +			rc = write(open_if_closed(spectx,FD_WBOX_NB, 0), mbox_data, 4*count);
> +			if (rc == -1 && errno == EAGAIN) {
> +				rc = 0;
> +				errno = 0;
> +			}
> +			total = rc/4;
>  		}
> -		total = rc;
>  		break;
>  
>  	default:

here, you don't need to hold it because that path is only entered when not
using direct access, but it would be good to add a comment explaining this.

>  int _base_spe_in_mbox_status(spe_context_ptr_t spectx)
>  {
>  	int rc, ret;
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
>  
> -	rc = read(open_if_closed(spectx,FD_WBOX_STAT, 0), &ret, 4);
> -	if (rc != 4)
> -		ret = -1;
> +	if (spectx->base_private->flags & SPE_MAP_PS) {
> +		_base_spe_context_lock(spectx, FD_WBOX_STAT);
> +		ret = (cntl_area->SPU_Mbox_Stat >> 8) & 0xFF;
> +		_base_spe_context_unlock(spectx, FD_WBOX_STAT);
> +	} else {
> +		rc = read(open_if_closed(spectx,FD_WBOX_STAT, 0), &ret, 4);
> +		if (rc != 4)
> +			ret = -1;
> +	}
>  
>  	return ret;

I wouldn't bother optimizing the status access functions, no application
written to the libspe2 interface should need them in the first place.

Note that if you see a reason to do it anyway, you also don't need the
lock around here. Using the count value without the lock is racy anyway,
independent of whether you hold the lock while reading or not.

> @@ -123,10 +205,18 @@ int _base_spe_in_mbox_status(spe_context
>  int _base_spe_out_mbox_status(spe_context_ptr_t spectx)
>  {
>          int rc, ret;
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
>  
> -        rc = read(open_if_closed(spectx,FD_MBOX_STAT, 0), &ret, 4);
> -        if (rc != 4)
> -                ret = -1;
> +	if (spectx->base_private->flags & SPE_MAP_PS) {
> +		_base_spe_context_lock(spectx, FD_MBOX_STAT);
> +		ret = cntl_area->SPU_Mbox_Stat & 0xFF;
> +		_base_spe_context_unlock(spectx, FD_MBOX_STAT);
> +	} else {
> +        	rc = read(open_if_closed(spectx,FD_MBOX_STAT, 0), &ret, 4);
> +	        if (rc != 4)
> +        	        ret = -1;
> +	}
>  
>          return ret;
>  	
> @@ -135,11 +225,19 @@ int _base_spe_out_mbox_status(spe_contex
>  int _base_spe_out_intr_mbox_status(spe_context_ptr_t spectx)
>  {
>          int rc, ret;
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
>  
> -        rc = read(open_if_closed(spectx,FD_IBOX_STAT, 0), &ret, 4);
> -        if (rc != 4)
> -                ret = -1;
> +	if (spectx->base_private->flags & SPE_MAP_PS) {
> +		_base_spe_context_lock(spectx, FD_IBOX_STAT);
> +		ret = (cntl_area->SPU_Mbox_Stat >> 16) & 0xFF;
> +		_base_spe_context_unlock(spectx, FD_IBOX_STAT);
> +	} else {
> +	        rc = read(open_if_closed(spectx,FD_IBOX_STAT, 0), &ret, 4);
> +	        if (rc != 4)
> +        	        ret = -1;
>  
> +	}
>          return ret;
>  }

same here

> @@ -200,23 +298,43 @@ int _base_spe_out_intr_mbox_read(spe_con
>                          unsigned int data )
>  {
>  	int rc;
> +	volatile struct spe_spu_control_area *cntl_area =
> +        	spectx->base_private->cntl_mmap_base;
> +
> +	if (spectx->base_private->flags & SPE_MAP_PS) {
> +		if (signal_reg == SPE_SIG_NOTIFY_REG_1) {
> +			spe_sig_notify_1_area_t *sig = spectx->base_private->signal1_mmap_base;
> +
> +			_base_spe_context_lock(spectx, FD_SIG1);
> +			sig->SPU_Sig_Notify_1 = data;
> +			_base_spe_context_unlock(spectx, FD_SIG1);
> +		} else if (signal_reg == SPE_SIG_NOTIFY_REG_2) {
> +			spe_sig_notify_2_area_t *sig = spectx->base_private->signal2_mmap_base;
> +
> +			_base_spe_context_lock(spectx, FD_SIG2);
> +			sig->SPU_Sig_Notify_2 = data;
> +			_base_spe_context_unlock(spectx, FD_SIG2);
> +		}
> +		rc = 0;

These also don't need a lock. Accessing the signal notification registers
is guaranteed to be atomic.

	Arnd <><



More information about the cbe-oss-dev mailing list