<font size=2 face="sans-serif">Thanks Peng, it seems the indentation issue Cyril
pointed out on the first version has been addressed. Perhaps Brad can give
a final approval next.</font><br><br><br><tt><font size=2>"openbmc" <openbmc-bounces+anoo=us.ibm.com@lists.ozlabs.org>
wrote on 06/05/2016 08:40:44 PM:<br><br>> From: OpenBMC Patches <openbmc-patches@stwcx.xyz></font></tt><br><tt><font size=2>> To: openbmc@lists.ozlabs.org</font></tt><br><tt><font size=2>> Cc: shgoupf <shgoupf@cn.ibm.com></font></tt><br><tt><font size=2>> Date: 06/05/2016 08:41 PM</font></tt><br><tt><font size=2>> Subject: [PATCH phosphor-rest-server v3] The
streaming support for obmc-rest.</font></tt><br><tt><font size=2>> Sent by: "openbmc" <openbmc-bounces+anoo=us.ibm.com@lists.ozlabs.org></font></tt><br><tt><font size=2>> <br>> From: shgoupf <shgoupf@cn.ibm.com><br>> <br>> Changes:<br>> 1) The main idea of this change is to have a streaming path as below:<br>>     dbus signal -> obmc-rest capture the dbus signal
-> obmc-rest <br>> notify the client of the signal receiving.<br>> 2) Replace rocket with gevent WSGI server to support multiple async
accesses.<br>> 3) Use gevent queue to notify the dbus signal receiving.<br>> 4) The uri to the streaming should be in the form as below:<br>>     https://<ip>/<path>/stream/<signal_name><br>> ---<br>>  obmc-rest | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++<br>> ++++-------<br>>  1 file changed, 106 insertions(+), 12 deletions(-)<br>>  mode change 100644 => 100755 obmc-rest<br>> <br>> diff --git a/obmc-rest b/obmc-rest<br>> old mode 100644<br>> new mode 100755<br>> index e7dbbba..304cb41<br>> --- a/obmc-rest<br>> +++ b/obmc-rest<br>> @@ -19,7 +19,9 @@<br>>  import os<br>>  import sys<br>>  import dbus<br>> +import gobject<br>>  import dbus.exceptions<br>> +import dbus.mainloop.glib<br>>  import json<br>>  import logging<br>>  from xml.etree import ElementTree<br>> @@ -32,6 +34,10 @@ import obmc.mapper<br>>  import spwd<br>>  import grp<br>>  import crypt<br>> +import threading<br>> +import gevent<br>> +from gevent.pywsgi import WSGIServer<br>> +from gevent.queue import Queue<br>>  <br>>  DBUS_UNKNOWN_INTERFACE = 'org.freedesktop.UnknownInterface'<br>>  DBUS_UNKNOWN_INTERFACE_ERROR = 'org.freedesktop.DBus.Error.UnknownInterface'<br>> @@ -72,13 +78,14 @@ class UserInGroup:<br>>  class RouteHandler(object):<br>>      _require_auth = obmc.utils.misc.makelist(valid_user)<br>>  <br>> -    def __init__(self, app, bus, verbs, rules):<br>> +    def __init__(self, app, bus, verbs, rules, skips=[]):<br>>          self.app = app<br>>          self.bus = bus<br>>          self.mapper = obmc.mapper.Mapper(bus)<br>>          self._verbs = obmc.utils.misc.makelist(verbs)<br>>          self._rules = rules<br>>          self.intf_match = obmc.utils.misc.org_dot_openbmc_match<br>> +        self._skips = skips<br>>  <br>>      def _setup(self, **kw):<br>>          request.route_data = {}<br>> @@ -94,7 +101,7 @@ class RouteHandler(object):<br>>  <br>>      def install(self):<br>>          self.app.route(<br>> -            self._rules, callback=self,<br>> +            
self._rules, callback=self,
skip=self._skips,<br>>              method=['GET', 'PUT',
'PATCH', 'POST', 'DELETE'])<br>>  <br>>      @staticmethod<br>> @@ -126,6 +133,57 @@ class RouteHandler(object):<br>>                  return
None<br>>              raise<br>>  <br>> +class SignalHandler(RouteHandler):<br>> +    verbs = ['GET']<br>> +    rules = '<path:path>/stream/<signal>'<br>> +<br>> +    def __init__(self, app, bus):<br>> +        super(SignalHandler, self).__init__(<br>> +                app, bus,
self.verbs, self.rules)<br>> +<br>> +    def find(self, path, signal):<br>> +        busses = self.try_mapper_call(self.mapper.get_object,<br>> +                path = path)<br>> +        for items in busses.iteritems():<br>> +            s = self.find_signal_on_bus(path,
signal, *items)<br>> +            if s:<br>> +                return s<br>> +<br>> +        abort(404, _4034_msg %('signal', 'found',
signal))<br>> +<br>> +    def setup(self, path, signal):<br>> +        request.route_data['map'] = self.find(path,
signal)<br>> +<br>> +    def do_get(self, path, signal):<br>> +                body = Queue()<br>> +                dsignal =
DbusSignal(bus, request.route_data['map'][0],<br>> +                    
                request.route_data['map'][1],
path)<br>> +                dsignal.onData(body.put)<br>> +                dsignal.onFinish(lambda:
body.put(StopIteration))<br>> +                dsignal.signalSnooping()<br>> +                return body<br>> +<br>> +    @staticmethod<br>> +    def find_signal(signal, signals):<br>> +        if signals is None:<br>> +            return None<br>> +<br>> +        signal = find_case_insensitive(signal,
signals.keys())<br>> +        if signal is not None:<br>> +                    
   return signal<br>> +<br>> +    def find_signal_on_bus(self, path, signal, bus, interfaces):<br>> +        obj = self.bus.get_object(bus, path,
introspect = False)<br>> +        iface = dbus.Interface(obj, dbus.INTROSPECTABLE_IFACE)<br>> +        data = iface.Introspect()<br>> +        parser = IntrospectionNodeParser(<br>> +                ElementTree.fromstring(data),<br>> +                intf_match
= ListMatch(interfaces))<br>> +        for x,y in parser.get_interfaces().iteritems():<br>> +            s = self.find_signal(signal,<br>> +                    
                     
  y.get('signal'))<br>> +            if s:<br>> +                return (x,s)<br>>  <br>>  class DirectoryHandler(RouteHandler):<br>>      verbs = 'GET'<br>> @@ -702,7 +760,8 @@ class RestApp(Bottle):<br>>          self.install(AuthorizationPlugin())<br>>          self.install(JsonpPlugin(**json_kw))<br>>          self.install(JSONPlugin(**json_kw))<br>> -        self.install(JsonApiResponsePlugin())<br>> +        self.json_response_plugin = JsonApiResponsePlugin()<br>> +        self.install(self.json_response_plugin)<br>>          self.install(JsonApiRequestPlugin())<br>>          self.install(JsonApiRequestTypePlugin())<br>>  <br>> @@ -713,6 +772,7 @@ class RestApp(Bottle):<br>>  <br>>      def create_handlers(self):<br>>          # create route handlers<br>> +        self.signal_handler = SignalHandler(self,
