Source code for pychron.experiment.automated_run.data_collector

# ===============================================================================
# Copyright 2013 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import time
from datetime import datetime
from threading import Event

# ============= enthought library imports =======================
from apptools.preferences.preference_binding import bind_preference
from traits.api import Any, List, CInt, Int, Bool, Enum, Str, Instance

from pychron.envisage.consoleable import Consoleable
from pychron.pychron_constants import AR_AR, SIGNAL, BASELINE, WHIFF, SNIFF


[docs]class DataCollector(Consoleable): """ Base class for ``Collector`` objects. Provides logic for iterative measurement. """ measurement_script = Any automated_run = Instance( "pychron.experiment.automated_run.automated_run.AutomatedRun" ) measurement_result = Str detectors = List check_conditionals = Bool(True) ncounts = CInt is_baseline = Bool(False) for_peak_hop = Bool(False) fits = List series_idx = Int fit_series_idx = Int canceled = False terminated = False _truncate_signal = False starttime = None starttime_abs = None _alive = False _evt = None _warned_no_fit = None _warned_no_det = None collection_kind = Enum((SNIFF, WHIFF, BASELINE, SIGNAL)) refresh_age = False _data = None _temp_conds = None _result = None _queue = None err_message = Str no_intensity_threshold = 100 not_intensity_count = 0 trigger = None plot_panel_update_period = Int(1) def __init__(self, *args, **kw): super(DataCollector, self).__init__(*args, **kw) bind_preference( self, "plot_panel_update_period", "pychron.experiment.plot_panel_update_period", ) # def wait(self): # st = time.time() # self.debug('wait started') # while 1: # if self._evt and self._evt.set(): # break # self.debug('wait complete {:0.1f}s'.format(time.time() - st)) def set_truncated(self): self._truncate_signal = True def stop(self): self._alive = False if self._evt: self._evt.set() def set_starttime(self, s): self.starttime = s if s is not None: # convert s (result of time.time()) to a datetime object self.starttime_abs = datetime.fromtimestamp(s) def measure(self): if self.canceled: return self.measurement_result = "" self.terminated = False self._truncate_signal = False self._warned_no_fit = [] self._warned_no_det = [] if self.starttime is None: self.starttime = time.time() self.starttime_abs = datetime.now() et = self.ncounts * self.period_ms * 0.001 self._alive = True self._measure() tt = time.time() - self.starttime self.debug("estimated time: {:0.3f} actual time: :{:0.3f}".format(et, tt)) # def plot_data(self, *args, **kw): # from pychron.core.ui.gui import invoke_in_main_thread # invoke_in_main_thread(self._plot_data, *args, **kw) def set_temporary_conditionals(self, cd): self._temp_conds = cd def clear_temporary_conditionals(self): self._temp_conds = None # private def _measure(self): self.debug("starting measurement") self._evt = evt = Event() # self._queue = q = Queue() # def writefunc(): # writer = self.data_writer # while not q.empty() or not evt.wait(10): # dets = self.detectors # while not q.empty(): # x, keys, signals = q.get() # writer(dets, x, keys, signals) # # # only write to file every 10 seconds and not on main thread # t = Thread(target=writefunc) # # t.setDaemon(True) # t.start() self.debug("measurement period (ms) = {}".format(self.period_ms)) period = self.period_ms * 0.001 i = 1 while not evt.is_set(): result = self._check_iteration(i) if not result: if not self._pre_trigger_hook(): break if self.trigger: self.trigger() evt.wait(period) self.automated_run.plot_panel.counts = i inc = self._iter_hook(i) if inc is None: break self._post_iter_hook(i) if inc: i += 1 else: if result == "cancel": self.canceled = True elif result == "terminate": self.terminated = True break evt.set() # self.debug('waiting for write to finish') # t.join() self.debug("measurement finished") def _pre_trigger_hook(self): return True def _post_iter_hook(self, i): if self.experiment_type == AR_AR and self.refresh_age and not i % 5: self.isotope_group.calculate_age(force=True) def _pre_trigger_hook(self): return True def _iter_hook(self, i): return self._iteration(i) def _iteration(self, i, detectors=None): try: data = self._get_data(detectors) if not data: return k, s, t, inc = data except (AttributeError, TypeError, ValueError) as e: self.debug("failed getting data {}".format(e)) return if k is not None and s is not None: x = self._get_time(t) self._save_data(x, k, s) self._plot_data(i, x, k, s) return inc def _get_time(self, t): if t is None: t = time.time() r = t - self.starttime else: # t is provided by the spectrometer. t should be a python datetime object # since t is in absolute time use self.starttime_abs r = t - self.starttime_abs # convert to seconds r = r.total_seconds() return r def _get_data(self, detectors=None): try: data = next(self.data_generator) except StopIteration: self.debug("data generator stopped") return if data: keys, signals, ct, inc = data if detectors: # data = list(zip(*(d for d in zip(*data) if d[0] in detectors))) nkeys, nsignals = [], [] for k, s in zip(keys, signals): if k in detectors: nkeys.append(k) nsignals.append(s) data = (nkeys, nsignals, ct, inc) self._data = (nkeys, nsignals) else: self._data = (keys, signals) return data def _save_data(self, x, keys, signals): # self._queue.put((x, keys, signals)) self.data_writer(self.detectors, x, keys, signals) # update arar_age if self.is_baseline and self.for_peak_hop: self._update_baseline_peak_hop(x, keys, signals) else: self._update_isotopes(x, keys, signals) def _update_baseline_peak_hop(self, x, keys, signals): ig = self.isotope_group for iso in ig.itervalues(): signal = self._get_signal(keys, signals, iso.detector) if signal is not None: if not ig.append_data(iso.name, iso.detector, x, signal, "baseline"): self.debug( "baselines - failed appending data for {}. " "not a current isotope {}".format(iso, ig.isotope_keys) ) def _update_isotopes(self, x, keys, signals): a = self.isotope_group kind = self.collection_kind for dn in keys: dn = self._get_detector(dn) if dn: iso = dn.isotope signal = self._get_signal(keys, signals, dn.name) if signal is not None: if not a.append_data(iso, dn.name, x, signal, kind): self.debug( "{} - failed appending data for {}. not a current isotope {}".format( kind, iso, a.isotope_keys ) ) def _get_signal(self, keys, signals, det): try: return signals[keys.index(det)] except ValueError: if det not in self._warned_no_det: self.warning("Detector {} is not available".format(det)) self._warned_no_det.append(det) self.canceled = True self.stop() def _get_detector(self, d): if isinstance(d, str): d = next((di for di in self.detectors if di.name == d), None) return d def _plot_data(self, cnt, x, keys, signals): for dn, signal in zip(keys, signals): det = self._get_detector(dn) if det: self._set_plot_data(cnt, det, x, signal) if not cnt % self.plot_panel_update_period: self.plot_panel.update() def _set_plot_data(self, cnt, det, x, signal): iso = det.isotope detname = det.name ypadding = det.ypadding if self.collection_kind == SNIFF: gs = [ (self.plot_panel.sniff_graph, iso, None, 0, 0), (self.plot_panel.isotope_graph, iso, None, 0, 0), ] elif self.collection_kind == BASELINE: iso = self.isotope_group.get_isotope(detector=detname, kind="baseline") if iso is not None: fit = iso.get_fit(cnt) else: fit = "average" gs = [(self.plot_panel.baseline_graph, detname, fit, 0, 0)] else: title = self.isotope_group.get_isotope_title(name=iso, detector=detname) iso = self.isotope_group.get_isotope(name=iso, detector=detname) fit = iso.get_fit(cnt) gs = [ ( self.plot_panel.isotope_graph, title, fit, self.series_idx, self.fit_series_idx, ) ] for g, name, fit, series, fit_series in gs: pid = g.get_plotid_by_ytitle(name) if pid is None: self.critical( "failed to locate {}, ytitles={}".format(name, g.get_plot_ytitles()) ) continue g.add_datum( (x, signal), series=series, plotid=pid, update_y_limits=True, ypadding=ypadding, ) if fit: g.set_fit(fit, plotid=pid, series=fit_series) # =============================================================================== # # =============================================================================== # =============================================================================== # checks # =============================================================================== # def _check_modification_conditionals(self, cnt): # tripped = self._check_conditionals(self.modification_conditionals, cnt) # if tripped: # queue = self.automated_run.experiment_executor.experiment_queue # tripped.do_modifications(queue, self.automated_run) # if tripped.use_truncation: # return self._set_run_truncated() def _check_conditionals(self, conditionals, cnt): self.err_message = "" for ti in conditionals: if ti.check(self.automated_run, self._data, cnt): m = "Conditional tripped: {}".format(ti.to_string()) self.info(m) self.err_message = m return ti def _equilibration_func(self, tr): if tr.use_truncation: self.measurement_script.abbreviated_count_ratio = tr.abbreviated_count_ratio return self._set_truncated() elif tr.use_termination: return "terminate" def _modification_func(self, tr): run = self.automated_run ex = run.experiment_executor queue = ex.experiment_queue tr.do_modifications(run, ex, queue) self.measurement_script.abbreviated_count_ratio = tr.abbreviated_count_ratio if tr.use_truncation: return self._set_truncated() elif tr.use_termination: return "terminate" def _truncation_func(self, tr): self.measurement_script.abbreviated_count_ratio = tr.abbreviated_count_ratio return self._set_truncated() def _action_func(self, tr): tr.perform(self.measurement_script) if not tr.resume: return "break" def _set_truncated(self): self.state = "truncated" self.automated_run.truncated = True self.automated_run.spec.state = "truncated" return "break" def _check_iteration(self, i): if self._temp_conds: ti = self._check_conditionals(self._temp_conds, i) if ti: self.measurement_result = ti.action return "break" j = i - 1 user_counts = 0 if self.plot_panel is None else self.plot_panel.ncounts script_counts = ( 0 if self.measurement_script is None else self.measurement_script.ncounts ) original_counts = self.ncounts count_args = (j, original_counts) # self.debug('user_counts={}, script_counts={}, original_counts={}'.format(user_counts, # script_counts, # original_counts)) if not self._alive: self.info("measurement iteration executed {}/{} counts".format(*count_args)) return "cancel" if user_counts != original_counts: if i > user_counts: self.info( "user termination. measurement iteration executed {}/{} counts".format( *count_args ) ) self.plot_panel.total_counts -= original_counts - i return self._set_truncated() elif script_counts != original_counts: if i > script_counts: self.info( "script termination. measurement iteration executed {}/{} counts".format( *count_args ) ) return self._set_truncated() elif i > original_counts: return "break" if self._truncate_signal: self.info("measurement iteration executed {}/{} counts".format(*count_args)) self._truncate_signal = False return self._set_truncated() if self.check_conditionals: for tag, func, conditionals in ( ( "modification", self._modification_func, self.modification_conditionals, ), ("truncation", self._truncation_func, self.truncation_conditionals), ("action", self._action_func, self.action_conditionals), ("termination", lambda x: "terminate", self.termination_conditionals), ("cancelation", lambda x: "cancel", self.cancelation_conditionals), ( "equilibration", self._equilibration_func, self.equilibration_conditionals, ), ): if tag == "equilibration" and self.collection_kind != SNIFF: continue tripped = self._check_conditionals(conditionals, i) if tripped: self.info( "{} conditional {}. measurement iteration executed {}/{} counts".format( tag, tripped.message, j, original_counts ), color="red", ) self.automated_run.show_conditionals(tripped=tripped) return func(tripped) @property def isotope_group(self): if self.automated_run: return self.automated_run.isotope_group @property def plot_panel(self): if self.automated_run: return self.automated_run.plot_panel @property def modification_conditionals(self): if self.automated_run: return self.automated_run.modification_conditionals @property def truncation_conditionals(self): if self.automated_run: return self.automated_run.truncation_conditionals @property def termination_conditionals(self): if self.automated_run: return self.automated_run.termination_conditionals @property def action_conditionals(self): if self.automated_run: return self.automated_run.action_conditionals @property def cancelation_conditionals(self): if self.automated_run: return self.automated_run.cancelation_conditionals @property def equilibration_conditionals(self): if self.automated_run: return self.automated_run.equilibration_conditionals
# ============= EOF =============================================