# Source code for ``microdrop.protocol``.

"""
Copyright 2011 Ryan Fobel and Christian Fobel

This file is part of MicroDrop.

MicroDrop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

MicroDrop is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with MicroDrop.  If not, see <http://www.gnu.org/licenses/>.
"""

from collections import OrderedDict
from copy import deepcopy
try:
    import cPickle as pickle
except ImportError:
    import pickle
import cStringIO as StringIO
import itertools as it
import json
import logging
import re
import sys
import time

from microdrop_utility import Version, FutureVersionError
import numpy as np
import pandas as pd
import yaml
import zmq_plugin as zp
import zmq_plugin.schema

from .plugin_manager import emit_signal, get_service_names


logger = logging.getLogger(__name__)


def protocol_to_frame(protocol_i):
    '''
    Parameters
    ----------
    protocol_i : microdrop.protocol.Protocol
        MicroDrop protocol.

        .. note:: A MicroDrop protocol object is stored as pickled in the
            ``protocol`` file in each experiment log directory.

    Returns
    -------
    pandas.DataFrame
        Data frame with rows indexed by 0-based step number and columns
        indexed (multi-index) first by plugin name, then by step field name.

        .. note:: Values may be Python objects.  In future versions of
            MicroDrop, values *may* be restricted to json compatible types.
    '''
    # Union of plugin names across all steps, sorted for a deterministic
    # column order.
    plugin_names_i = set()
    for step_i in protocol_i.steps:
        plugin_names_i.update(step_i.plugin_data.keys())

    frames_i = OrderedDict()
    for plugin_name_ij in sorted(plugin_names_i):
        try:
            # Step data is stored pickled; build one record per step.  Steps
            # with no data for this plugin yield ``None``, which
            # ``pickle.loads`` rejects, causing the plugin to be skipped.
            records_ij = [pickle.loads(s.plugin_data.get(plugin_name_ij))
                          for s in protocol_i.steps]
            frame_ij = pd.DataFrame(records_ij)
        except Exception:
            # Skip plugins whose step data cannot be deserialized (e.g.,
            # plugin not installed) rather than failing the whole export.
            logger.exception('Error processing data for plugin `%s`.',
                             plugin_name_ij)
        else:
            frames_i[plugin_name_ij] = frame_ij
    # Raises ``ValueError`` if no plugin data could be deserialized (same as
    # original behavior of ``pd.concat`` on an empty list).
    df_protocol = pd.concat(list(frames_i.values()), axis=1,
                            keys=list(frames_i.keys()))
    df_protocol.index.name = 'step_i'
    df_protocol.columns.names = ['plugin_name', 'step_field']
    return df_protocol
def protocol_to_json(protocol):
    '''
    Parameters
    ----------
    protocol : microdrop.protocol.Protocol
        MicroDrop protocol.

        .. note:: A MicroDrop protocol object is stored as pickled in the
            ``protocol`` file in each experiment log directory.

    Returns
    -------
    str
        json-encoded dictionary, with two top-level keys:

        - ``keys``: Each key is a list containing a plugin name and a
          corresponding step field name.
        - ``values``: Maps to list of records (i.e., lists), one per
          protocol step.

        Each record in the ``values`` list may be *zipped together* with
        ``keys`` to yield a plugin field name to value mapping for a single
        protocol step.
    '''
    df_protocol = protocol.to_frame()
    payload = {'values': df_protocol.values.tolist(),
               'keys': df_protocol.columns.values.tolist()}
    # Custom encoder handles pandas/numpy scalar types.
    return json.dumps(payload, cls=zp.schema.PandasJsonEncoder)
