Source code for microdrop.plugin_manager

"""
Copyright 2011 Ryan Fobel

This file is part of dmf_control_board.

MicroDrop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

MicroDrop is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with MicroDrop.  If not, see <http://www.gnu.org/licenses/>.
"""
from StringIO import StringIO
from collections import namedtuple
from contextlib import closing
import logging
import os
import platform
import re
import subprocess
import sys
import traceback

from path_helpers import path
from pyutilib.component.core import ExtensionPoint, PluginGlobals
# TODO Update plugins to import from `pyutilib.component.core` directly
# instead of importing from here.
from pyutilib.component.core import Plugin, SingletonPlugin, implements
from run_exe import run_exe
import task_scheduler

from .interfaces import IPlugin, IWaveformGenerator, ILoggingPlugin


ScheduleRequest = namedtuple('ScheduleRequest', 'before after')


[docs]def load_plugins(plugins_dir='plugins'): plugins_dir = path(plugins_dir) logging.info('Loading plugins:') if plugins_dir.parent.abspath() not in sys.path: sys.path.insert(0, plugins_dir.parent.abspath()) for package in plugins_dir.dirs(): try: logging.info('\t %s' % package.abspath()) import_statement = 'import %s.%s' % \ (plugins_dir.name, package.name) logging.debug(import_statement) exec(import_statement) except Exception: logging.info(''.join(traceback.format_exc())) logging.error('Error loading %s plugin.', package.name, exc_info=True) # Create an instance of each of the plugins, but set it to disabled e = PluginGlobals.env('microdrop.managed') for class_ in e.plugin_registry.values(): service = class_() service.disable()
[docs]def post_install(install_path): # __NB__ The `cwd` directory ["is not considered when searching the # executable, so you can't specify the program's path relative to # `cwd`."][cwd]. Therefore, we manually change to the directory # containing the hook script and change back to the original working # directory when we're done. # # [cwd]: https://docs.python.org/2/library/subprocess.html#popen-constructor cwd = os.getcwd() if platform.system() in ('Linux', 'Darwin'): system_name = platform.system() hooks_path = install_path.joinpath('hooks', system_name).abspath() on_install_path = hooks_path.joinpath('on_plugin_install.sh') if on_install_path.isfile(): # There is an `on_plugin_install` script to run. try: os.chdir(hooks_path) subprocess.check_call(['sh', on_install_path.name, sys.executable], cwd=hooks_path) finally: os.chdir(cwd) elif platform.system() == 'Windows': hooks_path = install_path.joinpath('hooks', 'Windows').abspath() for ext in ('exe', 'bat'): on_install_path = hooks_path.joinpath('on_plugin_install.%s' % ext) if on_install_path.isfile(): # There is an `on_plugin_install` script to run. # Request elevated privileges if an error occurs. run_exe(on_install_path.name, '"%s"' % sys.executable, try_admin=True, working_dir=hooks_path) break
[docs]def log_summary(): observers = ExtensionPoint(IPlugin) logging.info('Registered plugins:') for observer in observers: logging.info('\t %s' % observer) observers = ExtensionPoint(IWaveformGenerator) logging.info('Registered function generator plugins:') for observer in observers: logging.info('\t %s' % observer) observers = ExtensionPoint(ILoggingPlugin) logging.info('Registered logging plugins:') for observer in observers: logging.info('\t %s' % observer)
[docs]def get_plugin_names(env=None): if env is None: env = 'pca' e = PluginGlobals.env(env) return list(e.plugin_registry.keys())
[docs]def get_service_class(name, env='microdrop.managed'): e = PluginGlobals.env(env) if name not in e.plugin_registry: raise KeyError, 'No plugin registered with name: %s' % name return e.plugin_registry[name]
[docs]def get_service_instance_by_name(name, env='microdrop.managed'): e = PluginGlobals.env(env) plugins = [p for i, p in enumerate(e.services) if name == p.name] if plugins: return plugins[0] else: raise KeyError, 'No plugin registered with name: %s' % name
[docs]def get_service_instance_by_package_name(name, env='microdrop.managed'): e = PluginGlobals.env(env) plugins = [p for i, p in enumerate(e.services) \ if name == get_plugin_package_name(p.__class__.__module__)] if plugins: return plugins[0] else: raise KeyError, 'No plugin registered with package name: %s' % name
[docs]def get_plugin_package_name(class_name): match = re.search(r'plugins\.(?P<name>.*)', class_name) if match is None: logging.error('Could not determine package name from: %s'\ % class_name) return None return match.group('name')
[docs]def get_service_instance(class_, env='microdrop.managed'): e = PluginGlobals.env(env) for service in e.services: if isinstance(service, class_): # A plugin of this type is registered return service return None
[docs]def get_service_names(env='microdrop.managed'): e = PluginGlobals.env(env) service_names = [] for name in get_plugin_names(env): plugin_class = e.plugin_registry[name] service = get_service_instance(plugin_class, env=env) service_names.append(service.name) return service_names
[docs]def get_schedule(observers, function): # Query plugins for schedule requests for 'function' schedule_requests = {} for observer in observers.values(): if hasattr(observer, 'get_schedule_requests'): schedule_requests[observer.name] =\ observer.get_schedule_requests(function) if schedule_requests: scheduler = task_scheduler.TaskScheduler(observers.keys()) for request in [r for name, requests in schedule_requests.items() for r in requests]: try: scheduler.request_order(*request) except AssertionError: logging.info('[PluginManager] emit_signal(%s) could not '\ 'add schedule request %s' % (function, request)) continue return scheduler.get_schedule() else: return observers.keys()
[docs]def get_observers(function, interface=IPlugin): observers = {} for obs in ExtensionPoint(interface): if hasattr(obs, function): observers[obs.name] = obs return observers
[docs]def emit_signal(function, args=None, interface=IPlugin): try: observers = get_observers(function, interface) schedule = get_schedule(observers, function) return_codes = {} for observer_name in schedule: observer = observers[observer_name] logging.debug('emit_signal: %s.%s()' % (observer.name, function)) try: if args is None: args = [] elif type(args) is not list: args = [args] f = getattr(observer, function) return_codes[observer.name] = f(*args) except Exception, why: with closing(StringIO()) as message: if hasattr(observer, "name"): if interface == ILoggingPlugin: # If this is a logging plugin, do not try to log # since that will result in infinite recursion. # Instead, just continue onto the next plugin. continue print >> message, \ '%s plugin crashed processing %s signal.' % \ (observer.name, function) print >> message, 'Reason:', str(why) logging.error(message.getvalue().strip()) logging.info(''.join(traceback.format_exc())) return return_codes except Exception, why: logging.error(why, exc_info=True) return {}
[docs]def enable(name, env='microdrop.managed'): service = get_service_instance_by_name(name, env) if not service.enabled(): service.enable() logging.info('[PluginManager] Enabled plugin: %s' % name) if hasattr(service, "on_plugin_enable"): service.on_plugin_enable() emit_signal('on_plugin_enabled', [env, service])
[docs]def disable(name, env='microdrop.managed'): service = get_service_instance_by_name(name, env) if service and service.enabled(): service.disable() if hasattr(service, "on_plugin_disable"): service.on_plugin_disable() emit_signal('on_plugin_disabled', [env, service]) logging.info('[PluginManager] Disabled plugin: %s' % name)
PluginGlobals.pop_env()