[Cbe-oss-dev] [PATCH 1/2] libspe2: Don't use pipe if no stop handler is registered (take 2)

Kazunori Asayama asayama at sm.sony.co.jp
Mon Jun 9 14:05:25 EST 2008


This patch modifies the SPE event implementation of the libpse2 so
that the libspe2 doesn't write stop_info on the pipe if no stop event
handler is registered. Instead, the stop info is stored in the
internal buffer in such a case.

As a result of this change, the libspe2 behaves as following:

  a. If no stop event handler is registered, spe_context_run stores
     the stop info in an internal buffer instead of the pipe, and
     spe_stop_info_read reads the result from the buffer and returns
     it. If no s stop info is stored in the buffer yet or the existing
     stop info has been already consumed when spe_stop_info_read is
     called, it returns EAGAIN error. Even if the stored stop info
     isn't consumed by spe_stop_info_read, the buffer is overwritten
     by the next spe_context_run, so the libspe2 can avoid stalling.

  b. If any stop event handlers are registered, spe_context_run write
     the stop info on the pipe. That is the same behavior as the
     current implementation.

Signed-off-by: Kazunori Asayama <asayama at sm.sony.co.jp>
---
 speevent/spe_event.c |  174 +++++++++++++++++++++++++++++++++++++++++++--------
 speevent/speevent.h  |    5 +
 2 files changed, 152 insertions(+), 27 deletions(-)

Index: b/speevent/spe_event.c
===================================================================
--- a/speevent/spe_event.c	2008-06-09 11:43:54.000000000 +0900
+++ b/speevent/spe_event.c	2008-06-09 12:25:03.000000000 +0900
@@ -22,6 +22,7 @@
 #include <stdlib.h>
 #include "speevent.h"
 
+#include <string.h>
 #include <errno.h>
 #include <unistd.h>
 #include <sys/epoll.h>
@@ -56,6 +57,75 @@ void _event_spe_context_unlock(spe_conte
   pthread_mutex_unlock(&__SPE_EVENT_CONTEXT_PRIV_GET(spe)->lock);
 }
 
+static void stop_event_lock(spe_context_event_priv_ptr_t evctx)
+{
+  pthread_mutex_lock(&evctx->stop_event_lock);
+}
+
+static void stop_event_unlock(spe_context_event_priv_ptr_t evctx)
+{
+  pthread_mutex_unlock(&evctx->stop_event_lock);
+}
+
+static int stop_event_pipe_close(spe_context_event_priv_ptr_t evctx)
+{
+  if (evctx->stop_event_pipe[0] != -1) {
+    close(evctx->stop_event_pipe[0]);
+    evctx->stop_event_pipe[0] = -1;
+  }
+  if (evctx->stop_event_pipe[1] != -1) {
+    close(evctx->stop_event_pipe[1]);
+    evctx->stop_event_pipe[1] = -1;
+  }
+
+  return 0;
+}
+
+static int stop_event_pipe_open(spe_context_event_priv_ptr_t evctx)
+{
+  int rc;
+
+  rc = pipe(evctx->stop_event_pipe);
+  if (rc == -1) {
+    return -1;
+  }
+  rc = fcntl(evctx->stop_event_pipe[0], F_GETFL);
+  if (rc != -1) {
+    rc = fcntl(evctx->stop_event_pipe[0], F_SETFL, rc | O_NONBLOCK);
+  }
+  if (rc == -1) {
+    stop_event_pipe_close(evctx);
+    return -1;
+  }
+
+  return 0;
+}
+
+static int stop_event_pipe_acquire(spe_context_event_priv_ptr_t evctx)
+{
+  if (evctx->stop_event_handler_count == 0) {
+    if (stop_event_pipe_open(evctx) == -1) {
+      return -1;
+    }
+    evctx->stop_event_buffer_count = 0; /* invalidate the buffer */
+  }
+
+  evctx->stop_event_handler_count++;
+
+  return 0;
+}
+
+static int stop_event_pipe_release(spe_context_event_priv_ptr_t evctx)
+{
+  evctx->stop_event_handler_count--;
+
+  if (evctx->stop_event_handler_count == 0) {
+    stop_event_pipe_close(evctx);
+  }
+
+  return 0;
+}
+
 int _event_spe_stop_info_read (spe_context_ptr_t spe, spe_stop_info_t *stopinfo)
 {
   spe_context_event_priv_ptr_t evctx;
@@ -64,13 +134,32 @@ int _event_spe_stop_info_read (spe_conte
   size_t total;
   
   evctx = __SPE_EVENT_CONTEXT_PRIV_GET(spe);
+
+  stop_event_lock(evctx); /* for atomic read */
+
   fd = evctx->stop_event_pipe[0];
 
-  pthread_mutex_lock(&evctx->stop_event_read_lock); /* for atomic read */
+  if (fd == -1) { /* no stop event handler */
+    if (evctx->stop_event_buffer_count) {
+      /* return the last stop info for backward compatibility */
+      memcpy(stopinfo, &evctx->stop_event_buffer, sizeof(*stopinfo));
+      evctx->stop_event_buffer_count--;
+      rc = 0;
+    }
+    else {
+      /* there is no valid stop info in the buffer */
+      errno = EAGAIN;
+      rc = -1;
+    }
+    stop_event_unlock(evctx);
+    return rc;
+  }
+
+  /* any stop event handler. */
 
   rc = read(fd, stopinfo, sizeof(*stopinfo));
   if (rc == -1) {
-    pthread_mutex_unlock(&evctx->stop_event_read_lock);
+    stop_event_unlock(evctx);
     return -1;
   }
 
@@ -98,7 +187,7 @@ int _event_spe_stop_info_read (spe_conte
     }
   }
 
-  pthread_mutex_unlock(&evctx->stop_event_read_lock);
+  stop_event_unlock(evctx);
 
   return rc == -1 ? -1 : 0;
 }
