Source code for microdrop.experiment_log

"""
Copyright 2011 Ryan Fobel

This file is part of MicroDrop.

MicroDrop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

MicroDrop is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with MicroDrop.  If not, see <http://www.gnu.org/licenses/>.
"""
from collections import OrderedDict
from copy import deepcopy
try:
    import cPickle as pickle
except ImportError:
    import pickle
import datetime as dt
import logging
import os
import time
import uuid

from microdrop_utility import is_int, Version, FutureVersionError
from path_helpers import path
import arrow
import pandas as pd
import yaml

logger = logging.getLogger(__name__)


def log_data_to_frame(log_data_i):
    '''
    Parameters
    ----------
    log_data_i : microdrop.experiment_log.ExperimentLog
        MicroDrop experiment log, as pickled in the ``data`` file in each
        experiment log directory.

    Returns
    -------
    (pd.Series, pd.DataFrame)
        Tuple containing:

         - Experiment information, including UTC start time, MicroDrop
           software version, list of plugin versions, etc.
         - Data frame with multi-index columns, indexed first by plugin
           name, then by plugin field name.

    .. note::
        Values may be Python objects.  In future versions of MicroDrop,
        values *may* be restricted to JSON-compatible types.
    '''
    def log_frame_experiment_info(df_log):
        # Merge `core` values from the first and last log records to form a
        # single experiment-level summary.
        experiment_info = df_log['core'].iloc[0].copy()
        experiment_info.update(df_log['core'].iloc[-1])
        start_time = arrow.get(experiment_info['start time']).naive
        experiment_info['utc_start_time'] = start_time.isoformat()
        # Drop per-step fields, which do not apply to the experiment as a
        # whole.
        for k in ('step', 'start time', 'time', 'attempt', 'utc_timestamp'):
            if k in experiment_info.index:
                del experiment_info[k]
        return experiment_info.dropna()

    # Union of all plugin names that logged data on any record.
    plugin_names_i = sorted(reduce(lambda a, b: a.union(b.keys()),
                                   log_data_i.data, set()))
    frames_i = OrderedDict()
    for plugin_name_ij in plugin_names_i:
        try:
            # Each plugin's per-record data is stored as a pickled string.
            frame_ij = pd.DataFrame(map(lambda v: pickle.loads(v)
                                        if v else {},
                                        [s.get(plugin_name_ij)
                                         for s in log_data_i.data]))
        except Exception as exception:
            logger.error('%s: %s', plugin_name_ij, exception)
        else:
            frames_i[plugin_name_ij] = frame_ij
    df_log_i = pd.concat(frames_i.values(), axis=1, keys=frames_i.keys())
    # Compute an absolute UTC timestamp for each record from the experiment
    # start time plus the relative step time in seconds (the `s == s` test
    # filters out NaN values).
    start_time_i = arrow.get(df_log_i.iloc[0][('core', 'start time')]).naive
    df_log_i[('core', 'utc_timestamp')] = \
        (start_time_i + df_log_i[('core', 'time')]
         .map(lambda s: dt.timedelta(seconds=s) if s == s else None))
    df_log_i.sort_index(axis=1, inplace=True)
    experiment_info = log_frame_experiment_info(df_log_i)
    experiment_info['uuid'] = log_data_i.uuid
    # Keep only rows that correspond to recorded protocol steps.
    df_log_i.dropna(subset=[('core', 'step'), ('core', 'attempt')],
                    inplace=True)
    return experiment_info, df_log_i
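

# Sketch of the resulting structures (names and values are hypothetical):
#
#     experiment_info, df_log = log_data_to_frame(log_data)
#     experiment_info['utc_start_time']  # e.g. '2011-05-03T14:21:08'
#     df_log[('core', 'step')]           # step number for each record
#     df_log['example.plugin']           # all fields logged by one plugin
#
# Columns form a two-level MultiIndex keyed by (plugin name, field name);
# see the runnable example at the bottom of this module.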


class ExperimentLog(object):
    class_version = str(Version(0, 3, 0))

    def __init__(self, directory=None):
        self.directory = directory
        self.data = []
        self.version = self.class_version
        self.uuid = str(uuid.uuid4())
        self._get_next_id()
        self.metadata = {}  # Metadata, keyed by plugin name.
        logger.info('[ExperimentLog] new log with id=%s and uuid=%s' %
                    (self.experiment_id, self.uuid))

    def _get_next_id(self):
        if self.directory is None:
            self.experiment_id = None
            return
        if not os.path.isdir(self.directory):
            os.makedirs(self.directory)
        logs = path(self.directory).listdir()
        self.experiment_id = 0
        for d in logs:
            if is_int(d.name):
                i = int(d.name)
                if i >= self.experiment_id:
                    self.experiment_id = i
                    # Increment the experiment id if the current directory
                    # is not empty.
                    if len(d.listdir()):
                        self.experiment_id += 1
        log_path = self.get_log_path()
        if not log_path.isdir():
            log_path.makedirs_p()

    def _upgrade(self):
        """
        Upgrade the serialized object if necessary.

        Raises:
            FutureVersionError: file was written by a future version of
                the software.
        """
        logger.debug("[ExperimentLog]._upgrade()")
        version = Version.fromstring(self.version)
        logger.debug('[ExperimentLog] version=%s, class_version=%s' %
                     (str(version), self.class_version))
        if version > Version.fromstring(self.class_version):
            logger.debug('[ExperimentLog] version>class_version')
            raise FutureVersionError
        if version < Version(0, 1, 0):
            # Migrate flat per-record dictionaries to per-plugin
            # dictionaries.
            new_data = []
            plugin_name = None
            for step_data in self.data:
                if "control board hardware version" in step_data:
                    plugin_name = ("wheelerlab.dmf_control_board_" +
                                   step_data["control board hardware version"])
            for i in range(len(self.data)):
                new_data.append({})
                for k, v in self.data[i].items():
                    if plugin_name and k in ("FeedbackResults",
                                             "SweepFrequencyResults",
                                             "SweepVoltageResults"):
                        try:
                            new_data[i][plugin_name] = {k: pickle.loads(v)}
                        except Exception as e:
                            logger.error("Couldn't load experiment log "
                                         "data for plugin: %s. %s." %
                                         (plugin_name, e))
                    else:
                        if "core" not in new_data[i]:
                            new_data[i]["core"] = {}
                        new_data[i]["core"][k] = v
            # Serialize objects to YAML strings.
            for i in range(len(self.data)):
                for plugin_name, plugin_data in new_data[i].items():
                    new_data[i][plugin_name] = yaml.dump(plugin_data)
            self.data = new_data
            self.version = str(Version(0, 1, 0))
        if version < Version(0, 2, 0):
            self.uuid = str(uuid.uuid4())
            self.version = str(Version(0, 2, 0))
        if version < Version(0, 3, 0):
            self.metadata = {}
            self.version = str(Version(0, 3, 0))
        # Otherwise, the versions are equal and no upgrade is needed.

    @classmethod
    def load(cls, filename):
        """
        Load an experiment log from a file.

        Args:
            filename: path to file.

        Raises:
            TypeError: file is not an experiment log.
            FutureVersionError: file was written by a future version of
                the software.
        """
        logger.debug('[ExperimentLog].load("%s")' % filename)
        logger.info("Loading experiment log from %s" % filename)
        out = None
        start_time = time.time()
        with open(filename, 'rb') as f:
            try:
                out = pickle.load(f)
                logger.debug("Loaded object from pickle.")
            except Exception as e:
                logger.debug("Not a valid pickle file. %s." % e)
        if out is None:
            with open(filename, 'rb') as f:
                try:
                    out = yaml.load(f)
                    logger.debug("Loaded object from YAML file.")
                except Exception as e:
                    logger.debug("Not a valid YAML file. %s." % e)
        if out is None:
            raise TypeError
        out.filename = filename

        # Check type.
        if out.__class__ != cls:
            raise TypeError
        if not hasattr(out, 'version'):
            out.version = str(Version(0))
        out._upgrade()

        # Deserialize plugin data from strings, trying pickle first and
        # falling back to YAML.
        for i in range(len(out.data)):
            for plugin_name, plugin_data in out.data[i].items():
                try:
                    out.data[i][plugin_name] = pickle.loads(plugin_data)
                except Exception as e:
                    logger.debug("Not a valid pickle string "
                                 "(plugin: %s). %s." % (plugin_name, e))
                    try:
                        out.data[i][plugin_name] = yaml.load(plugin_data)
                    except Exception as e:
                        logger.error("Couldn't load experiment log data "
                                     "for plugin: %s. %s." %
                                     (plugin_name, e))
        logger.debug("[ExperimentLog].load() loaded in %f s." %
                     (time.time() - start_time))
        return out
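
    # Example (hedged; the path is hypothetical).  Unlike the raw pickle
    # consumed by `log_data_to_frame()`, `load()` deserializes each
    # plugin's per-record data back into Python objects:
    #
    #     log = ExperimentLog.load('example-logs/0/data')
    #     voltages = log.get('voltage', plugin_name='example.plugin')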

    def save(self, filename=None, format='pickle'):
        if filename is None:
            log_path = self.get_log_path()
            filename = os.path.join(log_path, "data")
        else:
            log_path = path(filename).parent
        if self.data:
            out = deepcopy(self)
            # Serialize plugin dictionaries to strings.
            for i in range(len(out.data)):
                for plugin_name, plugin_data in out.data[i].items():
                    if format == 'pickle':
                        out.data[i][plugin_name] = pickle.dumps(plugin_data)
                    elif format == 'yaml':
                        out.data[i][plugin_name] = yaml.dump(plugin_data)
                    else:
                        raise TypeError
            with open(filename, 'wb') as f:
                if format == 'pickle':
                    pickle.dump(out, f, -1)
                elif format == 'yaml':
                    yaml.dump(out, f)
                else:
                    raise TypeError
        return log_path
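
    # Example (hedged sketch; the YAML filename is hypothetical):
    #
    #     log.save()                               # pickle to <log dir>/data
    #     log.save('/tmp/log.yml', format='yaml')  # YAML alternative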

    def start_time(self):
        data = self.get("start time")
        for val in data:
            if val:
                return val
        start_time = time.time()
        self.add_data({"start time": start_time})
        return start_time
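
    # Note: the start time is created lazily; if none has been recorded,
    # the first call stores `{'start time': <epoch seconds>}` under `core`
    # on the current record (or a new one, if the log is empty).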

    def get_log_path(self):
        return path(self.directory).joinpath(str(self.experiment_id))

    def add_step(self, step_number, attempt=0):
        self.data.append({'core': {'step': step_number,
                                   'time': (time.time() -
                                            self.start_time()),
                                   'attempt': attempt}})
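
    # Each call appends a record of the form (`time` is in seconds,
    # relative to `start_time()`; values are hypothetical):
    #
    #     {'core': {'step': 3, 'time': 12.7, 'attempt': 0}}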

    def add_data(self, data, plugin_name='core'):
        if not self.data:
            self.data.append({})
        if plugin_name not in self.data[-1]:
            self.data[-1][plugin_name] = {}
        for k, v in data.items():
            self.data[-1][plugin_name][k] = v
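
    # Example (hypothetical plugin name): data recorded after `add_step()`
    # attaches to the current step record, keyed by plugin:
    #
    #     log.add_step(0)
    #     log.add_data({'voltage': 100.0}, plugin_name='example.plugin')
    #     # log.data[-1] == {'core': {...},
    #     #                  'example.plugin': {'voltage': 100.0}}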

    def get(self, name, plugin_name='core'):
        var = []
        for d in self.data:
            if plugin_name in d and name in d[plugin_name]:
                var.append(d[plugin_name][name])
            else:
                var.append(None)
        return var
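
    # Example: `get()` returns one entry per record, with `None` wherever
    # the plugin did not log the named field (values are hypothetical):
    #
    #     log.get('voltage', plugin_name='example.plugin')
    #     # => [None, 100.0, 90.0]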

    def to_frame(self):
        '''
        Returns
        -------
        (pd.Series, pd.DataFrame)
            Tuple containing:

             - Experiment information, including UTC start time, MicroDrop
               software version, list of plugin versions, etc.
             - Data frame with multi-index columns, indexed first by plugin
               name, then by plugin field name.

        .. note::
            Values may be Python objects.  In future versions of MicroDrop,
            values *may* be restricted to JSON-compatible types.
        '''
        return log_data_to_frame(self)
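

# A minimal end-to-end sketch of the API above (hedged; the directory and
# plugin names are hypothetical): record two steps, save the log, then
# reload the raw pickled ``data`` file, as `log_data_to_frame()` expects,
# and convert it to a data frame.
if __name__ == '__main__':
    log = ExperimentLog(directory='example-logs')
    for step in range(2):
        log.add_step(step)
        log.add_data({'voltage': 100. - 10. * step},
                     plugin_name='example.plugin')
    log_path = log.save()

    # `log_data_to_frame()` operates on the log as pickled in the `data`
    # file (plugin data still serialized as strings), so load it directly
    # with `pickle` rather than via `ExperimentLog.load()`.
    with open(os.path.join(log_path, 'data'), 'rb') as f:
        log_data = pickle.load(f)
    experiment_info, df_log = log_data_to_frame(log_data)
    print experiment_info
    print df_log[('example.plugin', 'voltage')]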