# Source code for microdrop.plugin_manager

from StringIO import StringIO
from collections import namedtuple
from contextlib import closing
import logging
import sys
import traceback

from pyutilib.component.core import ExtensionPoint, PluginGlobals
# TODO Update plugins to import from `pyutilib.component.core` directly
# instead of importing from here.
from pyutilib.component.core import Plugin, SingletonPlugin, implements
import path_helpers as ph
import task_scheduler

from .interfaces import IPlugin, IWaveformGenerator, ILoggingPlugin

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Two-field record with fields ``before`` and ``after``.
# NOTE(review): presumably describes an ordering constraint between two
# plugins (the pair is unpacked into ``scheduler.request_order(*request)`` in
# `get_schedule`) -- confirm against the plugins that create these records.
ScheduleRequest = namedtuple('ScheduleRequest', 'before after')


def load_plugins(plugins_dir='plugins', import_from_parent=True):
    '''
    Import each Python plugin package in the specified directory and create
    an instance of each contained plugin class for which an instance has not
    yet been created.

    Parameters
    ----------
    plugins_dir : str
        Directory containing zero or more Python plugin modules to import.
    import_from_parent : bool
        Add parent of specified directory to system path and import
        ``<parent>.<module>``.

        .. note::
            **Not recommended**, but kept as default to maintain legacy
            protocol compatibility.

    Returns
    -------
    list
        Newly created plugins (plugins are not recreated if they were
        previously loaded.)  Each returned service has had ``disable()``
        called on it.
    '''
    # Local import: keeps this function self-contained without touching the
    # module-level import block.
    import importlib

    logger.info('[load_plugins] plugins_dir=`%s`', plugins_dir)
    plugins_dir = ph.path(plugins_dir).realpath()
    logging.info('Loading plugins:')
    plugins_root = plugins_dir.parent if import_from_parent else plugins_dir
    if plugins_root not in sys.path:
        sys.path.insert(0, plugins_root)

    # Create an instance of each of the plugins, but set it to disabled
    e = PluginGlobals.env('microdrop.managed')
    initial_plugins = set(e.plugin_registry.values())
    imported_plugins = set()

    for package_i in plugins_dir.dirs():
        if package_i.isjunction() and not package_i.readlink().isdir():
            # Plugin directory is a junction/link to a non-existent target
            # path.
            logging.info('Skip import of `%s` (broken link to `%s`).',
                         package_i.name, package_i.readlink())
            continue

        try:
            plugin_module = package_i.name
            if import_from_parent:
                plugin_module = '.'.join([plugins_dir.name, plugin_module])
            logging.debug('import %s', plugin_module)
            # Importing the package is what registers its plugin class(es)
            # in the environment's registry.
            importlib.import_module(plugin_module)

            all_plugins = set(e.plugin_registry.values())
            # Every class that appeared in the registry since the previous
            # package was processed.  A package may register zero classes
            # (e.g., it was already imported by an earlier call) or several.
            new_classes = all_plugins - initial_plugins - imported_plugins
            if not new_classes:
                logging.warning('No new plugin class registered by `%s` '
                                '(already loaded, or package defines no '
                                'plugin).', package_i.name)
                continue
            for class_i in sorted(new_classes, key=lambda c: c.__name__):
                logging.info('\t Imported: %s (%s)', class_i.__name__,
                             package_i)
            imported_plugins.update(new_classes)
        except Exception:
            # Best effort: a broken plugin must not prevent the remaining
            # plugins from loading.
            logging.info(traceback.format_exc())
            logging.error('Error loading %s plugin.', package_i.name,
                          exc_info=True)

    # For each newly imported plugin class, create a service instance
    # initialized to the disabled state.
    new_plugins = []
    for class_ in imported_plugins:
        service = class_()
        service.disable()
        new_plugins.append(service)
    logging.debug('\t Created new plugin services: %s',
                  ','.join([p.__class__.__name__ for p in new_plugins]))
    return new_plugins
def log_summary():
    '''
    Dump summary of plugins to log.

    For each of the `IPlugin`, `IWaveformGenerator` and `ILoggingPlugin`
    interfaces, a heading is logged at ``INFO`` level, followed by one line
    per observer yielded by ``ExtensionPoint(<interface>)``.
    '''
    sections = (
        ('Registered plugins:', IPlugin),
        ('Registered function generator plugins:', IWaveformGenerator),
        ('Registered logging plugins:', ILoggingPlugin),
    )
    for heading, interface in sections:
        observers = ExtensionPoint(interface)
        logging.info(heading)
        for observer in observers:
            logging.info('\t %s', observer)
def get_plugin_names(env=None):
    '''
    List the names under which plugin classes are registered.

    Parameters
    ----------
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

        If ``None``, the environment named ``'pca'`` is used.

    Returns
    -------
    list(str)
        List of plugin names (e.g., ``['StepLabelPlugin', ...]``), i.e., the
        keys of the environment's plugin registry.
    '''
    if env is None:
        env = 'pca'
    registry = PluginGlobals.env(env).plugin_registry
    return list(registry.keys())
