[PATCH phosphor-rest-server] The streaming support for obmc-rest.

Cyril Bur cyrilbur at gmail.com
Fri Jan 29 11:23:37 AEDT 2016


On Thu, 28 Jan 2016 02:00:37 -0600
OpenBMC Patches <openbmc-patches at stwcx.xyz> wrote:

> From: shgoupf <shgoupf at cn.ibm.com>
> 

Hi Peng,

So I'm don't really know python all that well but I do believe this language is
white space sensitive... I'll let a pythoner respond about the rest...

> Changes:
> 1) The main idea of this change is to have a streaming path as below:
>     dbus signal -> obmc-rest capture the dbus signal -> obmc-rest notify the client of the signal receiving.
> 2) Replace rocket with gevent WSGI server to support multiple async accesses.
> 3) Use gevent queue to notify the dbus signal receiving.
> 4) The uri to the streaming should be in the form as below:
>     https://<ip>/<path>/stream/<signal_name>
> ---
>  obmc-rest | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 104 insertions(+), 11 deletions(-)
>  mode change 100644 => 100755 obmc-rest
> 
> diff --git a/obmc-rest b/obmc-rest
> old mode 100644
> new mode 100755
> index c6d2949..481dafa
> --- a/obmc-rest
> +++ b/obmc-rest
> @@ -3,7 +3,9 @@
>  import os
>  import sys
>  import dbus
> +import gobject
>  import dbus.exceptions
> +import dbus.mainloop.glib
>  import json
>  import logging
>  from xml.etree import ElementTree
> @@ -14,6 +16,10 @@ from OpenBMCMapper import Mapper, PathTree, IntrospectionNodeParser, ListMatch
>  import spwd
>  import grp
>  import crypt
> +import threading
> +import gevent
> +from gevent.pywsgi import WSGIServer
> +from gevent.queue import Queue
>  
>  DBUS_UNKNOWN_INTERFACE = 'org.freedesktop.UnknownInterface'
>  DBUS_UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod'
> @@ -59,12 +65,13 @@ def makelist(data):
>  
>  class RouteHandler(object):
>  	_require_auth = makelist(valid_user)
> -	def __init__(self, app, bus, verbs, rules):
> +	def __init__(self, app, bus, verbs, rules, skips = []):
>  		self.app = app
>  		self.bus = bus
>  		self.mapper = Mapper(bus)
>  		self._verbs = makelist(verbs)
>  		self._rules = rules
> +                self._skips = skips
>  
>  	def _setup(self, **kw):
>  		request.route_data = {}
> @@ -79,7 +86,7 @@ class RouteHandler(object):
>  		return getattr(self, 'do_' + request.method.lower())(**kw)
>  
>  	def install(self):
> -		self.app.route(self._rules, callback = self,
> +		self.app.route(self._rules, callback = self, skip = self._skips,
>  				method = ['GET', 'PUT', 'PATCH', 'POST', 'DELETE'])
>  
>  	@staticmethod
> @@ -108,6 +115,58 @@ class RouteHandler(object):
>  				return None
>  			raise
>  
> +class SignalHandler(RouteHandler):
> +	verbs = ['GET']
> +	rules = '<path:path>/stream/<signal>'
> +
> +	def __init__(self, app, bus):
> +		super(SignalHandler, self).__init__(
> +				app, bus, self.verbs, self.rules)
> +
> +	def find(self, path, signal):
> +		busses = self.try_mapper_call(self.mapper.get_object,
> +				path = path)
> +		for items in busses.iteritems():
> +			s = self.find_signal_on_bus(path, signal, *items)
> +			if s:
> +				return s
> +
> +		abort(404, _4034_msg %('signal', 'found', signal))
> +
> +	def setup(self, path, signal):
> +		request.route_data['map'] = self.find(path, signal)
> +
> +	def do_get(self, path, signal):
> +                body = Queue()
> +                dsignal = DbusSignal(bus, request.route_data['map'][0],
> +                                     request.route_data['map'][1], path)
> +                dsignal.onData(body.put)
> +                dsignal.onFinish(lambda: body.put(StopIteration))
> +                dsignal.signalSnooping()
> +                return body
> +
> +	@staticmethod
> +	def find_signal(signal, signals):
> +		if signals is None:
> +			return None
> +
> +		signal = find_case_insensitive(signal, signals.keys())
> +		if signal is not None:
> +                        return signal
> +
> +	def find_signal_on_bus(self, path, signal, bus, interfaces):
> +		obj = self.bus.get_object(bus, path, introspect = False)
> +		iface = dbus.Interface(obj, dbus.INTROSPECTABLE_IFACE)
> +		data = iface.Introspect()
> +		parser = IntrospectionNodeParser(
> +				ElementTree.fromstring(data),
> +				intf_match = ListMatch(interfaces))
> +		for x,y in parser.get_interfaces().iteritems():
> +			s = self.find_signal(signal,
> +                                             y.get('signal'))
> +			if s:
> +				return (x,s)
> +
>  class DirectoryHandler(RouteHandler):
>  	verbs = 'GET'
>  	rules = '<path:path>/'
> @@ -715,7 +774,8 @@ class RestApp(Bottle):
>  		self.install(JSONPlugin(**json_kw))
>  		self.install(JsonApiErrorsPlugin(**json_kw))
>  		self.install(AuthorizationPlugin())
> -		self.install(JsonApiResponsePlugin())
> +                self.json_response_plugin = JsonApiResponsePlugin()

Indenting?

> +		self.install(self.json_response_plugin)
>  		self.install(JsonApiRequestPlugin())
>  		self.install(JsonApiRequestTypePlugin())
>  
> @@ -726,6 +786,7 @@ class RestApp(Bottle):
>  
>  	def create_handlers(self):
>  		# create route handlers
> +		self.signal_handler = SignalHandler(self, self.bus)
>  		self.session_handler = SessionHandler(self, self.bus)
>  		self.directory_handler = DirectoryHandler(self, self.bus)
>  		self.list_names_handler = ListNamesHandler(self, self.bus)
> @@ -736,6 +797,11 @@ class RestApp(Bottle):
>  		self.instance_handler = InstanceHandler(self, self.bus)
>  
>  	def install_handlers(self):
> +                # Skip json response for signal handler because it requires to
> +                # return a gevent iterable which cannot be handled by json
> +                # response plugin
> +                self.signal_handler._skips = [self.json_response_plugin]

Indenting?

> +		self.signal_handler.install()
>  		self.session_handler.install()
>  		self.directory_handler.install()
>  		self.list_names_handler.install()
> @@ -766,21 +832,48 @@ class RestApp(Bottle):
>  		parts = filter(bool, path.split('/'))
>  		request.environ['PATH_INFO'] = '/' + '/'.join(parts) + trailing
>  
> +class DbusSignal():
> +    def __init__(self, bus, dbus_interface, signal_name, path):
> +        # Register the dbus recieve handler
> +        bus.add_signal_receiver(self.signalReciever,
> +                                dbus_interface = dbus_interface,
> +                                signal_name = signal_name,
> +                                path = path)
> +
> +        self.snooping = True
> +
> +    def signalReciever(self, msg):
> +        self.send("Recieved message: %s" % msg)
> +        self.snooping = False
> +
> +    def onData(self, send):
> +        self.send = send
> +
> +    def onFinish(self, f):
> +        self.finish = f
> +
> +    def signalSnooping(self):
> +        while self.snooping:
> +            mainloop = gobject.MainLoop()
> +            gevent.sleep(1)
> +            gobject.timeout_add(1, mainloop.quit)
> +            mainloop.run()
> +
> +        self.finish()
> +
>  if __name__ == '__main__':
>  	log = logging.getLogger('Rocket.Errors')
>  	log.setLevel(logging.INFO)
>  	log.addHandler(logging.StreamHandler(sys.stdout))
>  
> +        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

Indenting?

>  	bus = dbus.SystemBus()
>  	app = RestApp(bus)
> +

?

>  	default_cert = os.path.join(sys.prefix, 'share',
>  			os.path.basename(__file__), 'cert.pem')
>  
> -	server = Rocket(('0.0.0.0',
> -			443,
> -			default_cert,
> -			default_cert),
> -		'wsgi', {'wsgi_app': app},
> -		min_threads = 1,
> -		max_threads = 1)
> -	server.start()
> +        server = WSGIServer(("0.0.0.0", 443), app, keyfile = default_cert,
> +                            certfile = default_cert)
> +
> +        server.serve_forever()

Indenting?





More information about the openbmc mailing list