"""
Copyright 2011 Ryan Fobel
This file is part of MicroDrop.
MicroDrop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MicroDrop is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with MicroDrop. If not, see <http://www.gnu.org/licenses/>.
"""
from collections import OrderedDict
from copy import deepcopy
try:
import cPickle as pickle
except ImportError:
import pickle
import datetime as dt
import logging
import os
import time
import uuid
from microdrop_utility import is_int, Version, FutureVersionError
from path_helpers import path
import arrow
import pandas as pd
import yaml
logger = logging.getLogger(__name__)
def log_data_to_frame(log_data_i):
'''
Parameters
----------
log_data_i : microdrop.experiment_log.ExperimentLog
MicroDrop experiment log, as pickled in the ``data``
file in each experiment log directory.
Returns
-------
(pd.Series, pd.DataFrame)
Tuple containing:
- Experiment information, including UTC start time,
MicroDrop software version, list of plugin versions,
etc.
- Data frame with multi-index columns, indexed first by
plugin name, then by plugin field name.
.. note::
Values may be Python objects. In future versions
of MicroDrop, values *may* be restricted to json
compatible types.
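
    Example
    -------
    A minimal sketch (``/path/to/logs/0/data`` is a hypothetical path to a
    pickled experiment log written by :meth:`ExperimentLog.save`)::

        log = ExperimentLog.load('/path/to/logs/0/data')  # hypothetical path
        info, df_log = log_data_to_frame(log)
        print info['utc_start_time']
        print df_log[('core', 'step')].head()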
'''
def log_frame_experiment_info(df_log):
experiment_info = df_log['core'].iloc[0].copy()
experiment_info.update(df_log['core'].iloc[-1])
start_time = arrow.get(experiment_info['start time']).naive
experiment_info['utc_start_time'] = start_time.isoformat()
for k in ('step', 'start time', 'time', 'attempt',
'utc_timestamp'):
if k in experiment_info.index:
del experiment_info[k]
return experiment_info.dropna()
plugin_names_i = sorted(reduce(lambda a, b:
a.union(b.keys()),
log_data_i.data, set()))
frames_i = OrderedDict()
for plugin_name_ij in plugin_names_i:
try:
frame_ij = pd.DataFrame(map(lambda v: pickle.loads(v)
if v else {},
[s.get(plugin_name_ij)
for s in log_data_i.data]))
except Exception, exception:
print plugin_name_ij, exception
else:
frames_i[plugin_name_ij] = frame_ij
df_log_i = pd.concat(frames_i.values(), axis=1, keys=frames_i.keys())
start_time_i = arrow.get(df_log_i
.iloc[0][('core', 'start time')]).naive
df_log_i[('core', 'utc_timestamp')] = \
(start_time_i + df_log_i[('core', 'time')]
.map(lambda s: dt.timedelta(seconds=s) if s == s else None))
df_log_i.sort_index(axis=1, inplace=True)
experiment_info = log_frame_experiment_info(df_log_i)
experiment_info['uuid'] = log_data_i.uuid
df_log_i.dropna(subset=[('core', 'step'), ('core', 'attempt')],
inplace=True)
return experiment_info, df_log_i
class ExperimentLog():
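    '''
    Log of the data collected while running a protocol.

    Each entry in :attr:`data` corresponds to one protocol step and maps a
    plugin name to that plugin's data for the step; the ``'core'`` entry
    holds the step number, attempt number, and timing information.
    '''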
class_version = str(Version(0,3,0))
def __init__(self, directory=None):
self.directory = directory
self.data = []
self.version = self.class_version
self.uuid = str(uuid.uuid4())
self._get_next_id()
self.metadata = {} # Meta data, keyed by plugin name.
logger.info('[ExperimentLog] new log with id=%s and uuid=%s' % (self.experiment_id, self.uuid))
def _get_next_id(self):
if self.directory is None:
self.experiment_id = None
return
        if not os.path.isdir(self.directory):
os.makedirs(self.directory)
logs = path(self.directory).listdir()
self.experiment_id = 0
for d in logs:
if is_int(d.name):
i = int(d.name)
if i >= self.experiment_id:
self.experiment_id = i
# increment the experiment_id if the current directory is not empty
if len(d.listdir()):
self.experiment_id += 1
log_path = self.get_log_path()
if not log_path.isdir():
log_path.makedirs_p()
def _upgrade(self):
"""
Upgrade the serialized object if necessary.
Raises:
FutureVersionError: file was written by a future version of the
software.
"""
logger.debug("[ExperimentLog]._upgrade()")
version = Version.fromstring(self.version)
logger.debug('[ExperimentLog] version=%s, class_version=%s' % (str(version), self.class_version))
if version > Version.fromstring(self.class_version):
logger.debug('[ExperimentLog] version>class_version')
raise FutureVersionError
if version < Version(0,1,0):
new_data = []
plugin_name = None
for step_data in self.data:
                if "control board hardware version" in step_data:
plugin_name = "wheelerlab.dmf_control_board_" + \
step_data["control board hardware version"]
for i in range(len(self.data)):
new_data.append({})
for k, v in self.data[i].items():
                    if plugin_name and k in ("FeedbackResults",
                                             "SweepFrequencyResults",
                                             "SweepVoltageResults"):
try:
new_data[i][plugin_name] = \
{k:pickle.loads(v)}
except Exception, e:
logger.error("Couldn't load experiment log data "
"for plugin: %s. %s." % \
(plugin_name, e))
else:
                        if "core" not in new_data[i]:
new_data[i]["core"] = {}
new_data[i]["core"][k] = v
# serialize objects to yaml strings
for i in range(len(self.data)):
for plugin_name, plugin_data in new_data[i].items():
new_data[i][plugin_name] = yaml.dump(plugin_data)
self.data = new_data
self.version = str(Version(0,1,0))
if version < Version(0,2,0):
self.uuid = str(uuid.uuid4())
self.version = str(Version(0,2,0))
if version < Version(0,3,0):
self.metadata = {}
self.version = str(Version(0,3,0))
# else the versions are equal and don't need to be upgraded
@classmethod
    def load(cls, filename):
"""
Load an experiment log from a file.
Args:
filename: path to file.
Raises:
TypeError: file is not an experiment log.
FutureVersionError: file was written by a future version of the
software.
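
        Example:
            A minimal sketch (the directory shown is hypothetical; ``data``
            is the filename written by :meth:`save`)::

                log = ExperimentLog.load('/path/to/logs/0/data')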
"""
logger.debug("[ExperimentLog].load(\"%s\")" % filename)
logger.info("Loading Experiment log from %s" % filename)
out = None
start_time = time.time()
with open(filename, 'rb') as f:
try:
out = pickle.load(f)
logger.debug("Loaded object from pickle.")
except Exception, e:
logger.debug("Not a valid pickle file. %s." % e)
        if out is None:
with open(filename, 'rb') as f:
try:
out = yaml.load(f)
logger.debug("Loaded object from YAML file.")
except Exception, e:
logger.debug("Not a valid YAML file. %s." % e)
        if out is None:
raise TypeError
out.filename = filename
# check type
if out.__class__!=cls:
raise TypeError
if not hasattr(out, 'version'):
out.version = str(Version(0))
out._upgrade()
# load objects from serialized strings
for i in range(len(out.data)):
for plugin_name, plugin_data in out.data[i].items():
try:
out.data[i][plugin_name] = pickle.loads(plugin_data)
except Exception, e:
logger.debug("Not a valid pickle string ("
"plugin: %s). %s." % (plugin_name, e))
try:
out.data[i][plugin_name] = yaml.load(plugin_data)
except Exception, e:
logger.error("Couldn't load experiment log data for "
"plugin: %s. %s." % (plugin_name, e))
logger.debug("[ExperimentLog].load() loaded in %f s." % \
(time.time()-start_time))
return out
    def save(self, filename=None, format='pickle'):
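        """
        Serialize the log to disk (skipped if the log contains no data).

        Args:
            filename: output path (defaults to ``<log path>/data``).
            format: ``'pickle'`` (default) or ``'yaml'``.

        Raises:
            TypeError: unrecognized ``format``.

        Returns:
            Path of the directory containing the log.
        """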
        if filename is None:
log_path = self.get_log_path()
filename = os.path.join(log_path,"data")
else:
log_path = path(filename).parent
if self.data:
out = deepcopy(self)
# serialize plugin dictionaries to strings
for i in range(len(out.data)):
for plugin_name, plugin_data in out.data[i].items():
if format=='pickle':
out.data[i][plugin_name] = pickle.dumps(plugin_data)
elif format=='yaml':
out.data[i][plugin_name] = yaml.dump(plugin_data)
else:
raise TypeError
with open(filename, 'wb') as f:
if format=='pickle':
pickle.dump(out, f, -1)
elif format=='yaml':
yaml.dump(out, f)
else:
raise TypeError
return log_path
    def start_time(self):
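        """
        Return the experiment start time (seconds since the epoch),
        recording the current time as the start time if none has been
        logged yet.
        """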
data = self.get("start time")
for val in data:
if val:
return val
start_time = time.time()
self.add_data({"start time":start_time})
return start_time
    def get_log_path(self):
return path(self.directory).joinpath(str(self.experiment_id))
    def add_step(self, step_number, attempt=0):
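        """
        Append a new entry to the log for the specified protocol step,
        recording the step number, attempt number, and time elapsed since
        the experiment start.
        """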
self.data.append({'core': {'step': step_number,
'time': (time.time() - self.start_time()),
'attempt': attempt}})
    def add_data(self, data, plugin_name='core'):
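        """
        Merge the ``data`` dictionary into the current (i.e., most recent)
        step's entry for ``plugin_name``, creating the entry if necessary.
        """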
if not self.data:
self.data.append({})
        if plugin_name not in self.data[-1]:
self.data[-1][plugin_name] = {}
for k, v in data.items():
self.data[-1][plugin_name][k] = v
    def get(self, name, plugin_name='core'):
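        """
        Return a list with one entry per logged step: the value of ``name``
        recorded by ``plugin_name`` for that step, or ``None`` if the field
        was not recorded.
        """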
var = []
for d in self.data:
            if plugin_name in d and name in d[plugin_name]:
var.append(d[plugin_name][name])
else:
var.append(None)
return var
    def to_frame(self):
'''
Returns
-------
(pd.Series, pd.DataFrame)
Tuple containing:
- Experiment information, including UTC start time, MicroDrop
software version, list of plugin versions, etc.
- Data frame with multi-index columns, indexed first by plugin
name, then by plugin field name.
.. note::
Values may be Python objects. In future versions
of MicroDrop, values *may* be restricted to json
compatible types.
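
        Example
        -------
        A minimal sketch (the log path shown is hypothetical)::

            log = ExperimentLog.load('/path/to/logs/0/data')  # hypothetical path
            info, df_log = log.to_frame()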
'''
return log_data_to_frame(self)