class Protocol(object):
    '''
    A MicroDrop protocol: an ordered list of :class:`Step` objects plus
    protocol-level plugin data and execution state (current step number,
    repetition counter, etc.).
    '''
    class_version = str(Version(0, 2))

    def __init__(self, name=None):
        '''
        Parameters
        ----------
        name : str, optional
            Protocol name.
        '''
        self.steps = [Step()]
        # Fix: the ``name`` argument was previously ignored (``self.name``
        # was always set to ``None``).
        self.name = name
        self.plugin_data = {}
        self.plugin_fields = {}
        self.n_repeats = 1
        self.current_step_attempt = 0
        self.current_step_number = 0
        self.current_repetition = 0
        self.version = self.class_version

    @classmethod
    def load(cls, filename):
        """
        Load a Protocol from a file.

        Parameters
        ----------
        filename : str
            Path to file.

        Raises
        ------
        TypeError
            If file is not a :class:`Protocol`.
        FutureVersionError
            If file was written by a future version of the software.
        """
        logger.debug("[Protocol].load(\"%s\")" % filename)
        logger.info("Loading Protocol from %s" % filename)
        start_time = time.time()
        out = None
        # Try pickle first, then fall back to YAML (legacy file format).
        with open(filename, 'rb') as f:
            try:
                out = pickle.load(f)
                logger.debug("Loaded object from pickle.")
            except Exception as e:
                logger.debug("Not a valid pickle file. %s." % e)
        if out is None:
            with open(filename, 'rb') as f:
                try:
                    # NOTE(review): ``yaml.load`` without a safe loader can
                    # execute arbitrary code on untrusted input; kept for
                    # compatibility with legacy protocol files.
                    out = yaml.load(f)
                    logger.debug("Loaded object from YAML file.")
                except Exception as e:
                    logger.debug("Not a valid YAML file. %s." % e)
        if out is None:
            raise TypeError('Not a valid protocol file: %s' % filename)
        out.filename = filename

        # Enable loading of old protocols that were saved as relative
        # packages (i.e., not subpackages of microdrop).
        if str(out.__class__) == 'protocol.Protocol':
            out.__class__ = cls

        # Check type.
        if out.__class__ != cls:
            raise TypeError("File is wrong type: %s" % out.__class__)
        if not hasattr(out, 'version'):
            out.version = str(Version(0))
        out._upgrade()

        enabled_plugins = (get_service_names(env='microdrop.managed') +
                           get_service_names('microdrop'))

        # Plugin data is stored serialized (pickle, with a YAML fallback for
        # legacy files); deserialize entries for enabled plugins only.
        for k, v in out.plugin_data.items():
            if k in enabled_plugins:
                try:
                    out.plugin_data[k] = pickle.loads(v)
                except Exception:
                    out.plugin_data[k] = yaml.load(v)
        for i in range(len(out)):
            for k, v in out[i].plugin_data.items():
                if k in enabled_plugins:
                    try:
                        out[i].plugin_data[k] = pickle.loads(v)
                    except Exception:
                        # Enable loading of old protocols where the
                        # dmf_device_controller was imported as a relative
                        # package.
                        v = v.replace('!!python/object:gui.'
                                      'dmf_device_controller.',
                                      '!!python/object:microdrop.gui.'
                                      'dmf_device_controller.')
                        out[i].plugin_data[k] = yaml.load(v)
        logger.debug("[Protocol].load() loaded in %f s." %
                     (time.time() - start_time))
        return out

    def _upgrade(self):
        """
        Upgrade the serialized object if necessary.

        Raises
        ------
        FutureVersionError
            If file was written by a future version of the software.
        """
        logger.debug("[Protocol]._upgrade()")
        version = Version.fromstring(self.version)
        # Parse the class version once instead of three times.
        class_version = Version.fromstring(self.class_version)
        logger.debug('[Protocol] version=%s, class_version=%s' %
                     (str(version), self.class_version))
        if version > class_version:
            logger.debug('[Protocol] version>class_version')
            raise FutureVersionError(class_version, version)
        elif version < class_version:
            if version < Version(0, 1):
                # v0.1 stores plugin data YAML-serialized.
                for k, v in self.plugin_data.items():
                    self.plugin_data[k] = yaml.dump(v)
                for step in self.steps:
                    for k, v in step.plugin_data.items():
                        step.plugin_data[k] = yaml.dump(v)
                self.version = str(Version(0, 1))
                logger.debug('[Protocol] upgrade to version %s' %
                             self.version)
            if version < Version(0, 2):
                # v0.2 adds the ``current_step_attempt`` counter.
                self.current_step_attempt = 0
                self.version = str(Version(0, 2))
                logger.debug('[Protocol] upgrade to version %s' %
                             self.version)
        # Otherwise the versions are equal and no upgrade is needed.

    @property
    def plugins(self):
        # Names of plugins with protocol-level data.
        return set(self.plugin_data.keys())

    def plugin_name_lookup(self, name, re_pattern=False):
        '''
        Return the first plugin name matching ``name``.

        When ``re_pattern`` is false, return ``name`` unchanged; otherwise,
        treat ``name`` as a regular expression and return the first matching
        plugin name (or ``None``).
        '''
        if not re_pattern:
            return name
        for plugin_name in self.plugins:
            if re.search(name, plugin_name):
                return plugin_name
        return None

    def get_step_values(self, plugin_name):
        '''Return protocol-level data for ``plugin_name`` (``None`` if absent).'''
        logging.debug('[Protocol] plugin_data=%s' % self.plugin_data)
        return self.plugin_data.get(plugin_name)

    def get_data(self, plugin_name):
        '''Return protocol-level data for ``plugin_name`` (``None`` if absent).'''
        logging.debug('[Protocol] plugin_data=%s' % self.plugin_data)
        return self.plugin_data.get(plugin_name)

    def set_data(self, plugin_name, data):
        '''Store ``data`` as protocol-level data for ``plugin_name``.'''
        self.plugin_data[plugin_name] = data

    def __len__(self):
        return len(self.steps)

    def __getitem__(self, i):
        return self.steps[i]

    def save(self, filename, format='pickle'):
        '''
        Serialize the protocol to ``filename`` (``format`` is ``'pickle'``
        or ``'yaml'``).

        Plugin data objects are pickled to strings first so the file can be
        loaded even when some plugins are not installed.
        '''
        out = deepcopy(self)
        if hasattr(out, 'filename'):
            del out.filename

        # Convert plugin data objects to strings.
        for k, v in out.plugin_data.items():
            out.plugin_data[k] = pickle.dumps(v, -1)
        for step in out.steps:
            for k, v in step.plugin_data.items():
                step.plugin_data[k] = pickle.dumps(v, -1)

        with open(filename, 'wb') as f:
            if format == 'pickle':
                pickle.dump(out, f, -1)
            elif format == 'yaml':
                yaml.dump(out, f)
            else:
                raise TypeError('Unsupported format: %s' % format)

    def get_step_number(self, default):
        '''Return ``default``, or the current step number when it is ``None``.'''
        if default is None:
            return self.current_step_number
        return default

    def get_step(self, step_number=None):
        step_number = self.get_step_number(step_number)
        return self.steps[step_number]

    def current_step(self):
        return self.steps[self.current_step_number]

    def insert_steps(self, step_number=None, count=None, values=None):
        '''
        Insert multiple steps at ``step_number`` (the current step when
        ``None``).  Either ``count`` (number of new default steps) or
        ``values`` (existing :class:`Step` objects) must be specified.
        '''
        if values is None and count is None:
            raise ValueError('Either count or values must be specified')
        # Fix: normalize ``step_number`` up front (previously a ``None``
        # step number crashed when computing the notification range below).
        step_number = self.get_step_number(step_number)
        if values is None:
            # Fix: create distinct instances (``[Step()] * count`` aliased a
            # single shared ``Step`` across all inserted rows).
            values = [Step() for _ in range(count)]
        for value in values[::-1]:
            self.insert_step(step_number, value, notify=False)
        emit_signal('on_steps_inserted', args=range(step_number,
                                                    step_number +
                                                    len(values)))

    def insert_step(self, step_number=None, value=None, notify=True):
        '''Insert ``value`` (a new default :class:`Step` when ``None``) at
        ``step_number`` (the current step when ``None``).'''
        if step_number is None:
            step_number = self.current_step_number
        if value is None:
            value = Step()
        self.steps.insert(step_number, value)
        # NOTE(review): signals report ``current_step_number`` rather than
        # ``step_number`` -- confirm intentional before changing.
        emit_signal('on_step_created', args=[self.current_step_number])
        if notify:
            emit_signal('on_step_inserted', args=[self.current_step_number])

    def delete_step(self, step_number):
        '''Delete the step at ``step_number`` and reposition the current step.'''
        step_to_remove = self.steps[step_number]
        del self.steps[step_number]
        emit_signal('on_step_removed', args=[step_number, step_to_remove])

        if len(self.steps) == 0:
            # If we deleted the last remaining step, we need to insert a new
            # default Step.
            self.insert_step(0, Step())
            self.goto_step(0)
        elif self.current_step_number == len(self.steps):
            self.goto_step(step_number - 1)
        else:
            self.goto_step(self.current_step_number)

    def delete_steps(self, step_ids):
        # Process deletion of steps in reverse order to avoid ID mismatch
        # due to deleted rows.
        for step_id in sorted(step_ids, reverse=True):
            self.delete_step(step_id)

    def next_step(self):
        '''Advance to the next step, appending a copy of the current step
        when already on the last step.'''
        if self.current_step_number == len(self.steps) - 1:
            self.insert_step(step_number=self.current_step_number,
                             value=self.current_step().copy(),
                             notify=False)
            self.next_step()
            emit_signal('on_step_inserted', args=[self.current_step_number])
        else:
            self.goto_step(self.current_step_number + 1)

    def next_repetition(self):
        '''Restart from step 0 if any repetitions remain.'''
        if self.current_repetition < self.n_repeats - 1:
            self.current_repetition += 1
            self.goto_step(0)

    def prev_step(self):
        if self.current_step_number > 0:
            self.goto_step(self.current_step_number - 1)

    def first_step(self):
        self.current_repetition = 0
        self.goto_step(0)

    def last_step(self):
        self.goto_step(len(self.steps) - 1)

    def goto_step(self, step_number):
        '''Set the current step and notify listeners of the swap.'''
        logging.debug('[Protocol].goto_step(%s)' % step_number)
        original_step_number = self.current_step_number
        self.current_step_number = step_number
        emit_signal('on_step_swapped', [original_step_number, step_number])

    def to_frame(self):
        '''
        Returns
        -------
        pandas.DataFrame
            Data frame with multi-index columns, indexed first by plugin
            name, then by plugin step field name.

            .. note:: If an exception is encountered while processing a
                plugin value, the plugin causing the exception is skipped
                and protocol values related to the plugin are not included
                in the result.

        See Also
        --------
        :meth:`to_json`, :meth:`to_ndjson`
        '''
        return protocol_to_frame(self)

    def to_json(self):
        '''
        Returns
        -------
        str
            JSON-encoded dictionary, with two top-level keys:

            - ``keys``: Each key is a list containing a plugin name and a
              corresponding step field name.
            - ``values``: Maps to list of records (i.e., lists), one per
              protocol step.

            Each record in the ``values`` list may be *zipped together* with
            ``keys`` to yield a plugin field name to value mapping for a
            single protocol step.

        See Also
        --------
        :meth:`to_frame`, :meth:`to_ndjson`
        '''
        return protocol_to_json(self)

    def to_ndjson(self, ostream=None):
        '''
        Write protocol as newline delimited JSON (i.e., `ndjson`_, see
        `specification`_).

        Each line in the output is a nested JSON record, one line per
        protocol step.  The keys of the top-level object of each record
        correspond to plugin names.  The second-level keys correspond to the
        step field names.

        Parameters
        ----------
        ostream : file-like, optional
            Output stream to write to.

        Returns
        -------
        None or str
            If :data:`ostream` parameter is ``None``, return output as
            string.

        See Also
        --------
        :meth:`to_frame`, :meth:`to_json`

        .. _`ndjson`: http://ndjson.org/
        .. _`specification`: http://specs.frictionlessdata.io/ndjson/
        '''
        df_protocol = self.to_frame()

        if ostream is None:
            ostream = StringIO.StringIO()
            return_required = True
        else:
            return_required = False

        # Group columns by plugin name (columns are ordered by plugin, so
        # ``groupby`` yields one contiguous group per plugin).
        field_groups = [(group_i, list(fields_i))
                        for group_i, fields_i in
                        it.groupby(df_protocol.columns.values,
                                   lambda v: v[0])]
        # ``field_bases[j]:field_counts[j]`` is the column slice of group j.
        field_counts = np.cumsum([len(f[1]) for f in field_groups])
        field_bases = field_counts.copy()
        field_bases[1:] = field_counts[:-1]
        field_bases[0] = 0

        for row_i in df_protocol.values:
            # Fix: include *all* field groups (previously the last plugin's
            # fields were dropped by slicing ``[:-1]`` off bases/counts).
            row_dict_i = dict([(plugin_name_j,
                                dict(zip([f[1] for f in fields_j],
                                         row_i[start_j:end_j])))
                               for (plugin_name_j, fields_j), start_j, end_j
                               in zip(field_groups, field_bases,
                                      field_counts)])
            ostream.write(json.dumps(row_dict_i,
                                     cls=zp.schema.PandasJsonEncoder) + '\n')
        # Fix: previously the return sat in a ``finally`` block, which
        # silently swallowed any exception raised while serializing.
        if return_required:
            return ostream.getvalue()