def get_service_class(name, env='microdrop.managed'):
    '''
    Look up a plugin class by its registered class name.

    Parameters
    ----------
    name : str
        Plugin class name (e.g., ``App``).
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Returns
    -------
    class
        Class type matching specified plugin class name.

        .. note::
            Returns actual class type -- **not** an instance of the plugin
            service.

    Raises
    ------
    KeyError
        If no plugin is registered with the specified name.
    '''
    registry = PluginGlobals.env(env).plugin_registry
    if name not in registry:
        raise KeyError('No plugin registered with name: %s' % name)
    return registry[name]
def get_service_instance_by_name(name, env='microdrop.managed'):
    '''
    Find a service instance by its plugin name.

    Parameters
    ----------
    name : str
        Plugin name (e.g., ``microdrop.zmq_hub_plugin``).

        Corresponds to ``plugin_name`` key in plugin ``properties.yml`` file.
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Returns
    -------
    object
        First service instance in the environment whose ``name`` attribute
        equals the specified plugin name.

    Raises
    ------
    KeyError
        If no plugin is found registered with the specified name.
    '''
    for service in PluginGlobals.env(env).services:
        if name == service.name:
            return service
    raise KeyError('No plugin registered with name: %s' % name)
def get_service_instance_by_package_name(name, env='microdrop.managed'):
    '''
    Find a service instance by the name of the Python module defining it.

    Parameters
    ----------
    name : str
        Plugin Python module name (e.g., ``dmf_control_board_plugin``).

        Corresponds to ``package_name`` key in plugin ``properties.yml`` file.
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Returns
    -------
    object
        First service instance in the environment for which the last
        dotted component of its class's ``__module__`` equals the specified
        name.

    Raises
    ------
    KeyError
        If no plugin is found registered with the specified package name.
    '''
    for service in PluginGlobals.env(env).services:
        module_name = service.__class__.__module__
        if name == get_plugin_package_name(module_name):
            return service
    raise KeyError('No plugin registered with package name: %s' % name)
def get_plugin_package_name(module_name):
    '''
    Return the last dotted component of a module name.

    Parameters
    ----------
    module_name : str
        Fully-qualified module name (e.g.,
        ``'plugins.dmf_control_board_plugin'``).

    Returns
    -------
    str
        Relative module name (e.g., ``'dmf_control_board_plugin'``).  A name
        containing no ``.`` is returned unchanged.
    '''
    return module_name.rpartition('.')[-1]
def get_service_instance(class_, env='microdrop.managed'):
    '''
    Find the service instance for a plugin class.

    Parameters
    ----------
    class_ : class
        Plugin class type.
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Returns
    -------
    object or None
        First service in the environment that is an instance of the
        specified plugin class type (subclass instances match as well, since
        the check uses ``isinstance``).

        Returns ``None`` if no service is registered for the specified plugin
        class type.
    '''
    for service in PluginGlobals.env(env).services:
        if isinstance(service, class_):
            # A plugin of this type is registered
            return service
    return None
def get_service_names(env='microdrop.managed'):
    '''
    List the names of the services for all registered plugin classes.

    Parameters
    ----------
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Returns
    -------
    list
        List of service names (e.g., ``['microdrop.step_label_plugin', ...]``).

        A registered plugin class for which no service instance is found is
        skipped, and a warning is logged.
    '''
    e = PluginGlobals.env(env)
    service_names = []
    for name in get_plugin_names(env):
        plugin_class = e.plugin_registry[name]
        service = get_service_instance(plugin_class, env=env)
        if service is None:
            # ``Logger.warn`` is a deprecated alias; use ``warning``.
            logger.warning('Plugin `%s` exists in registry, but instance '
                           'cannot be found.', name)
        else:
            service_names.append(service.name)
    return service_names
def get_schedule(observers, function):
    '''
    Generate observer order based on scheduling requests for specified
    function.

    Parameters
    ----------
    observers : dict
        Mapping from service names to service instances.
    function : str
        Name of function to generate schedule for.

    Returns
    -------
    list
        List of observer service names in scheduled order.

        If no observer provides a ``get_schedule_requests`` method, the
        service names are returned in the mapping's own key order.
    '''
    # Query plugins for schedule requests for 'function'
    schedule_requests = {}
    for observer in observers.values():
        if hasattr(observer, 'get_schedule_requests'):
            schedule_requests[observer.name] = \
                observer.get_schedule_requests(function)

    if not schedule_requests:
        # Materialize as a list so the documented return type also holds
        # where ``dict.keys()`` is a view object.
        return list(observers.keys())

    scheduler = task_scheduler.TaskScheduler(list(observers.keys()))
    for requests in schedule_requests.values():
        # ``or ()``: tolerate a plugin returning ``None`` instead of an empty
        # sequence when it has no requests for this function.
        for request in requests or ():
            try:
                scheduler.request_order(*request)
            except AssertionError:
                # The scheduler rejected this request; skip it and keep the
                # remaining requests.
                logging.info('[PluginManager] emit_signal(%s) could not add '
                             'schedule request %s', function, request)
    return scheduler.get_schedule()