self.bus)<br>>          self.session_handler = SessionHandler(self,
self.bus)<br>>          self.directory_handler = DirectoryHandler(self,
self.bus)<br>>          self.list_names_handler = ListNamesHandler(self,
self.bus)<br>> @@ -723,6 +783,11 @@ class RestApp(Bottle):<br>>          self.instance_handler = InstanceHandler(self,
self.bus)<br>>  <br>>      def install_handlers(self):<br>> +        # Skip json response for signal handler
because it requires to<br>> +        # return a gevent iterable which cannot
be handled by json<br>> +        # response plugin<br>> +        self.signal_handler._skips = [self.json_response_plugin]<br>> +        self.signal_handler.install()<br>>          self.session_handler.install()<br>>          self.directory_handler.install()<br>>          self.list_names_handler.install()<br>> @@ -754,19 +819,48 @@ class RestApp(Bottle):<br>>          parts = filter(bool, path.split('/'))<br>>          request.environ['PATH_INFO'] = '/'
+ '/'.join(parts) + trailing<br>>  <br>> +class DbusSignal():<br>> +    def __init__(self, bus, dbus_interface, signal_name,
path):<br>> +        # Register the dbus recieve handler<br>> +        bus.add_signal_receiver(self.signalReciever,<br>> +                    
           dbus_interface = dbus_interface,<br>> +                    
           signal_name = signal_name,<br>> +                    
           path = path)<br>> +<br>> +        self.snooping = True<br>> +<br>> +    def signalReciever(self, msg):<br>> +        self.send("Recieved message: %s"
% msg)<br>> +        self.snooping = False<br>> +<br>> +    def onData(self, send):<br>> +        self.send = send<br>> +<br>> +    def onFinish(self, f):<br>> +        self.finish = f<br>> +<br>> +    def signalSnooping(self):<br>> +        while self.snooping:<br>> +            mainloop = gobject.MainLoop()<br>> +            gevent.sleep(1)<br>> +            gobject.timeout_add(1,
mainloop.quit)<br>> +            mainloop.run()<br>> +<br>> +        self.finish()<br>> +<br>>  if __name__ == '__main__':<br>>      log = logging.getLogger('Rocket.Errors')<br>>      log.setLevel(logging.INFO)<br>>      log.addHandler(logging.StreamHandler(sys.stdout))<br>>  <br>> +    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)<br>>      bus = dbus.SystemBus()<br>>      app = RestApp(bus)<br>> -    default_cert = os.path.join(<br>> -        sys.prefix, 'share', os.path.basename(__file__),
'cert.pem')<br>> -<br>> -    server = Rocket(<br>> -        ('0.0.0.0', 443, default_cert, default_cert),<br>> -        'wsgi', {'wsgi_app': app},<br>> -        min_threads=1,<br>> -        max_threads=1)<br>> -    server.start()<br>> +<br>> +    default_cert = os.path.join(sys.prefix, 'share',<br>> +            os.path.basename(__file__),
'cert.pem')<br>> +<br>> +    server = WSGIServer(("0.0.0.0", 443), app,
keyfile = default_cert,<br>> +                    
   certfile = default_cert)<br>> +<br>> +    server.serve_forever()<br>> -- <br>> 2.8.3<br>> <br>> <br>> _______________________________________________<br>> openbmc mailing list<br>> openbmc@lists.ozlabs.org<br>> </font></tt><a href=https://lists.ozlabs.org/listinfo/openbmc><tt><font size=2>https://lists.ozlabs.org/listinfo/openbmc</font></tt></a><tt><font size=2><br></font></tt><BR>