Source code for pychron.experiment.automated_run.data_collector

# ===============================================================================
# Copyright 2013 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

from queue import Queue
from threading import Event, Thread

import time
# ============= enthought library imports =======================
from traits.api import Any, List, CInt, Int, Bool, Enum, Str, Instance

from pychron.envisage.consoleable import Consoleable
from pychron.pychron_constants import AR_AR, SIGNAL, BASELINE, WHIFF, SNIFF


[docs]class DataCollector(Consoleable): """ Base class for ``Collector`` objects. Provides logic for iterative measurement. """ measurement_script = Any automated_run = Instance('pychron.experiment.automated_run.automated_run.AutomatedRun') measurement_result = Str detectors = List check_conditionals = Bool(True) ncounts = CInt is_baseline = Bool(False) for_peak_hop = Bool(False) fits = List series_idx = Int fit_series_idx = Int canceled = False terminated = False _truncate_signal = False starttime = None _alive = False _evt = None _warned_no_fit = None _warned_no_det = None collection_kind = Enum((SNIFF, WHIFF, BASELINE, SIGNAL)) refresh_age = False _data = None _temp_conds = None _result = None _queue = None err_message = Str no_intensity_threshold = 100 not_intensity_count = 0 trigger = None def wait(self): st = time.time() self.debug('wait started') while 1: if self._evt and self._evt.set(): break self.debug('wait complete {:0.1f}s'.format(time.time() - st)) def set_truncated(self): self._truncate_signal = True def stop(self): self._alive = False if self._evt: self._evt.set() def measure(self): if self.canceled: return self.measurement_result = '' self.terminated = False self._truncate_signal = False self._warned_no_fit = [] self._warned_no_det = [] st = time.time() if self.starttime is None: self.starttime = st et = self.ncounts * self.period_ms * 0.001 # evt = self._evt # if evt: # evt.set() # else: # evt = Event() # self._evt = evt # evt = Event() # evt.clear() # self._evt = evt self._alive = True self._measure() tt = time.time() - st self.debug('estimated time: {:0.3f} actual time: :{:0.3f}'.format(et, tt)) def plot_data(self, *args, **kw): from pychron.core.ui.gui import invoke_in_main_thread invoke_in_main_thread(self._plot_data, *args, **kw) def set_temporary_conditionals(self, cd): self._temp_conds = cd def clear_temporary_conditionals(self): self._temp_conds = None # private def _measure(self): self.debug('starting measurement') self._queue = q = Queue() self._evt = Event() evt = self._evt def writefunc(): writer = self.data_writer while not q.empty() or not evt.wait(10): dets = self.detectors while not q.empty(): x, keys, signals = q.get() writer(dets, x, keys, signals) # only write to file every 10 seconds and not on main thread t = Thread(target=writefunc) # t.setDaemon(True) t.start() self.debug('measurement period (ms) = {}'.format(self.period_ms)) period = self.period_ms * 0.001 i = 1 # elapsed = 0 while not evt.is_set(): result = self._check_iteration(i) if not result: if not self._pre_trigger_hook(): break self.trigger() evt.wait(period) self.automated_run.plot_panel.counts = i if not self._iter_hook(i): break self._post_iter_hook(i) i += 1 else: if result == 'cancel': self.canceled = True elif result == 'terminate': self.terminated = True break evt.set() self.debug('waiting for write to finish') t.join() self.debug('measurement finished') def _post_iter_hook(self, i): if self.experiment_type == AR_AR and self.refresh_age and not i % 5: self.isotope_group.calculate_age(force=True) # t = Timer(0.05, self.isotope_group.calculate_age, kwargs={'force': True}) # t.start() def _pre_trigger_hook(self): return True def _iter_hook(self, i): return self._iteration(i) def _iteration(self, i, detectors=None): try: data = self._get_data(detectors) except (AttributeError, TypeError, ValueError) as e: self.debug('failed getting data {}'.format(e)) return if not data: return k, s = data if k is not None and s is not None: x = self._get_time() self._save_data(x, k, s) self._plot_data(i, x, k, s) return True def _get_time(self): return time.time() - self.starttime def _get_data(self, detectors=None): try: data = next(self.data_generator) except StopIteration: self.debug('data generator stopped') return if data: if detectors: data = list(zip(*[d for d in zip(*data) if d[0] in detectors])) self._data = data return data def _save_data(self, x, keys, signals): self._queue.put((x, keys, signals)) # update arar_age if self.is_baseline and self.for_peak_hop: self._update_baseline_peak_hop(x, keys, signals) else: self._update_isotopes(x, keys, signals) def _update_baseline_peak_hop(self, x, keys, signals): ig = self.isotope_group for iso in ig.itervalues(): signal = self._get_signal(keys, signals, iso.detector) if signal is not None: if not ig.append_data(iso.name, iso.detector, x, signal, 'baseline'): self.debug('baselines - failed appending data for {}. ' 'not a current isotope {}'.format(iso, ig.isotope_keys)) def _update_isotopes(self, x, keys, signals): a = self.isotope_group kind = self.collection_kind for dn in keys: dn = self._get_detector(dn) if dn: iso = dn.isotope signal = self._get_signal(keys, signals, dn.name) if signal is not None: if not a.append_data(iso, dn.name, x, signal, kind): self.debug('{} - failed appending data for {}. not a current isotope {}'.format(kind, iso, a.isotope_keys)) def _get_signal(self, keys, signals, det): try: return signals[keys.index(det)] except ValueError: if det not in self._warned_no_det: self.warning('Detector {} is not available'.format(det)) self._warned_no_det.append(det) self.canceled = True self.stop() def _get_detector(self, d): if isinstance(d, str): d = next((di for di in self.detectors if di.name == d), None) return d def _plot_data(self, cnt, x, keys, signals): for dn, signal in zip(keys, signals): det = self._get_detector(dn) if det: self._set_plot_data(cnt, det.isotope, det.name, x, signal) else: print('no detector obj for {}'.format(dn), [d.name for d in self.detectors]) self.plot_panel.update() def _set_plot_data(self, cnt, iso, det, x, signal): if self.collection_kind == SNIFF: gs = [(self.plot_panel.sniff_graph, iso, None, 0, 0), (self.plot_panel.isotope_graph, iso, None, 0, 0)] elif self.collection_kind == BASELINE: iso = self.isotope_group.get_isotope(detector=det, kind='baseline') if iso is not None: fit = iso.get_fit(cnt) else: fit = 'average' gs = [(self.plot_panel.baseline_graph, det, fit, 0, 0)] else: title = self.isotope_group.get_isotope_title(name=iso, detector=det) iso = self.isotope_group.get_isotope(name=iso, detector=det) fit = iso.get_fit(cnt) gs = [(self.plot_panel.isotope_graph, title, fit, self.series_idx, self.fit_series_idx)] dd = self._get_detector(det) ypadding = dd.ypadding for g, name, fit, series, fit_series in gs: pid = g.get_plotid_by_ytitle(name) g.add_datum((x, signal), series=series, plotid=pid, update_y_limits=True, ypadding=ypadding) if fit: g.set_fit(fit, plotid=pid, series=fit_series) # =============================================================================== # # =============================================================================== # =============================================================================== # checks # =============================================================================== # def _check_modification_conditionals(self, cnt): # tripped = self._check_conditionals(self.modification_conditionals, cnt) # if tripped: # queue = self.automated_run.experiment_executor.experiment_queue # tripped.do_modifications(queue, self.automated_run) # if tripped.use_truncation: # return self._set_run_truncated() def _check_conditionals(self, conditionals, cnt): self.err_message = '' for ti in conditionals: if ti.check(self.automated_run, self._data, cnt): m = 'Conditional tripped: {}'.format(ti.to_string()) self.info(m) self.err_message = m return ti def _check_iteration(self, i): if self._temp_conds: ti = self._check_conditionals(self._temp_conds, i) if ti: self.measurement_result = ti.action return 'break' j = i - 1 user_counts = 0 if self.plot_panel is None else self.plot_panel.ncounts script_counts = 0 if self.measurement_script is None else self.measurement_script.ncounts original_counts = self.ncounts count_args = (j, original_counts) # self.debug('user_counts={}, script_counts={}, original_counts={}'.format(user_counts, # script_counts, # original_counts)) def set_truncated(): self.state = 'truncated' self.automated_run.truncated = True self.automated_run.spec.state = 'truncated' return 'break' if not self._alive: self.info('measurement iteration executed {}/{} counts'.format(*count_args)) return 'cancel' if user_counts != original_counts: if i > user_counts: self.info('user termination. measurement iteration executed {}/{} counts'.format(*count_args)) self.plot_panel.total_counts -= (original_counts - i) return set_truncated() elif script_counts != original_counts: if i > script_counts: self.info('script termination. measurement iteration executed {}/{} counts'.format(*count_args)) return set_truncated() elif i > original_counts: return 'break' if self._truncate_signal: self.info('measurement iteration executed {}/{} counts'.format(*count_args)) self._truncate_signal = False return set_truncated() if self.check_conditionals: def modification_func(tr): queue = self.automated_run.experiment_executor.experiment_queue tr.do_modifications(queue, self.automated_run) self.measurement_script.abbreviated_count_ratio = tr.abbreviated_count_ratio if tr.use_truncation: return set_truncated() elif tr.use_termination: return 'terminate' def truncation_func(tr): self.measurement_script.abbreviated_count_ratio = tr.abbreviated_count_ratio return set_truncated() def action_func(tr): tr.perform(self.measurement_script) if not tr.resume: return 'break' for tag, func, conditionals in (('modification', modification_func, self.modification_conditionals), ('truncation', truncation_func, self.truncation_conditionals), ('action', action_func, self.action_conditionals), ('termination', lambda x: 'terminate', self.termination_conditionals), ('cancelation', lambda x: 'cancel', self.cancelation_conditionals)): tripped = self._check_conditionals(conditionals, i) if tripped: self.info('{} conditional {}. measurement iteration executed {}/{} counts'.format(tag, tripped.message, j, original_counts), color='red') self.automated_run.show_conditionals(tripped=tripped) return func(tripped) @property def isotope_group(self): if self.automated_run: return self.automated_run.isotope_group @property def plot_panel(self): if self.automated_run: return self.automated_run.plot_panel @property def modification_conditionals(self): if self.automated_run: return self.automated_run.modification_conditionals @property def truncation_conditionals(self): if self.automated_run: return self.automated_run.truncation_conditionals @property def termination_conditionals(self): if self.automated_run: return self.automated_run.termination_conditionals @property def action_conditionals(self): if self.automated_run: return self.automated_run.action_conditionals @property def cancelation_conditionals(self): if self.automated_run: return self.automated_run.cancelation_conditionals
# ============= EOF ============================================= # def _plot_baseline_for_peak_hop(self, cnt, x, keys, signals): # for dn, signal in zip(keys, signals): # det = self._get_detector(dn) # if det: # self._set_plot_data(cnt, det.isotope, det.name, x, signal) # # # for k, v in six.iteritems(self.isotope_group): # # signal = signals[keys.index(v.detector)] # # self._set_plot_data(i, None, v.detector, x, signal) # # def _plot_data_(self, cnt, x, keys, signals): # print('n={} keys={}'.format(len(keys), keys)) # print('n={} signals={}'.format(len(signals), signals)) # # for dn, signal in zip(keys, signals): # det = self._get_detector(dn) # if det: # self._set_plot_data(cnt, det.isotope, det.name, x, signal) # else: # print('no detector obj for {}'.format(dn), [d.name for d in self.detectors]) # # def _get_fit(self, cnt, det, iso): # # isotopes = self.isotope_group.isotopes # # # print 'isotopes', ['{}{}'.format(i.name, i.detector) for i in isotopes.itervalues()] # # print 'fff', cnt, det, iso # # for k, v in isotopes.iteritems(): # # print v.detector, v.name # # # print 'pairs', [(k, v.detector, v.name) for k, v in isotopes.iteritems()] # # print 'get_fit', det, iso, name # # print 'gff', cnt, det, iso, ix.name, name # name = iso # if self.is_baseline: # ix = self.isotope_group.get_isotope(detector=det, kind='baseline') # # name, ix = next(((k, v) for k, v in six.iteritems(isotopes) if v.detector == det), # # (None, None)) # # ix = ix.baseline # name = det # else: # # print('get fit isio={}, det={}'.format(iso, det)) # ix = self.isotope_group.get_isotope(iso, det) # if ix is not None: # name = ix.name # # name, ix = next(((k, v) for k, v in six.iteritems(isotopes) if v.detector == det and v.name == iso), # # (None, None)) # # fit = None # if ix is not None and self.collection_kind != SNIFF: # fit = ix.get_fit(cnt) # # else: # # print('fff', cnt, det, iso) # # for k, v in six.iteritems(isotopes): # # print(v.detector, v.name) # # return fit, name # # def _set_plot_data(self, cnt, iso, det, x, signal): # """ # if is_baseline than use detector to get isotope # """ # self.debug('set plot data {} {} {} {}'.format(cnt, iso, det, signal)) # # def update_graph(g, sidx, fidx): # if iso is None: # pids = [] # for isotope in self.isotope_group.itervalues(): # # print('{:<10s}{:<10s}{:<5s}'.format(isotope.name, isotope.detector, det)) # if isotope.detector == det: # pid = g.get_plotid_by_ytitle(isotope.detector) # # print('pid', det, pid) # if pid is not None: # try: # fit, _ = self._get_fit(cnt, det, isotope.name) # except BaseException as e: # self.debug('set_plot_data, is_baseline={} det={}, get_fit {}'.format(self.is_baseline, # det, e)) # continue # pids.append((pid, fit)) # else: # # try: # # get fit and name # fit, name = self._get_fit(cnt, det, iso) # except AttributeError as e: # name = None # self.debug('set_plot_data, get_fit {}'.format(e)) # # pid = None # if name is not None: # pid = g.get_plotid_by_ytitle(name) # # if pid is None: # pid = g.get_plotid_by_ytitle(iso) # # pids = [(pid, fit)] # # print('pids={}'.format(pids)) # # for p, f in pids: # if p is not None: # # if self.collection_kind == SNIFF: # # print cnt, p, iso, det # g.add_datum((x, signal), # series=sidx, # plotid=p, # update_y_limits=True, # ypadding='0.1') # if self.collection_kind == BASELINE: # print('setting fit {} p={} idx={}'.format(f, p, fidx)) # if f: # g.set_fit(f, plotid=p, series=fidx) # # igraph = self.plot_panel.isotope_graph # if self.collection_kind == SNIFF: # sgraph = self.plot_panel.sniff_graph # update_graph(sgraph, 0, 0) # update_graph(igraph, self.series_idx, self.fit_series_idx) # elif self.collection_kind == BASELINE: # bgraph = self.plot_panel.baseline_graph # update_graph(bgraph, 0, 0) # # update_graph(igraph, self.series_idx, self.fit_series_idx) # else: # update_graph(igraph, self.series_idx, self.fit_series_idx) # # def _plot_data(self, i, x, keys, signals): # if globalv.experiment_debug: # x *= (self.period_ms * 0.001) ** -1 # # if self.is_baseline and self.for_peak_hop: # self._plot_baseline_for_peak_hop(i, x, keys, signals) # else: # self._plot_data_(i, x, keys, signals) # # self.plot_panel.update()