def get_observers(function, interface=IPlugin):
    '''
    Get dictionary of observers implementing the specified function.

    Parameters
    ----------
    function : str
        Name of the attribute (method) an observer must have to be included.
    interface : class, optional
        Plugin interface class.  Candidate observers are those yielded by
        ``ExtensionPoint(interface)``.

    Returns
    -------
    dict
        Mapping from service names to service instances.
    '''
    return {obs.name: obs
            for obs in ExtensionPoint(interface)
            if hasattr(obs, function)}
def emit_signal(function, args=None, interface=IPlugin):
    '''
    Call specified function on each plugin implementing the function and
    collect results.

    Observers are looked up with `get_observers` and called in the order
    returned by `get_schedule`.

    Parameters
    ----------
    function : str
        Name of function to call on each observer.
    args : list or object, optional
        Positional arguments to pass to each observer's function.

        ``None`` means no arguments.  Any value that is not exactly a
        ``list`` (including a ``tuple``) is passed as a single argument.
    interface : class, optional
        Plugin interface class.

    Returns
    -------
    dict
        Mapping from each service name to the respective function return
        value.  Observers whose call raised an exception are omitted.

        An empty dictionary is returned if an error occurs outside of an
        individual observer call (e.g., while computing the schedule).
    '''
    try:
        observers = get_observers(function, interface)
        schedule = get_schedule(observers, function)

        # Normalize arguments once, rather than on every loop iteration.
        # The exact type check is retained so tuples and list subclasses
        # keep being passed through as a single argument.
        if args is None:
            args = []
        elif type(args) is not list:
            args = [args]

        return_codes = {}
        for observer_name in schedule:
            observer = observers[observer_name]
            logging.debug('emit_signal: %s.%s()' % (observer.name, function))
            try:
                f = getattr(observer, function)
                return_codes[observer.name] = f(*args)
            except Exception as why:
                if interface == ILoggingPlugin:
                    # If this is a logging plugin, do not try to log
                    # since that will result in infinite recursion.
                    # Instead, just continue onto the next plugin.
                    continue
                message = '\n'.join([
                    '%s plugin crashed processing %s signal.' %
                    (observer.name, function),
                    'Reason: ' + str(why),
                ])
                logging.error(message.strip())
                logging.info(traceback.format_exc())
        return return_codes
    except Exception as why:
        logging.error(why, exc_info=True)
        return {}
def enable(name, env='microdrop.managed'):
    '''
    Enable specified plugin.

    After the service is enabled, its ``on_plugin_enable`` method is called
    (if it has one) and the ``on_plugin_enabled`` signal is emitted with
    ``[env, service]`` as arguments.

    Parameters
    ----------
    name : str
        Plugin name (e.g., ``microdrop.zmq_hub_plugin``).

        Corresponds to ``plugin_name`` key in plugin ``properties.yml`` file.
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Raises
    ------
    KeyError
        If no plugin is found registered with the specified name.
    '''
    service = get_service_instance_by_name(name, env)
    if not service.enabled():
        service.enable()
        logging.info('[PluginManager] Enabled plugin: %s' % name)
    # NOTE(review): block nesting was reconstructed from a flattened listing.
    # The hook and signal below are taken to run even if the service was
    # already enabled -- confirm against the canonical source.
    if hasattr(service, "on_plugin_enable"):
        service.on_plugin_enable()
    emit_signal('on_plugin_enabled', [env, service])
def disable(name, env='microdrop.managed'):
    '''
    Disable specified plugin.

    If the service is currently enabled, it is disabled, its
    ``on_plugin_disable`` method is called (if it has one) and the
    ``on_plugin_disabled`` signal is emitted with ``[env, service]`` as
    arguments.  Nothing happens if the service is already disabled.

    Parameters
    ----------
    name : str
        Plugin name (e.g., ``microdrop.zmq_hub_plugin``).

        Corresponds to ``plugin_name`` key in plugin ``properties.yml`` file.
    env : str, optional
        Name of ``pyutilib.component.core`` plugin environment (e.g.,
        ``'microdrop.managed'``).

    Raises
    ------
    KeyError
        If no plugin is found registered with the specified name.
    '''
    service = get_service_instance_by_name(name, env)
    # NOTE(review): block nesting was reconstructed from a flattened listing.
    # The hook, signal and log message are taken to run only when the service
    # was actually enabled -- confirm against the canonical source.
    if service and service.enabled():
        service.disable()
        if hasattr(service, "on_plugin_disable"):
            service.on_plugin_disable()
        emit_signal('on_plugin_disabled', [env, service])
        logging.info('[PluginManager] Disabled plugin: %s' % name)
# Runs at import time, after all definitions above.
# NOTE(review): no matching ``PluginGlobals.push_env(...)`` call is visible in
# this module; presumably the environment being popped was pushed while
# importing ``.interfaces`` -- confirm before moving or removing this call.
PluginGlobals.pop_env()