@@ -248,6 +337,15 @@ int _event_spe_event_handler_register(sp
   }
 
   if (event->events & SPE_EVENT_SPE_STOPPED) {
+    /* prevent reading stop info while registering */
+    stop_event_lock(evctx);
+
+    if (stop_event_pipe_acquire(evctx) == -1) {
+      stop_event_unlock(evctx);
+      _event_spe_context_unlock(event->spe);
+      return -1;
+    }
+
     fd = evctx->stop_event_pipe[0];
     
     ev_buf = &evctx->events[__SPE_EVENT_SPE_STOPPED];
@@ -257,9 +355,13 @@ int _event_spe_event_handler_register(sp
     ep_event.events = EPOLLIN;
     ep_event.data.ptr = ev_buf;
     if (epoll_ctl(epfd, ep_op, fd, &ep_event) == -1) {
+      stop_event_pipe_release(evctx);
+      stop_event_unlock(evctx);
       _event_spe_context_unlock(event->spe);
       return -1;
     }
+
+    stop_event_unlock(evctx);
   }
 
   _event_spe_context_unlock(event->spe);
@@ -340,12 +442,20 @@ int _event_spe_event_handler_deregister(
   }
   
   if (event->events & SPE_EVENT_SPE_STOPPED) {
+    /* prevent reading stop info while unregistering */
+    stop_event_lock(evctx);
+
     fd = evctx->stop_event_pipe[0];
     if (epoll_ctl(epfd, ep_op, fd, NULL) == -1) {
+      stop_event_unlock(evctx);
       _event_spe_context_unlock(event->spe);
       return -1;
     }
     evctx->events[__SPE_EVENT_SPE_STOPPED].events = 0;
+
+    stop_event_pipe_release(evctx);
+
+    stop_event_unlock(evctx);
   }
 
   _event_spe_context_unlock(event->spe);
@@ -424,12 +534,11 @@ int _event_spe_context_finalize(spe_cont
 
   evctx = __SPE_EVENT_CONTEXT_PRIV_GET(spe);
   __SPE_EVENT_CONTEXT_PRIV_SET(spe, NULL);
-  
-  close(evctx->stop_event_pipe[0]);
-  close(evctx->stop_event_pipe[1]);
+
+  stop_event_pipe_close(evctx);
 
   pthread_mutex_destroy(&evctx->lock);
-  pthread_mutex_destroy(&evctx->stop_event_read_lock);
+  pthread_mutex_destroy(&evctx->stop_event_lock);
 
   free(evctx);
 
@@ -439,7 +548,6 @@ int _event_spe_context_finalize(spe_cont
 struct spe_context_event_priv * _event_spe_context_initialize(spe_context_ptr_t spe)
 {
   spe_context_event_priv_ptr_t evctx;
-  int rc;
   int i;
 
   evctx = calloc(1, sizeof(*evctx));
@@ -447,29 +555,16 @@ struct spe_context_event_priv * _event_s
     return NULL;
   }
 
-  rc = pipe(evctx->stop_event_pipe);
-  if (rc == -1) {
-    free(evctx);
-    return NULL;
-  }
-  rc = fcntl(evctx->stop_event_pipe[0], F_GETFL);
-  if (rc != -1) {
-    rc = fcntl(evctx->stop_event_pipe[0], F_SETFL, rc | O_NONBLOCK);
-  }
-  if (rc == -1) {
-    close(evctx->stop_event_pipe[0]);
-    close(evctx->stop_event_pipe[1]);
-    free(evctx);
-    errno = EIO;
-    return NULL;
-  }
+  /* the pipe will be created when any stop event handler is registered */
+  evctx->stop_event_pipe[0] = -1;
+  evctx->stop_event_pipe[1] = -1;
 
   for (i = 0; i < sizeof(evctx->events) / sizeof(evctx->events[0]); i++) {
     evctx->events[i].spe = spe;
   }
 
   pthread_mutex_init(&evctx->lock, NULL);
-  pthread_mutex_init(&evctx->stop_event_read_lock, NULL);
+  pthread_mutex_init(&evctx->stop_event_lock, NULL);
 
   return evctx;
 }
@@ -479,17 +574,44 @@ int _event_spe_context_run	(spe_context_
   spe_context_event_priv_ptr_t evctx;
   spe_stop_info_t stopinfo_buf;
   int rc;
+  int errno_saved;
+  int fd;
 
   if (!stopinfo) {
     stopinfo = &stopinfo_buf;
   }
   rc = _base_spe_context_run(spe, entry, runflags, argp, envp, stopinfo);
+  errno_saved = errno;
 
   evctx = __SPE_EVENT_CONTEXT_PRIV_GET(spe);
-  if (write(evctx->stop_event_pipe[1], stopinfo, sizeof(*stopinfo)) != sizeof(*stopinfo)) {
+
+  stop_event_lock(evctx);
+
+  fd = evctx->stop_event_pipe[1];
+  /* don't write stop info to the pipe if no stop event handler is registered */
+  if (fd == -1) {
+    /* store the last stop info in the internal buffer for backward
+     * compatibility */
+    memcpy(&evctx->stop_event_buffer, stopinfo, sizeof(*stopinfo));
+    evctx->stop_event_buffer_count = 1; /* overwrite the buffer */
+    stop_event_unlock(evctx);
+    return rc;
+  }
+
+  stop_event_pipe_acquire(evctx); /* to avoid closing the pipe after unlocked */
+  stop_event_unlock(evctx); /* unlock here to avoid deadlocks */
+
+  if (write(fd, stopinfo, sizeof(*stopinfo)) != sizeof(*stopinfo)) {
     /* error check. */
   }
 
+  /* release the pipe */
+  stop_event_lock(evctx);
+  stop_event_pipe_release(evctx);
+  stop_event_unlock(evctx);
+
+  errno = errno_saved;
+
   return rc;
 }
 
Index: b/speevent/speevent.h
===================================================================
--- a/speevent/speevent.h	2008-06-09 11:43:54.000000000 +0900
+++ b/speevent/speevent.h	2008-06-09 11:44:00.000000000 +0900
@@ -35,8 +35,11 @@ enum __spe_event_types {
 typedef struct spe_context_event_priv
 {
   pthread_mutex_t lock;
-  pthread_mutex_t stop_event_read_lock;
+  pthread_mutex_t stop_event_lock;
   int stop_event_pipe[2];
+  int stop_event_handler_count;
+  int stop_event_buffer_count;
+  spe_stop_info_t stop_event_buffer;
   spe_event_unit_t events[__NUM_SPE_EVENT_TYPES];
 } spe_context_event_priv_t, *spe_context_event_priv_ptr_t;
 



More information about the cbe-oss-dev mailing list