class Step(object):
    '''
    A single protocol step: a mapping from plugin name to that plugin's
    per-step data.
    '''

    def __init__(self, plugin_data=None):
        # Deep-copy the initial data so the step owns its own state.
        self.plugin_data = ({} if plugin_data is None
                            else deepcopy(plugin_data))

    def copy(self):
        '''Return a new :class:`Step` with a deep copy of this step's data.'''
        return Step(plugin_data=deepcopy(self.plugin_data))

    @property
    def plugins(self):
        '''Set of plugin names with data on this step.'''
        return set(self.plugin_data.keys())

    def plugin_name_lookup(self, name, re_pattern=False):
        '''
        Return the first plugin name matching ``name``.

        When ``re_pattern`` is false, ``name`` is returned unchanged;
        otherwise ``name`` is treated as a regular expression and the first
        matching plugin name (or ``None``) is returned.
        '''
        if not re_pattern:
            return name
        matches = (candidate for candidate in self.plugins
                   if re.search(name, candidate))
        return next(matches, None)

    def get_data(self, plugin_name):
        '''Return this step's data for ``plugin_name`` (``None`` if absent).'''
        logging.debug('[Step] plugin_data=%s' % self.plugin_data)
        return self.plugin_data.get(plugin_name)

    def set_data(self, plugin_name, data):
        '''Store ``data`` as this step's data for ``plugin_name``.'''
        self.plugin_data[plugin_name] = data