Source code for pychron.experiment.automated_run.automated_run

# ===============================================================================
# Copyright 2011 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# # ============= enthought library imports =======================

import ast
import importlib
import os
import re
import time
import weakref
from pprint import pformat
from threading import Event as TEvent, Thread

from numpy import Inf, polyfit, linspace, polyval
from traits.api import (
    Any,
    Str,
    List,
    Property,
    Event,
    Instance,
    Bool,
    HasTraits,
    Float,
    Int,
    Long,
    Tuple,
    Dict,
)
from traits.trait_errors import TraitError

from pychron.core.helpers.filetools import add_extension
from pychron.core.helpers.filetools import get_path
from pychron.core.helpers.iterfuncs import groupby_key
from pychron.core.helpers.strtools import to_bool
from pychron.core.ui.gui import invoke_in_main_thread
from pychron.core.ui.preference_binding import set_preference

# from pychron.core.ui.thread import Thread
from pychron.core.yaml import yload
from pychron.experiment import ExtractionException
from pychron.experiment.automated_run.hop_util import parse_hops
from pychron.experiment.automated_run.persistence_spec import PersistenceSpec
from pychron.experiment.conditional.conditional import (
    TruncationConditional,
    ActionConditional,
    TerminationConditional,
    conditional_from_dict,
    CancelationConditional,
    conditionals_from_file,
    QueueModificationConditional,
    EquilibrationConditional,
)
from pychron.experiment.plot_panel import PlotPanel
from pychron.experiment.utilities.conditionals import (
    test_queue_conditionals_name,
    QUEUE,
    SYSTEM,
    RUN,
)
from pychron.experiment.utilities.environmentals import set_environmentals
from pychron.experiment.utilities.identifier import convert_identifier
from pychron.experiment.utilities.script import assemble_script_blob
from pychron.globals import globalv
from pychron.loggable import Loggable
from pychron.paths import paths
from pychron.pychron_constants import (
    NULL_STR,
    MEASUREMENT_COLOR,
    EXTRACTION_COLOR,
    SCRIPT_KEYS,
    AR_AR,
    NO_BLANK_CORRECT,
    EXTRACTION,
    MEASUREMENT,
    EM_SCRIPT_KEYS,
    SCRIPT_NAMES,
    POST_MEASUREMENT,
    POST_EQUILIBRATION,
    FAILED,
    TRUNCATED,
    SUCCESS,
    CANCELED,
)
from pychron.spectrometer.base_spectrometer import NoIntensityChange
from pychron.spectrometer.isotopx.manager.ngx import NGXSpectrometerManager
from pychron.spectrometer.pfeiffer.manager.quadera import QuaderaSpectrometerManager
from pychron.spectrometer.thermo.manager.base import ThermoSpectrometerManager

DEBUG = False


class ScriptInfo(HasTraits):
    measurement_script_name = Str
    extraction_script_name = Str
    post_measurement_script_name = Str
    post_equilibration_script_name = Str


SCRIPTS = {}
WARNED_SCRIPTS = []


[docs]class AutomatedRun(Loggable): """ The ``AutomatedRun`` object is used to execute automated analyses. It mostly delegates responisbility to other objects. It provides an interface for ``MeasurementPyscripts``. All measurement script commands have a corresponding function defined here. A commands corresponding function is defined as py_{function_name} for example ``position_magnet`` calls ``AutomatedRun.py_position_magnet`` data collection is handled by either ``MultiCollector`` or ``PeakHopCollector`` persistence (saving to file and database) is handled by ``AutomatedRunPersister`` An automated run is executed in four steps by the ``ExperimentExecutor``. #. start #. extraction #. measurement a. equilibration b. post_equilibration #. post_measurement equilibration and post_equilibration are executed concurrently with the measurement script this way equilibration gas can be measured. four pyscripts (all optional) are used to program analysis execution 1. extraction 2. measurement 3. post_equilibration 4. post_measurement four types of conditionals are available 1. termination_conditionals 2. truncation_conditionals 3. action_conditionals 4. cancelation_conditionals """ spectrometer_manager = Any extraction_line_manager = Any # experiment_executor = Any ion_optics_manager = Any multi_collector = Instance( "pychron.experiment.automated_run.multi_collector.MultiCollector" ) peak_hop_collector = Instance( "pychron.experiment.automated_run.peak_hop_collector.PeakHopCollector" ) persister = Instance( "pychron.experiment.automated_run.persistence.AutomatedRunPersister", () ) dvc_persister = Instance("pychron.dvc.dvc_persister.DVCPersister") labspy_client = Instance("pychron.labspy.client.LabspyClient") xls_persister = Instance( "pychron.experiment.automated_run.persistence.ExcelPersister" ) collector = Property script_info = Instance(ScriptInfo, ()) runner = Any monitor = Any plot_panel = Any isotope_group = Instance("pychron.processing.isotope_group.IsotopeGroup") spec = Any runid = Str uuid = Str analysis_id = Long fits = List eqtime = Float use_syn_extraction = Bool(False) is_first = Bool(False) is_last = Bool(False) is_peak_hop = Bool(False) truncated = Bool measuring = Bool(False) dirty = Bool(False) update = Event use_db_persistence = Bool(True) use_dvc_persistence = Bool(False) use_xls_persistence = Bool(False) measurement_script = Instance( "pychron.pyscripts.measurement_pyscript.MeasurementPyScript" ) post_measurement_script = Instance( "pychron.pyscripts.extraction_line_pyscript.ExtractionPyScript" ) post_equilibration_script = Instance( "pychron.pyscripts.extraction_line_pyscript.ExtractionPyScript" ) extraction_script = Instance( "pychron.pyscripts.extraction_line_pyscript.ExtractionPyScript" ) termination_conditionals = List truncation_conditionals = List equilibration_conditionals = List action_conditionals = List cancelation_conditionals = List modification_conditionals = List pre_run_termination_conditionals = List post_run_termination_conditionals = List tripped_conditional = Instance( "pychron.experiment.conditional.conditional.BaseConditional" ) peak_center = None coincidence_scan = None info_color = None signal_color = None baseline_color = None executor_event = Event ms_pumptime_start = None previous_blanks = Tuple previous_baselines = Dict _active_detectors = List _peak_center_detectors = List _loaded = False _measured = False _aborted = False _alive = Bool(False) _truncate_signal = Bool _equilibration_done = False _integration_seconds = Float(1.1) min_ms_pumptime = Int(60) overlap_evt = None use_peak_center_threshold = Bool peak_center_threshold = Float(3) peak_center_threshold_window = Int(10) persistence_spec = Instance(PersistenceSpec) experiment_type = Str(AR_AR) laboratory = Str instrument_name = Str intensity_scalar = Float _intensities = None log_path = Str failed_intensity_count_threshold = Int(3) use_equilibration_analysis = Bool(False) def set_preferences(self, preferences): self.debug("set preferences") for attr, cast in ( ("experiment_type", str), ("laboratory", str), ("instrument_name", str), ("use_equilibration_analysis", to_bool), ("use_peak_center_threshold", to_bool), ("peak_center_threshold", float), ("peak_center_threshold_window", int), ("failed_intensity_count_threshold", int), ): set_preference( preferences, self, attr, "pychron.experiment.{}".format(attr), cast ) for p in (self.persister, self.xls_persister, self.dvc_persister): if p is not None: p.set_preferences(preferences) self.multi_collector.console_set_preferences(preferences, "pychron.experiment") self.peak_hop_collector.console_set_preferences( preferences, "pychron.experiment" ) # =============================================================================== # pyscript interface # =============================================================================== def py_measure(self): return self.spectrometer_manager.measure() def py_get_intensity(self, detector): if self._intensities: try: idx = self._intensities["tags"].index(detector) except ValueError: return return self._intensities["signals"][idx] def py_set_intensity_scalar(self, v): self.intensity_scalar = v return True def py_set_isotope_group(self, name): if self.plot_panel: self.plot_panel.add_isotope_graph(name) def py_generate_ic_mftable(self, detectors, refiso, peak_center_config=None, n=1): return self._generate_ic_mftable(detectors, refiso, peak_center_config, n) def py_whiff( self, ncounts, conditionals, starttime, starttime_offset, series=0, fit_series=0 ): return self._whiff( ncounts, conditionals, starttime, starttime_offset, series, fit_series ) def py_reset_data(self): self.debug("reset data") self._persister_action("pre_measurement_save") def py_clear_cached_configuration(self): self.spectrometer_manager.spectrometer.clear_cached_config() def py_send_spectrometer_configuration(self): self.spectrometer_manager.spectrometer.send_configuration() def py_reload_mftable(self): self.spectrometer_manager.spectrometer.reload_mftable() def py_set_integration_time(self, v): self.set_integration_time(v) def py_is_last_run(self): return self.is_last def py_define_detectors(self, isotope, det): self._define_detectors(isotope, det) def py_position_hv(self, pos, detector): self._set_hv_position(pos, detector) def py_position_magnet(self, pos, detector, use_dac=False, for_collection=True): if not self._alive: return self._set_magnet_position( pos, detector, use_dac=use_dac, for_collection=for_collection ) def py_activate_detectors(self, dets, peak_center=False): if not self._alive: return if not self.spectrometer_manager: self.warning("no spectrometer manager") return if peak_center: self._peak_center_detectors = self._set_active_detectors(dets) else: self._activate_detectors(dets) def py_set_fits(self, fits): isotopes = self.isotope_group.isotopes if not fits: fits = self._get_default_fits() elif len(fits) == 1: fits = {i: fits[0] for i in isotopes} else: fits = dict([f.split(":") for f in fits]) g = self.plot_panel.isotope_graph for k, iso in isotopes.items(): try: fi = fits[k] except KeyError: try: fi = fits[iso.name] except KeyError: try: fi = fits["{}{}".format(iso.name, iso.detector)] except KeyError: fi = "linear" self.warning( 'No fit for "{}". defaulting to {}. ' 'check the measurement script "{}"'.format( k, fi, self.measurement_script.name ) ) iso.set_fit_blocks(fi) self.debug('set "{}" to "{}"'.format(k, fi)) idx = self._get_plot_id_by_ytitle(g, k, iso) if idx is not None: g.set_regressor(iso.regressor, idx) def py_set_baseline_fits(self, fits): if not fits: fits = self._get_default_fits(is_baseline=True) elif len(fits) == 1: fits = {i.detector: fits[0] for i in self.isotope_group.values()} elif isinstance(fits, str): fits = {i.detector: fits for i in self.isotope_group.values()} else: fits = dict([f.split(":") for f in fits]) for k, iso in self.isotope_group.items(): try: fi = fits[iso.detector] except KeyError: fi = ("average", "SEM") self.warning( 'No fit for "{}". defaulting to {}. ' 'check the measurement script "{}"'.format( iso.detector, fi, self.measurement_script.name ) ) iso.baseline.set_fit_blocks(fi) self.debug('set "{}" to "{}"'.format(iso.detector, fi)) def py_get_spectrometer_parameter(self, name): self.info("getting spectrometer parameter {}".format(name)) if self.spectrometer_manager: return self.spectrometer_manager.spectrometer.get_parameter(name) def py_set_spectrometer_parameter(self, name, v): self.info("setting spectrometer parameter {} {}".format(name, v)) if self.spectrometer_manager: self.spectrometer_manager.spectrometer.set_parameter(name, v) def py_raw_spectrometer_command(self, cmd): if self.spectrometer_manager: self.spectrometer_manager.spectrometer.ask(cmd) def py_data_collection( self, obj, ncounts, starttime, starttime_offset, series=0, fit_series=0, group="signal", integration_time=None, ): if not self._alive: return if self.plot_panel: self.plot_panel.is_baseline = False self.plot_panel.show_isotope_graph() self.persister.build_tables(group, self._active_detectors, ncounts) self.multi_collector.is_baseline = False self.multi_collector.fit_series_idx = fit_series check_conditionals = obj == self.measurement_script if integration_time: self.set_integration_time(integration_time) result = self._measure( group, self.persister.get_data_writer(group), ncounts, starttime, starttime_offset, series, check_conditionals, self.signal_color, obj, ) return result def py_post_equilibration(self, **kw): self.do_post_equilibration(**kw) _equilibration_thread = None _equilibration_evt = None def py_equilibration( self, eqtime=None, inlet=None, outlet=None, do_post_equilibration=True, close_inlet=True, delay=None, ): # evt = TEvent() # if not self._alive: # evt.set() # return evt self.heading("Equilibration Started") inlet = self._convert_valve(inlet) outlet = self._convert_valve(outlet) elm = self.extraction_line_manager if elm: if outlet: # close mass spec ion pump for o in outlet: for i in range(3): ok, changed = elm.close_valve(o, mode="script") if ok: break else: time.sleep(0.1) else: from pychron.core.ui.gui import invoke_in_main_thread invoke_in_main_thread( self.warning_dialog, 'Equilibration: Failed to Close "{}"'.format(o), ) self.cancel_run(do_post_equilibration=False) return if inlet: self.debug( "waiting {}s before opening inlet value {}".format(delay, inlet) ) # evt.wait(delay) time.sleep(delay) self.debug("delay completed") # open inlet for i in inlet: for j in range(3): ok, changed = elm.open_valve(i, mode="script") if ok: break else: time.sleep(0.5) else: from pychron.core.ui.gui import invoke_in_main_thread invoke_in_main_thread( self.warning_dialog, 'Equilibration: Failed to Open "{}"'.format(i), ) self.cancel_run(do_post_equilibration=False) return # set the passed in event # evt.set() self._equilibration_thread = Thread( name="equilibration", target=self._equilibrate, args=(None,), kwargs=dict( eqtime=eqtime, inlet=inlet, outlet=outlet, delay=delay, close_inlet=close_inlet, do_post_equilibration=do_post_equilibration, ), ) self._equilibration_thread.start() return True # self._equilibration_evt = evt # return evt def py_sniff(self, ncounts, starttime, starttime_offset, series=0, block=True): if block: return self._sniff(ncounts, starttime, starttime_offset, series) else: t = Thread( target=self._sniff, name="sniff", args=(ncounts, starttime, starttime_offset, series), ) # t.setDaemon(True) t.start() return True def py_baselines( self, ncounts, starttime, starttime_offset, mass, detector, series=0, fit_series=0, settling_time=4, integration_time=None, use_dac=False, check_conditionals=True, ): if not self._alive: return gn = "baseline" self.debug("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Baseline") self.persister.build_tables(gn, self._active_detectors, ncounts) ion = self.ion_optics_manager if mass: if ion is not None: if detector is None: detector = self._active_detectors[0].name ion.position(mass, detector, use_dac=use_dac) msg = "Delaying {}s for detectors to settle".format(settling_time) self.info(msg) if self.plot_panel: self.plot_panel.total_counts += settling_time self.plot_panel.total_seconds += settling_time self.wait(settling_time, msg) if self.plot_panel: # self.plot_panel.set_ncounts(ncounts) self.plot_panel.is_baseline = True self.plot_panel.show_baseline_graph() self.multi_collector.is_baseline = True self.multi_collector.fit_series_idx = fit_series self.collector.for_peak_hop = self.plot_panel.is_peak_hop self.plot_panel.is_peak_hop = False if integration_time: self.set_integration_time(integration_time) result = self._measure( gn, self.persister.get_data_writer(gn), ncounts, starttime, starttime_offset, series, check_conditionals, self.baseline_color, ) if self.plot_panel: self.plot_panel.is_baseline = False self.multi_collector.is_baseline = False return result
[docs] def py_define_hops(self, hopstr): """ set the detector each isotope add additional isotopes and associated plots if necessary """ if self.plot_panel is None: self.plot_panel = self._new_plot_panel( self.plot_panel, stack_order="top_to_bottom" ) self.plot_panel.is_peak_hop = True a = self.isotope_group g = self.plot_panel.isotope_graph g.clear() self.measurement_script.reset_series() hops = parse_hops(hopstr, ret="iso,det,is_baseline") for iso, det, is_baseline in hops: if is_baseline: continue name = iso if name in a.isotopes: ii = a.isotopes[name] if ii.detector != det: name = "{}{}".format(iso, det) ii = a.isotope_factory(name=iso, detector=det) else: ii = a.isotope_factory(name=name, detector=det) pid = self._get_plot_id_by_ytitle(g, name) if pid is None: plot = self.plot_panel.new_isotope_plot() pid = g.plots.index(plot) else: plot = g.plots[pid] plot.y_axis.title = name g.set_regressor(ii.regressor, pid) a.isotopes[name] = ii self._load_previous() self.plot_panel.analysis_view.load(self)
# map_mass = self.spectrometer_manager.spectrometer.map_mass # hops = [(map_mass(hi[0]),) + tuple(hi) for hi in hops] # # for mass, dets in groupby_key(hops, key=itemgetter(0), reverse=True): # dets = list(dets) # iso = dets[0][1] # if dets[0][3]: # continue # # for _, _, di, _ in dets: # self._add_active_detector(di) # name = iso # if iso in a.isotopes: # ii = a.isotopes[iso] # if ii.detector != di: # name = '{}{}'.format(iso, di) # ii = a.isotope_factory(name=name, detector=di) # else: # ii = a.isotope_factory(name=iso, detector=di) # # pid = self._get_plot_id_by_ytitle(g, ii, di) # if pid is None: # plots = self.plot_panel.new_isotope_plot() # plot = plots['isotope'] # pid = g.plots.index(plot) # # # this line causes and issue when trying to plot the sniff on the isotope graph # # g.new_series(type='scatter', fit='linear', plotid=pid) # # g.set_regressor(ii.regressor, pid) # a.isotopes[name] = ii # plot.y_axis.title = name # # self._load_previous() # # self.plot_panel.analysis_view.load(self) def py_peak_hop( self, cycles, counts, hops, mftable, starttime, starttime_offset, series=0, fit_series=0, group="signal", ): if not self._alive: return with self.ion_optics_manager.mftable_ctx(mftable): is_baseline = False self.peak_hop_collector.is_baseline = is_baseline self.peak_hop_collector.fit_series_idx = fit_series if self.plot_panel: self.plot_panel.trait_set( is_baseline=is_baseline, _ncycles=cycles, hops=hops ) self.plot_panel.show_isotope_graph() # required for mass spec self.persister.save_as_peak_hop = True self.is_peak_hop = True check_conditionals = True self._add_conditionals() ret = self._peak_hop( cycles, counts, hops, group, starttime, starttime_offset, series, check_conditionals, ) self.is_peak_hop = False return ret def py_peak_center( self, detector=None, save=True, isotope=None, directions="Increase", config_name="default", check_intensity=None, peak_center_threshold=None, peak_center_threshold_window=None, **kw ): if not self._alive: return if check_intensity is None: check_intensity = self.use_peak_center_threshold if peak_center_threshold is None: peak_center_threshold = self.peak_center_threshold if peak_center_threshold_window is None: peak_center_threshold_window = self.peak_center_threshold_window ion = self.ion_optics_manager if ion is not None: if self.isotope_group and check_intensity: iso = self.isotope_group.get_isotope(isotope, detector) if iso: ys = iso.ys[-peak_center_threshold_window:] ym = ys.mean() self.debug( "peak center: mean={} threshold={}".format( ym, self.peak_center_threshold ) ) if ym < peak_center_threshold: self.warning( "Skipping peak center. intensities too small. {}<{}".format( ym, self.peak_center_threshold ) ) return else: self.debug( 'No isotope="{}", Det="{}" in isotope group. {}'.format( isotope, detector, self.isotope_group.isotope_keys ) ) if not self.plot_panel: p = self._new_plot_panel(self.plot_panel, stack_order="top_to_bottom") self.plot_panel = p self.debug("peak center started") ad = [di.name for di in self._peak_center_detectors if di.name != detector] pc = ion.setup_peak_center( detector=[detector] + ad, plot_panel=self.plot_panel, isotope=isotope, directions=directions, config_name=config_name, use_configuration_dac=False, **kw ) self.peak_center = pc self.debug("do peak center. {}".format(pc)) ion.do_peak_center( new_thread=False, save=save, message="automated run peakcenter", timeout=300, ) self._update_persister_spec(peak_center=pc) if pc.result: self.persister.save_peak_center_to_file(pc) def py_coincidence_scan(self): pass # sm = self.spectrometer_manager # obj, t = sm.do_coincidence_scan() # self.coincidence_scan = obj # t.join() # =============================================================================== # conditionals # ===============================================================================
[docs] def py_add_cancelation(self, **kw): """ cancel experiment if teststr evaluates to true """ self._conditional_appender( "cancelation", kw, CancelationConditional, level=RUN, location=self.measurement_script.name, )
[docs] def py_add_action(self, **kw): """ attr must be an attribute of arar_age perform a specified action if teststr evaluates to true """ self._conditional_appender( "action", kw, ActionConditional, level=RUN, location=self.measurement_script.name, )
[docs] def py_add_termination(self, **kw): """ attr must be an attribute of arar_age terminate run and continue experiment if teststr evaluates to true """ self._conditional_appender( "termination", kw, TerminationConditional, level=RUN, location=self.measurement_script.name, )
[docs] def py_add_truncation(self, **kw): """ attr must be an attribute of arar_age truncate measurement and continue run if teststr evaluates to true default kw: attr='', comp='',start_count=50, frequency=5, abbreviated_count_ratio=1.0 """ self._conditional_appender( "truncation", kw, TruncationConditional, level=RUN, location=self.measurement_script.name, )
def py_clear_conditionals(self): self.debug("$$$$$ Clearing conditionals") self.py_clear_terminations() self.py_clear_truncations() self.py_clear_actions() self.py_clear_cancelations() def py_clear_cancelations(self): self.cancelation_conditionals = [] def py_clear_terminations(self): self.termination_conditionals = [] def py_clear_truncations(self): self.truncation_conditionals = [] def py_clear_actions(self): self.action_conditionals = [] def py_clear_modifications(self): self.modification_conditionals = [] # =============================================================================== # run termination # =============================================================================== def set_end_after(self): self.is_last = True self.executor_event = {"kind": "end_after"} def abort_run(self, do_post_equilibration=True): self._aborted = True self.debug("Abort run do_post_equilibration={}".format(do_post_equilibration)) self._persister_action("trait_set", save_enabled=False) for s in EM_SCRIPT_KEYS: script = getattr(self, "{}_script".format(s)) if script is not None: script.abort() if self.peak_center: self.debug("cancel peak center") self.peak_center.cancel() self.do_post_termination(do_post_equilibration=do_post_equilibration) self.finish() if self.spec.state != "not run": self.spec.state = "aborted" self.experiment_queue.refresh_table_needed = True
[docs] def cancel_run(self, state="canceled", do_post_equilibration=True): """ terminate the measurement script immediately do post termination post_eq and post_meas don't save run """ self.debug( "Cancel run state={} do_post_equilibration={}".format( state, do_post_equilibration ) ) self.collector.canceled = True self._persister_action("trait_set", save_enabled=False) for s in EM_SCRIPT_KEYS: script = getattr(self, "{}_script".format(s)) if script is not None: script.cancel() if self.peak_center: self.debug("cancel peak center") self.peak_center.cancel() if self.spectrometer_manager: self.spectrometer_manager.spectrometer.cancel() self.do_post_termination(do_post_equilibration=do_post_equilibration) self.finish() if state: if self.spec.state != "not run": self.spec.state = state self.experiment_queue.refresh_table_needed = True
[docs] def truncate_run(self, style="normal"): """ truncate the measurement script style: normal- truncate current measure iteration and continue quick- truncate current measure iteration use truncated_counts for following measure iterations """ if self.measuring: style = style.lower() if style == "normal": self.measurement_script.truncate("normal") elif style == "quick": self.measurement_script.truncate("quick") self.collector.set_truncated() self.truncated = True self.spec.state = "truncated" self.experiment_queue.refresh_table_needed = True
# =============================================================================== # # =============================================================================== def show_conditionals(self, tripped=None): self.tripped_conditional = tripped self.executor_event = {"kind": "show_conditionals", "tripped": tripped} def teardown(self): self.debug("tear down") if self.measurement_script: self.measurement_script.automated_run = None if self.extraction_script: self.extraction_script.automated_run = None if self.collector: self.collector.automated_run = None # if self.plot_panel: # self.plot_panel.automated_run = None self._persister_action("trait_set", persistence_spec=None, monitor=None) def finish(self): self.debug("----------------- finish -----------------") if self.monitor: self.monitor.stop() if self.spec: if self.spec.state not in ( "not run", CANCELED, SUCCESS, TRUNCATED, "aborted", ): self.spec.state = FAILED self.experiment_queue.refresh_table_needed = True self.spectrometer_manager.spectrometer.active_detectors = [] self.stop() def stop(self): self.debug("----------------- stop -----------------") self._alive = False self.collector.stop() def start(self): self.debug("----------------- start -----------------") self._aborted = False self.persistence_spec = PersistenceSpec() for p in (self.persister, self.xls_persister, self.dvc_persister): if p is not None: p.per_spec = self.persistence_spec if self.monitor is None: return self._start() if self.monitor.monitor(): try: return self._start() except AttributeError as e: self.warning("failed starting run: {}".format(e)) else: self.warning("failed to start monitor") def is_alive(self): return self._alive def heading(self, msg, color=None, *args, **kw): super(AutomatedRun, self).info(msg, *args, **kw) if color is None: color = self.info_color if color is None: color = "light green" self.executor_event = { "kind": "heading", "message": msg, "color": color, "log": False, } def info(self, msg, color=None, *args, **kw): super(AutomatedRun, self).info(msg, *args, **kw) if color is None: color = self.info_color if color is None: color = "light green" self.executor_event = { "kind": "info", "message": msg, "color": color, "log": False, }
[docs] def get_interpolation_value(self, value): """ value is a string in the format of $VALUE. Search for VALUE first in the options file then in the extraction scripts metadata :param value: :return: """ v = None if self.extraction_script: for vv in (value, value.upper(), value.lower()): try: v = getattr(self.extraction_script, vv) except AttributeError: v = self._get_extraction_parameter(vv, None) if v is None: continue break if v is None: self.warning( "Could not interpolate {}. Make sure value is defined in either the options file" "or embedded in the extraction scripts metadata. Defaulting to 0".format( value ) ) v = 0 return v
def get_ratio(self, r, non_ic_corr=True): if self.isotope_group: return self.isotope_group.get_ratio(r, non_ic_corr=non_ic_corr) def get_reference_peakcenter_result(self): if self.persistence_spec: pc = self.persistence_spec.peak_center if pc: rn = pc.reference_detector.name return pc.get_result(rn) def get_device_value(self, dev_name): return self.extraction_line_manager.get_device_value(dev_name) def get_pressure(self, attr): controller, name = attr.split(".") return self.extraction_line_manager.get_pressure(controller, name) def get_deflection(self, det, current=False): return self.spectrometer_manager.spectrometer.get_deflection(det, current) def get_detector(self, det): return self.spectrometer_manager.spectrometer.get_detector(det) def set_integration_time(self, v): spectrometer = self.spectrometer_manager.spectrometer nv = spectrometer.set_integration_time(v, force=True) self._integration_seconds = nv def set_magnet_position(self, *args, **kw): return self._set_magnet_position(*args, **kw) def set_deflection(self, det, defl): self.spectrometer_manager.set_deflection(det, defl) def protect_detector(self, det, protect): self.spectrometer_manager.protect_detector(det, protect) def wait(self, t, msg=""): self.executor_event = {"kind": "wait", "duration": t, "message": msg}
[docs] def wait_for_overlap(self): """ by default overlap_evt is set after equilibration finished """ self.info("waiting for overlap signal") self._alive = True self.overlap_evt = evt = TEvent() evt.clear() i = 1 st = time.time() while self._alive and not evt.is_set(): time.sleep(1) if i % 5 == 0: et = time.time() - st self.debug( "waiting for overlap signal. elapsed time={:0.2f}".format(et) ) i = 0 i += 1 if not self._alive: return self.info("overlap signal set") overlap, mp = self.spec.overlap self.info("starting overlap delay {}".format(overlap)) starttime = time.time() i = 1 while self._alive: et = time.time() - starttime if et > overlap: break time.sleep(1.0) if i % 50 == 0: self.debug( "waiting overlap delay {}. elapsed time={:0.2f}".format(overlap, et) ) i = 0 i += 1
def post_finish(self): if self.use_dvc_persistence: if self.log_path: self.dvc_persister.save_run_log_file(self.log_path) else: self.debug("no log path to save") def save(self): self.debug( "post measurement save measured={} aborted={}".format( self._measured, self._aborted ) ) if self._measured and not self._aborted: # set filtering self._set_filtering() conds = ( self.termination_conditionals, self.truncation_conditionals, self.action_conditionals, self.cancelation_conditionals, self.modification_conditionals, self.equilibration_conditionals, ) env = self._get_environmentals() if env: set_environmentals(self.spec, env) tag = "ok" if self.spec.state in (CANCELED, FAILED): tag = self.spec.state self._update_persister_spec( active_detectors=self._active_detectors, conditionals=[c for cond in conds for c in cond], tag=tag, tripped_conditional=self.tripped_conditional, **env ) # save to database self._persister_save_action("post_measurement_save") self.spec.new_result(self) if self.plot_panel: self.plot_panel.analysis_view.refresh_needed = True if self.persister.secondary_database_fail: self.executor_event = { "kind": "cancel", "cancel_run": True, "msg": self.persister.secondary_database_fail, } else: return True else: return True # =============================================================================== # setup # =============================================================================== def setup_persister(self): sens = self._get_extraction_parameter("sensitivity_multiplier", default=1) # setup persister. mirror a few of AutomatedRunsAttributes script_name, script_blob = self._assemble_script_blob() eqn, eqb = "", "" queue = self.experiment_queue eqn = queue.name auto_save_detector_ic = queue.auto_save_detector_ic self.debug( "$$$$$$$$$$$$$$$ auto_save_detector_ic={}".format(auto_save_detector_ic) ) ext_name, ext_blob = "", "" if self.extraction_script: ext_name = self.extraction_script.name ext_blob = self._assemble_extraction_blob() ms_name, ms_blob, sfods, bsfods = "", "", {}, {} hops_name, hops_blob = "", "" if self.measurement_script: ms_name = self.measurement_script.name ms_blob = self.measurement_script.toblob() hops_name = self.measurement_script.hops_name hops_blob = self.measurement_script.hops_blob sfods, bsfods = self._get_default_fods() pe_name, pe_blob = "", "" if self.post_equilibration_script: pe_name = self.post_equilibration_script.name pe_blob = self.post_equilibration_script.toblob() pm_name, pm_blob = "", "" if self.post_measurement_script: pm_name = self.post_measurement_script.name pm_blob = self.post_measurement_script.toblob() ext_pos = [] if self.extraction_script: ext_pos = self.extraction_script.get_extraction_positions() self._update_persister_spec( save_as_peak_hop=False, run_spec=self.spec, isotope_group=self.isotope_group, positions=self.spec.get_position_list(), auto_save_detector_ic=auto_save_detector_ic, extraction_positions=ext_pos, sensitivity_multiplier=sens, experiment_type=self.experiment_type, experiment_queue_name=eqn, experiment_queue_blob=eqb, extraction_name=ext_name, extraction_blob=ext_blob, measurement_name=ms_name, measurement_blob=ms_blob, post_measurement_name=pm_name, post_measurement_blob=pm_blob, post_equilibration_name=pe_name, post_equilibration_blob=pe_blob, hops_name=hops_name, hops_blob=hops_blob, runscript_name=script_name, runscript_blob=script_blob, signal_fods=sfods, baseline_fods=bsfods, intensity_scalar=self.intensity_scalar, laboratory=self.laboratory, instrument_name=self.instrument_name, ) # =============================================================================== # doers # =============================================================================== def start_extraction(self): return self._start_script(EXTRACTION) def start_measurement(self): return self._start_script(MEASUREMENT) def do_extraction(self): self.debug("do extraction") self._persister_action("pre_extraction_save") self.info_color = EXTRACTION_COLOR script = self.extraction_script msg = "Extraction Started {}".format(script.name) self.heading("{}".format(msg)) self.spec.state = "extraction" self.experiment_queue.refresh_table_needed = True self.debug("DO EXTRACTION {}".format(self.runner)) script.set_run_identifier(self.runid) queue = self.experiment_queue script.set_load_identifier(queue.load_name) syn_extractor = None if script.syntax_ok(warn=False): if self.use_syn_extraction and self.spec.syn_extraction: p = os.path.join( paths.scripts_dir, "syn_extraction", self.spec.syn_extraction ) p = add_extension(p, ".yaml") if os.path.isfile(p): from pychron.experiment.automated_run.syn_extraction import ( SynExtractionCollector, ) dur = script.calculate_estimated_duration(force=True) syn_extractor = SynExtractionCollector( arun=weakref.ref(self)(), path=p, extraction_duration=dur ) syn_extractor.start() else: self.warning( "Cannot start syn extraction collection. Configuration file does not exist. {}".format( p ) ) else: self.warning('Invalid script syntax for "{}"'.format(script.name)) return try: ex_result = script.execute() except ExtractionException as e: ex_result = False self.debug("extraction exception={}".format(e)) if ex_result: if syn_extractor: syn_extractor.stop() # report the extraction results ach, req = script.output_achieved() self.info("Requested Output= {:0.3f}".format(req)) self.info("Achieved Output= {:0.3f}".format(ach)) rblob = script.get_response_blob() oblob = script.get_output_blob() sblob = script.get_setpoint_blob() snapshots = script.snapshots videos = script.videos extraction_context = script.extraction_context grain_polygons = script.get_grain_polygons() or [] self.debug("grain polygons n={}".format(len(grain_polygons))) ext_pos = script.get_extraction_positions() pid = script.get_active_pid_parameters() self._update_persister_spec( pid=pid or "", grain_polygons=grain_polygons, power_achieved=ach, response_blob=rblob, output_blob=oblob, setpoint_blob=sblob, snapshots=snapshots, videos=videos, extraction_positions=ext_pos, extraction_context=extraction_context, ) self._persister_save_action("post_extraction_save") self.heading("Extraction Finished") self.info_color = None # if overlapping need to wait for previous runs min mass spec pump time self._wait_for_min_ms_pumptime() else: if syn_extractor: syn_extractor.stop() self.do_post_equilibration() self.do_post_measurement() self.finish() self.heading("Extraction Finished unsuccessfully", color="red") self.info_color = None return bool(ex_result) def do_measurement(self, script=None, use_post_on_fail=True): self.debug("do measurement") self.debug( "L#={} analysis type={}".format( self.spec.labnumber, self.spec.analysis_type ) ) if not self._alive: self.warning("run is not alive") return if script is None: script = self.measurement_script if script is None: self.warning("no measurement script") return # use a measurement_script to explicitly define # measurement sequence self.info_color = MEASUREMENT_COLOR msg = "Measurement Started {}".format(script.name) self.heading("{}".format(msg)) self.spec.state = MEASUREMENT self.experiment_queue.refresh_table_needed = True # get current spectrometer values sm = self.spectrometer_manager if sm: self.debug("setting trap, emission, spec, defl, and gains") self._update_persister_spec( spec_dict=sm.make_configuration_dict(), defl_dict=sm.make_deflections_dict(), settings=sm.make_settings(), gains=sm.make_gains_dict(), trap=sm.read_trap_current(), emission=sm.read_emission(), ) self._persister_action("pre_measurement_save") self.measuring = True self._persister_action("trait_set", save_enabled=True) if script.execute(): # mem_log('post measurement execute') self.heading("Measurement Finished") self.measuring = False self.info_color = None self._measured = True return True else: if use_post_on_fail: self.do_post_equilibration() self.do_post_measurement() self.finish() self.heading( "Measurement Finished unsuccessfully. Aborted={}".format(self._aborted), color="red", ) self.measuring = False self.info_color = None return self._aborted def do_post_measurement(self, script=None): if script is None: script = self.post_measurement_script if not script: return True if not self._alive: return msg = "Post Measurement Started {}".format(script.name) self.heading("{}".format(msg)) if script.execute(): self.debug("setting _ms_pumptime") self.executor_event = {"kind": "ms_pumptime_start", "time": time.time()} self.heading("Post Measurement Finished") return True else: self.heading("Post Measurement Finished unsuccessfully") return False _post_equilibration_thread = None def do_post_equilibration(self, block=False): if block: self._post_equilibration() else: t = Thread(target=self._post_equilibration, name="post_equil") # t.setDaemon(True) self._post_equilibration_thread = t t.start() def do_post_termination(self, do_post_equilibration=True): self.heading("Post Termination Started") if do_post_equilibration: self.do_post_equilibration() self.do_post_measurement() self.stop() self.heading("Post Termination Finished") # =============================================================================== # utilities # =============================================================================== def get_current_dac(self): return self.spectrometer_manager.spectrometer.magnet.dac def assemble_report(self): signal_string = "" signals = self.get_baseline_corrected_signals() if signals: signal_string = "\n".join( [ "{} {} {}".format(ai.name, ai.isotope, signals[ai.isotope]) for ai in self._active_detectors ] ) age = "" if self.isotope_group: age = self.isotope_group.age age_string = "age={}".format(age) return """runid={} timestamp={} {} anaylsis_type={} # =============================================================================== # signals # =============================================================================== {} {} """.format( self.runid, self.persister.rundate, self.persister.runtime, self.spec.analysis_type, signal_string, age_string, ) def get_baselines(self): if self.isotope_group: return { iso.name: (iso.detector, iso.baseline.uvalue) for iso in self.isotope_group.values() } # return dict([(iso.name, (iso.detector, iso.baseline.uvalue)) for iso in # self.isotope_group.values()]) def get_baseline_corrected_signals(self): if self.isotope_group: d = dict() for k, iso in self.isotope_group.items(): d[k] = (iso.detector, iso.get_baseline_corrected_value()) return d def setup_context(self, *args, **kw): self._setup_context(*args, **kw) def refresh_scripts(self): self._refresh_scripts() def update_detector_isotope_pairing(self, detectors, isotopes): self.debug("update detector isotope pairing") self.debug("detectors={}".format(detectors)) self.debug("isotopes={}".format(isotopes)) for di in self._active_detectors: di.isotope = "" for di, iso in zip(detectors, isotopes): self.debug("updating pairing {} - {}".format(di, iso)) det = self.get_detector(di) det.isotope = iso # =============================================================================== # private # =============================================================================== def _get_environmentals(self): self.info("getting environmentals") env = {} lclient = self.labspy_client tst = time.time() if lclient: if lclient.connect(): for tag in ("lab_temperatures", "lab_humiditys", "lab_pneumatics"): st = time.time() try: env[tag] = getattr(lclient, "get_latest_{}".format(tag))() self.debug( "Get latest {}. elapsed: {}".format(tag, time.time() - st) ) except BaseException as e: self.debug("Get Labspy Environmentals: {}".format(e)) self.debug_exception() else: self.debug( "failed to connect to labspy client. Could not retrieve environmentals" ) self.debug("Environmentals: {}".format(pformat(env))) else: self.debug("LabspyClient not enabled. Could not retrieve environmentals") self.info( "getting environmentals finished: total duration: {}".format( time.time() - tst ) ) return env def _start(self): # for testing only # self._get_environmentals() if self.isotope_group is None: # load arar_age object for age calculation if self.experiment_type == AR_AR: from pychron.processing.arar_age import ArArAge klass = ArArAge else: from pychron.processing.isotope_group import IsotopeGroup klass = IsotopeGroup self.isotope_group = klass() es = self.extraction_script if es is not None: # get sensitivity multiplier from extraction script v = self._get_yaml_parameter(es, "sensitivity_multiplier", default=1) self.isotope_group.sensitivity_multiplier = v ln = self.spec.labnumber ln = convert_identifier(ln) self.debug( "**************** Experiment Type: {}, {}".format( self.experiment_type, AR_AR ) ) if self.experiment_type == AR_AR: if not self.datahub.load_analysis_backend(ln, self.isotope_group): self.debug("failed load analysis backend") return self.isotope_group.calculate_decay_factors() self.py_clear_conditionals() # setup default/queue conditionals # clear the conditionals for good measure. # conditionals should be cleared during teardown. try: self._add_conditionals() except BaseException as e: self.warning("Failed adding conditionals {}".format(e)) return try: # add queue conditionals self._add_queue_conditionals() except BaseException as e: self.warning("Failed adding queue conditionals. err={}".format(e)) return try: # add default conditionals self._add_system_conditionals() except BaseException as e: self.warning("Failed adding system conditionals. err={}".format(e)) return self.info("Start automated run {}".format(self.runid)) self.measuring = False self.truncated = False self._alive = True if self.plot_panel: self.plot_panel.start() # self.plot_panel.set_analysis_view(self.experiment_type) self.multi_collector.canceled = False self.multi_collector.is_baseline = False self.multi_collector.for_peak_hop = False self._equilibration_done = False # setup the scripts ip = self.spec.script_options if ip: ip = os.path.join(paths.scripts_dir, "options", add_extension(ip, ".yaml")) if self.measurement_script: self.measurement_script.reset(self) # set the interpolation path self.measurement_script.interpolation_path = ip for si in ("extraction", "post_measurement", "post_equilibration"): script = getattr(self, "{}_script".format(si)) if script: self._setup_context(script) script.interpolation_path = ip # load extraction metadata self.eqtime = self._get_extraction_parameter("eqtime", -1) self.time_zero_offset = self.spec.collection_time_zero_offset # setup persister. mirror a few of AutomatedRunsAttributes self.setup_persister() return True def _set_filtering(self): self.debug("Set filtering") def _get_filter_outlier_dict(iso, kind): if kind == "baseline": fods = self.persistence_spec.baseline_fods key = iso.detector else: fods = self.persistence_spec.signal_fods key = iso.name try: fod = fods[key] except KeyError: fod = {"filter_outliers": False, "iterations": 1, "std_devs": 2} return fod for i in self.isotope_group.values(): fod = _get_filter_outlier_dict(i, "signal") self.debug("setting fod for {}= {}".format(i.name, fod)) i.set_filtering(fod) fod = _get_filter_outlier_dict(i, "baseline") i.baseline.set_filtering(fod) self.debug("setting fod for {}= {}".format(i.detector, fod)) def _update_persister_spec(self, **kw): ps = self.persistence_spec for k, v in kw.items(): try: ps.trait_set(**{k: v}) except TraitError as e: self.warning( "failed setting persistence spec attr={}, value={} error={}".format( k, v, e ) ) def _persister_save_action(self, func, *args, **kw): self.debug("persistence save...") if self.use_db_persistence: self.debug("persistence save - db") getattr(self.persister, func)(*args, **kw) if self.use_dvc_persistence: self.debug("persistence save - dvc") getattr(self.dvc_persister, func)(*args, **kw) if self.use_xls_persistence: self.debug("persistence save - xls") getattr(self.xls_persister, func)(*args, **kw) def _persister_action(self, func, *args, **kw): getattr(self.persister, func)(*args, **kw) for i, p in enumerate((self.xls_persister, self.dvc_persister)): if p is None: continue try: getattr(p, func)(*args, **kw) except BaseException as e: self.warning( "{} persister action failed. {} func={}, excp={}".format( i, p.__class__.__name__, func, e ) ) import traceback traceback.print_exc() def _post_equilibration(self): if self._equilibration_done: return self._equilibration_done = True if not self._alive: return if self.post_equilibration_script is None: return msg = "Post Equilibration Started {}".format( self.post_equilibration_script.name ) self.heading("{}".format(msg)) if self.post_equilibration_script.execute(): self.heading("Post Equilibration Finished") else: self.heading("Post Equilibration Finished unsuccessfully") def _generate_ic_mftable(self, detectors, refiso, peak_center_config, n): ret = True from pychron.experiment.ic_mftable_generator import ICMFTableGenerator e = ICMFTableGenerator() if not e.make_mftable(self, detectors, refiso, peak_center_config, n): ret = False return ret def _add_system_conditionals(self): self.debug("add default conditionals") p = get_path(paths.spectrometer_dir, ".*conditionals", (".yaml", ".yml")) if p is not None: self.info("adding default conditionals from {}".format(p)) self._add_conditionals_from_file(p, level=SYSTEM) else: self.warning("no Default Conditionals file. {}".format(p)) def _add_queue_conditionals(self): """ load queue global conditionals (truncations, actions, terminations) """ self.debug("Add queue conditionals") name = self.spec.queue_conditionals_name if test_queue_conditionals_name(name): p = get_path(paths.queue_conditionals_dir, name, (".yaml", ".yml")) if p is not None: self.info("adding queue conditionals from {}".format(p)) self._add_conditionals_from_file(p, level=QUEUE) else: self.warning("Invalid Conditionals file. {}".format(p)) def _add_conditionals_from_file(self, p, level=None): d = conditionals_from_file(p, level=level) for k, v in d.items(): # if k in ('actions', 'truncations', 'terminations', 'cancelations'): var = getattr(self, "{}_conditionals".format(k[:-1])) var.extend(v) def _conditional_appender(self, name, cd, klass, level=None, location=None): if not self.isotope_group: self.warning("No ArArAge to use for conditional testing") return attr = cd.get("attr") if not attr: self.debug("no attr for this {} cd={}".format(name, cd)) return if attr == "age" and self.spec.analysis_type not in ("unknown", "cocktail"): self.debug("not adding because analysis_type not unknown or cocktail") # don't check if isotope_group has the attribute. it may be added to isotope group later obj = getattr(self, "{}_conditionals".format(name)) con = conditional_from_dict(cd, klass, level=level, location=location) if con: self.info( 'adding {} attr="{}" ' 'test="{}" start="{}"'.format( name, con.attr, con.teststr, con.start_count ) ) obj.append(con) else: self.warning("Failed adding {}, {}".format(name, cd)) def _refresh_scripts(self): for name in SCRIPT_KEYS: setattr(self, "{}_script".format(name), self._load_script(name)) def _get_default_fits_file(self): p = self._get_measurement_parameter("default_fits") if p: dfp = os.path.join(paths.fits_dir, add_extension(p, ".yaml")) if os.path.isfile(dfp): return dfp else: self.warning_dialog("Cannot open default fits file: {}".format(dfp)) def _get_default_fits(self, is_baseline=False): """ get name of default fits file from measurement docstr return dict of iso:fit pairs """ dfp = self._get_default_fits_file() if dfp: self.debug("using default fits file={}".format(dfp)) yd = yload(dfp) key = "baseline" if is_baseline else "signal" fd = {yi["name"]: (yi["fit"], yi["error_type"]) for yi in yd[key]} else: self.debug("no default fits file") fd = {} return fd def _get_default_fods(self): def extract_fit_dict(fods, yd): for yi in yd: fod = { "filter_outliers": yi.get("filter_outliers", False), "iterations": yi.get("filter_iterations", 0), "std_devs": yi.get("filter_std_devs", 0), } fods[yi["name"]] = fod sfods, bsfods = {}, {} dfp = self._get_default_fits_file() if dfp: ys = yload(dfp) for fod, key in ((sfods, "signal"), (bsfods, "baseline")): try: extract_fit_dict(fod, ys[key]) except BaseException: self.debug_exception() try: yload(dfp, reraise=True) except BaseException as ye: self.warning( "Failed getting signal from fits file. Please check the syntax. {}".format( ye ) ) return sfods, bsfods def _start_script(self, name): script = getattr(self, "{}_script".format(name)) self.debug("start {}".format(name)) if not self._alive: self.warning("run is not alive") return if not script: self.warning("no {} script".format(name)) return return True def _add_active_detector(self, di): spec = self.spectrometer_manager.spectrometer det = spec.get_detector(di) if det not in self._active_detectors: self._active_detectors.append(det) def _set_active_detectors(self, dets): spec = self.spectrometer_manager.spectrometer return [spec.get_detector(n) for n in dets] def _define_detectors(self, isotope, det): if self.spectrometer_manager: spec = self.spectrometer_manager.spectrometer spec.update_isotopes(isotope, det) def _activate_detectors(self, dets): """ !!! this is a potential problem !!! need more sophisticated way to set up plot panel e.g PP has detectors H1, AX but AX, CDD are active. need to remove H1 and add CDD. or if memory leak not a problem simply always "create" new plots instead of only clearing data. or use both techniques if plot panel detectors != active detectors "create" """ self.debug("activate detectors") create = True # if self.plot_panel is None: # create = True # else: # cd = set([d.name for d in self.plot_panel.detectors]) # ad = set(dets) # create = cd - ad or ad - cd p = self._new_plot_panel(self.plot_panel, stack_order="top_to_bottom") self._active_detectors = self._set_active_detectors(dets) self.spectrometer_manager.spectrometer.active_detectors = self._active_detectors if create: p.create(self._active_detectors) else: p.isotope_graph.clear_plots() self.debug("clear isotope group") self.isotope_group.clear_isotopes() self.isotope_group.clear_error_components() self.isotope_group.clear_blanks() cb = ( False if any(self.spec.analysis_type.startswith(at) for at in NO_BLANK_CORRECT) else True ) for d in self._active_detectors: self.debug("setting isotope det={}, iso={}".format(d.name, d.isotope)) self.isotope_group.set_isotope( d.isotope, d.name, (0, 0), correct_for_blank=cb ) self._load_previous() # self.debug('load analysis view') # p.analysis_view.load(self) self.plot_panel = p def _load_previous(self): if not self.spec.analysis_type.startswith( "blank" ) and not self.spec.analysis_type.startswith("background"): runid, blanks = self.previous_blanks self.debug("setting previous blanks") for iso, v in blanks.items(): self.isotope_group.set_blank(iso, v[0], v[1]) self._update_persister_spec( previous_blanks=blanks, previous_blank_runid=runid ) self.isotope_group.clear_baselines() baselines = self.previous_baselines for iso, v in baselines.items(): self.isotope_group.set_baseline(iso, v[0], v[1]) def _add_conditionals(self): t = self.spec.conditionals self.debug("adding conditionals {}".format(t)) if t: p = os.path.join(paths.conditionals_dir, add_extension(t, ".yaml")) if os.path.isfile(p): self.debug("extract conditionals from file. {}".format(p)) yd = yload(p) failure = False for kind, items in yd.items(): try: okind = kind if kind.endswith("s"): kind = kind[:-1] if kind == "modification": klass_name = "QueueModification" elif kind in ("pre_run_termination", "post_run_termination"): continue else: klass_name = kind.capitalize() mod = "pychron.experiment.conditional.conditional" mod = importlib.import_module(mod) klass = getattr(mod, "{}Conditional".format(klass_name)) except (ImportError, AttributeError): self.critical( 'Invalid conditional kind="{}", klass_name="{}"'.format( okind, klass_name ) ) continue for cd in items: try: self._conditional_appender(kind, cd, klass, location=p) except BaseException as e: self.debug( 'Failed adding {}. excp="{}", cd={}'.format(kind, e, cd) ) failure = True if failure: if not self.confirmation_dialog( "Failed to add Conditionals. Would you like to continue?" ): self.cancel_run(do_post_equilibration=False) else: try: c, start = t.split(",") pat = "<=|>=|[<>=]" attr = re.split(pat, c)[0] freq = 1 acr = 0.5 except Exception as e: self.debug("conditionals parse failed {} {}".format(e, t)) return self.py_add_truncation( attr=attr, teststr=c, start_count=int(start), frequency=freq, abbreviated_count_ratio=acr, ) def _get_measurement_parameter(self, key, default=None): return self._get_yaml_parameter(self.measurement_script, key, default) def _get_extraction_parameter(self, key, default=None): return self._get_yaml_parameter(self.extraction_script, key, default) def _new_plot_panel(self, plot_panel, stack_order="bottom_to_top"): title = self.runid sample, irradiation = self.spec.sample, self.spec.display_irradiation if sample: title = "{} {}".format(title, sample) if irradiation: title = "{} {}".format(title, irradiation) # if plot_panel is None: # from pychron.experiment.plot_panel import PlotPanel plot_panel = PlotPanel( stack_order=stack_order, info_func=self.info, isotope_group=self.isotope_group, ) self.debug("*************** Set Analysis View {}".format(self.experiment_type)) plot_panel.set_analysis_view( self.experiment_type, analysis_type=self.spec.analysis_type, analysis_id=self.runid, ) # an = plot_panel.analysis_view # an.load(self) plot_panel.trait_set(plot_title=title) return plot_panel def _convert_valve(self, valve): if isinstance(valve, int): valve = str(valve) if valve and not isinstance(valve, (tuple, list)): if "," in valve: valve = [v.strip() for v in valve.split(",")] else: valve = (valve,) return valve def _equilibrate( self, evt, eqtime=15, inlet=None, outlet=None, delay=3, do_post_equilibration=True, close_inlet=True, ): inlet = self._convert_valve(inlet) elm = self.extraction_line_manager # delay for eq time self.info("equilibrating for {}sec".format(eqtime)) time.sleep(eqtime) if self._alive: # analyze the equilibration try: self._analyze_equilibration() except BaseException as e: self.debug( "AutomatedRun._equilibrate _analyze_equilibration error. Exception={}".format( e ) ) self.heading("Equilibration Finished") if elm and inlet and close_inlet: for i in inlet: elm.close_valve(i, mode="script") if do_post_equilibration: self.do_post_equilibration() if self.overlap_evt: self.debug("setting overlap event. next run ok to start extraction") self.overlap_evt.set() def _analyze_equilibration(self): if self.use_equilibration_analysis and self.plot_panel: g = self.plot_panel.sniff_graph xmi, xma = g.get_x_limits() xma *= 1.25 g.set_x_limits(xmi, xma) fxs = linspace(xmi, xma) for i, p in enumerate(g.plots): try: xs = g.get_data(i) except IndexError: continue ys = g.get_data(i, axis=1) if ys is None: continue for ni, color, yoff in ( (5, "red", 30), (4, "green", 10), (3, "blue", -10), (2, "orange", -30), ): xsi, ysi = xs[-ni:], ys[-ni:] g.new_series( xsi, ysi, type="scatter", plotid=i, color=color, marker_size=2.5 ) coeffs = polyfit(xsi, ysi, 1) fys = polyval(coeffs, fxs) g.new_series(fxs, fys, type="line", plotid=i, color=color) txt = "Slope ({})={:0.3f}".format(ni, coeffs[0]) g.add_plot_label( txt, plotid=i, overlay_position="inside right", font="modern 14", bgcolor="white", color=color, y_offset=yoff, ) g.redraw() def _update_labels(self): self.debug("update labels {}".format(self.plot_panel)) if self.plot_panel: for g in (self.plot_panel.isotope_graph, self.plot_panel.sniff_graph): if g: self.debug('update labels "{}"'.format(g)) # update the plot_panel labels plots = g.plots n = len(plots) names = [] multiples = [] for i, det in enumerate(self._active_detectors): if i < n: name = det.isotope if name in names: multiples.append(name) name = "{}{}".format(name, det.name) plots[i].y_axis.title = name self.debug( "setting label {} {} {}".format(i, det.name, name) ) names.append(name) for i, det in enumerate(self._active_detectors): if i < n: name = det.isotope if name in multiples: self.debug( "second setting label {} {} {}".format( i, det.name, name ) ) plots[i].y_axis.title = "{}{}".format(name, det.name) g.refresh() def _update_detectors(self): for det in self._active_detectors: self.isotope_group.set_isotope_detector(det) for det in self._active_detectors: self.isotope_group.set_isotope_detector(det, add=True) self._load_previous() def _set_hv_position( self, pos, detector, update_detectors=True, update_labels=True, update_isotopes=True, ): ion = self.ion_optics_manager if ion is not None: change = ion.hv_position(pos, detector, update_isotopes=update_isotopes) def _set_magnet_position( self, pos, detector, use_dac=False, update_detectors=True, update_labels=True, update_isotopes=True, remove_non_active=True, for_collection=True, ): change = False ion = self.ion_optics_manager if ion is not None: change = ion.position( pos, detector, use_dac=use_dac, update_isotopes=update_isotopes ) if for_collection: if update_labels: self._update_labels() if update_detectors: self._update_detectors() if remove_non_active: keys = list(self.isotope_group.keys()) for k in keys: iso = self.isotope_group.isotopes[k] det = next( (di for di in self._active_detectors if di.isotope == iso.name), None, ) if det is None: self.isotope_group.pop(k) def key(v): return v[1].name def key2(v): return v[1].detector self.debug("per cleaned {}".format(keys)) for name, items in groupby_key(self.isotope_group.items(), key): items = list(items) if len(items) > 1: for det, items in groupby_key(items, key2): items = list(items) if len(items) > 1: for k, v in items: if v.name == k: self.isotope_group.isotopes.pop(k) self.debug("cleaned isotope group {}".format(keys)) if self.plot_panel: self.debug("load analysis view") self.plot_panel.analysis_view.load(self) # self.plot_panel.analysis_view.refresh_needed = True return change def _data_generator(self): cnt = 0 fcnt = self.failed_intensity_count_threshold spec = self.spectrometer_manager.spectrometer self._intensities = {} while 1: try: k, s, t, inc = spec.get_intensities(tagged=True, trigger=False) except NoIntensityChange: self.warning( "Canceling Run. Intensity from mass spectrometer not changing" ) try: self.info("Saving run. Analysis did not complete successfully") self.save() except BaseException: self.warning("Failed to save run") self.cancel_run(state=FAILED) yield None if not k: cnt += 1 self.info( "Failed getting intensity from mass spectrometer {}/{}".format( cnt, fcnt ) ) if cnt >= fcnt: try: self.info("Saving run. Analysis did not complete successfully") self.save() except BaseException: self.warning("Failed to save run") self.warning( "Canceling Run. Failed getting intensity from mass spectrometer" ) # do we need to cancel the experiment or will the subsequent pre run # checks sufficient to catch spectrometer communication errors. self.cancel_run(state=FAILED) yield None else: yield None, None, None, False else: # reset the counter cnt = 0 if self.intensity_scalar: s = [si * self.intensity_scalar for si in s] self._intensities["tags"] = k self._intensities["signals"] = s yield k, s, t, inc # return gen() def _whiff( self, ncounts, conditionals, starttime, starttime_offset, series, fit_series ): """ conditionals: list of dicts """ for ci in conditionals: if ci.get("start") is None: ci["start"] = ncounts conds = [conditional_from_dict(ci, ActionConditional) for ci in conditionals] self.isotope_group.conditional_modifier = "whiff" self.collector.set_temporary_conditionals(conds) self.py_data_collection( None, ncounts, starttime, starttime_offset, series, fit_series, group="whiff", ) self.collector.clear_temporary_conditionals() self.isotope_group.conditional_modifier = None result = self.collector.measurement_result self._update_persister_spec(whiff_result=result) self.debug("WHIFF Result={}".format(result)) return result def _peak_hop( self, ncycles, ncounts, hops, grpname, starttime, starttime_offset, series, check_conditionals, ): """ ncycles: int hops: list of tuples hop = 'Isotope:Det[,Isotope:Det,...]', Count, Settling Time(s) ex. hop = 'Ar40:H1,Ar36:CDD', 10, 1 """ self.peak_hop_collector.trait_set(ncycles=ncycles) self.peak_hop_collector.set_hops(hops) self.persister.build_peak_hop_tables(grpname, hops) data_writer = self.persister.get_data_writer(grpname) return self._measure( grpname, data_writer, ncounts, starttime, starttime_offset, series, check_conditionals, self.signal_color, ) def _sniff(self, ncounts, starttime, starttime_offset, series): self.debug("py_sniff") if not self._alive: return p = self.plot_panel if p: p._ncounts = ncounts p.is_baseline = False self.plot_panel.show_sniff_graph() gn = "sniff" self.persister.build_tables(gn, self._active_detectors, ncounts) # mem_log('build tables') check_conditionals = True writer = self.persister.get_data_writer(gn) result = self._measure( gn, writer, ncounts, starttime, starttime_offset, series, check_conditionals, self.sniff_color, ) return result def _measure( self, grpname, data_writer, ncounts, starttime, starttime_offset, series, check_conditionals, color, script=None, ): if script is None: script = self.measurement_script # mem_log('pre measure') if not self.spectrometer_manager: self.warning("no spectrometer manager") return True self.info( "measuring {}. ncounts={}".format(grpname, ncounts), color=MEASUREMENT_COLOR ) if globalv.experiment_debug: period = 1 else: period = self.spectrometer_manager.spectrometer.get_update_period( it=self._integration_seconds ) m = self.collector m.trait_set( measurement_script=script, detectors=self._active_detectors, collection_kind=grpname, series_idx=series, check_conditionals=check_conditionals, ncounts=ncounts, period_ms=period * 1000, data_generator=self._data_generator(), data_writer=data_writer, experiment_type=self.experiment_type, refresh_age=self.spec.analysis_type in ("unknown", "cocktail"), ) m.set_starttime(starttime) if hasattr(self.spectrometer_manager.spectrometer, "trigger_acq"): m.trait_set(trigger=self.spectrometer_manager.spectrometer.trigger_acq) if self.plot_panel: self.plot_panel.integration_time = self._integration_seconds self.plot_panel.set_ncounts(ncounts) if grpname == "sniff": self._setup_isotope_graph(starttime_offset, color, grpname) self._setup_sniff_graph(starttime_offset, color) elif grpname == "baseline": self._setup_baseline_graph(starttime_offset, color) else: self._setup_isotope_graph(starttime_offset, color, grpname) # time.sleep(0.5) with self.persister.writer_ctx(): m.measure() # mem_log('post measure') if m.terminated: self.debug("measurement terminated") self.cancel_run(state="terminated") if m.canceled: self.debug("measurement collection canceled") self.cancel_run() self.executor_event = { "kind": "cancel", "confirm": False, "err": m.err_message, } return not m.canceled def _get_plot_id_by_ytitle(self, graph, pair, iso=None): """ pair is string in form <Iso><Det>, iso is just <Iso> :param graph: :param pair: :param secondary: :return: """ idx = graph.get_plotid_by_ytitle(pair) if idx is None and iso: if not isinstance(iso, str): iso = iso.name idx = graph.get_plotid_by_ytitle(iso) return idx def _update_limits(self, graph, starttime_offset): # update limits mi, ma = graph.get_x_limits() max_ = ma min_ = mi tc = self.plot_panel.total_seconds if tc > ma or ma == Inf: max_ = tc if starttime_offset > mi: min_ = -starttime_offset graph.set_x_limits(min_=min_, max_=max_ * 1.25) def _setup_baseline_graph(self, starttime_offset, color): graph = self.plot_panel.baseline_graph self._update_limits(graph, starttime_offset) for det in self._active_detectors: idx = graph.get_plotid_by_ytitle(det.name) if idx is not None: try: graph.series[idx][0] except IndexError as e: graph.new_series( marker="circle", color=color, type="scatter", marker_size=1.25, fit="linear", plotid=idx, use_error_envelope=False, add_inspector=False, add_tools=False, ) def _setup_sniff_graph(self, starttime_offset, color): graph = self.plot_panel.sniff_graph self._update_limits(graph, starttime_offset) series = self.collector.series_idx for k, iso in self.isotope_group.items(): idx = self._get_plot_id_by_ytitle(graph, k, iso) if idx is not None: try: graph.series[idx][series] except IndexError as e: graph.new_series( marker="circle", color=color, type="scatter", marker_size=1.25, fit=None, plotid=idx, use_error_envelope=False, add_inspector=False, add_tools=False, ) def _setup_isotope_graph(self, starttime_offset, color, grpname): """ execute in main thread is necessary. set the graph limits and construct the necessary series set 0-count fits """ graph = self.plot_panel.isotope_graph self._update_limits(graph, starttime_offset) regressing = grpname != "sniff" series = self.collector.series_idx for k, iso in self.isotope_group.items(): idx = self._get_plot_id_by_ytitle(graph, k, iso) if idx is not None: try: graph.series[idx][series] except IndexError as e: fit = None if grpname == "sniff" else iso.get_fit(0) graph.new_series( marker="circle", color=color, type="scatter", marker_size=1.25, fit=fit, plotid=idx, use_error_envelope=False, add_inspector=False, add_tools=False, ) if regressing: graph.set_regressor(iso.regressor, idx) scnt, fcnt = (2, 1) if regressing else (1, 0) self.debug( '"{}" increment series count="{}" ' 'fit count="{}" regressing="{}"'.format(grpname, scnt, fcnt, regressing) ) self.measurement_script.increment_series_counts(scnt, fcnt) def _wait_for(self, predicate, msg): st = time.time() i = 0 while self._alive: time.sleep(1.0) et = time.time() - st if predicate(et): break if i % 5 == 0: self.debug(msg(et)) i = 0 i += 1 def _wait_for_min_ms_pumptime(self): overlap, mp = self.spec.overlap pt = self.min_ms_pumptime if not overlap: self.debug("no overlap. not waiting for min ms pumptime") return if self.is_first: self.debug("this is the first run. not waiting for min ms pumptime") return if not mp: self.debug("using default min ms pumptime={}".format(pt)) mp = pt # ensure mim mass spectrometer pump time # wait until pumping started self.debug("wait for mass spec pump out to start") self._wait_for( lambda x: self.ms_pumptime_start is not None, lambda x: "waiting for mass spec pumptime to start {:0.2f}".format(x), ) # wait for min pump time self.debug("mass spec pump out to started") self._wait_for( lambda x: self.elapsed_ms_pumptime > mp, lambda x: "waiting for min mass spec pumptime {}, elapse={:0.2f}".format( mp, x ), ) self.debug("min pumptime elapsed {} {}".format(mp, self.elapsed_ms_pumptime)) # =============================================================================== # scripts # =============================================================================== def _load_script(self, name): script = None sname = getattr(self.script_info, "{}_script_name".format(name)) if sname and sname != NULL_STR: sname = self._make_script_name(sname) script = self._bootstrap_script(sname, name) return script def _bootstrap_script(self, fname, name): # global SCRIPTS global WARNED_SCRIPTS def warn(fn, e): self.spec.executable = False if fn not in WARNED_SCRIPTS: WARNED_SCRIPTS.append(fn) self.warning_dialog("Invalid Script {}\n{}".format(fn, e)) self.debug('loading script "{}"'.format(fname)) func = getattr(self, "_{}_script_factory".format(name)) s = func() if s and os.path.isfile(s.filename): if s.bootstrap(): s.set_default_context() else: fname = s.filename if s else fname e = "Not a file" warn(fname, e) return s def _measurement_script_factory(self): from pychron.pyscripts.measurement_pyscript import MeasurementPyScript sname = self.script_info.measurement_script_name sname = self._make_script_name(sname) klass = MeasurementPyScript if isinstance(self.spectrometer_manager, ThermoSpectrometerManager): from pychron.pyscripts.measurement.thermo_measurement_pyscript import ( ThermoMeasurementPyScript, ) klass = ThermoMeasurementPyScript elif isinstance(self.spectrometer_manager, NGXSpectrometerManager): from pychron.pyscripts.measurement.ngx_measurement_pyscript import ( NGXMeasurementPyScript, ) klass = NGXMeasurementPyScript elif isinstance(self.spectrometer_manager, QuaderaSpectrometerManager): from pychron.pyscripts.measurement.quadera_measurement_pyscript import ( QuaderaMeasurementPyScript, ) klass = QuaderaMeasurementPyScript ms = klass( root=paths.measurement_dir, name=sname, automated_run=self, runner=self.runner, ) return ms def _extraction_script_factory(self, klass=None): ext = self._ext_factory( paths.extraction_dir, self.script_info.extraction_script_name, klass=klass ) if ext is not None: ext.automated_run = self return ext def _post_measurement_script_factory(self): return self._ext_factory( paths.post_measurement_dir, self.script_info.post_measurement_script_name ) def _post_equilibration_script_factory(self): return self._ext_factory( paths.post_equilibration_dir, self.script_info.post_equilibration_script_name, ) def _ext_factory(self, root, file_name, klass=None): file_name = self._make_script_name(file_name) if os.path.isfile(os.path.join(root, file_name)): if klass is None: from pychron.pyscripts.extraction_line_pyscript import ( ExtractionPyScript, ) klass = ExtractionPyScript obj = klass(root=root, name=file_name, runner=self.runner) return obj def _make_script_name(self, name): name = "{}_{}".format(self.spec.mass_spectrometer.lower(), name) return add_extension(name, ".py") def _setup_context(self, script): """ setup_context to expose variables to the pyscript """ ctx = self.spec.make_script_context() script.setup_context(is_last=self.is_last, **ctx) def _get_yaml_parameter(self, script, key, default): if not script: return default m = ast.parse(script.text) docstr = ast.get_docstring(m) if docstr: docstr = docstr.strip() # self.debug('{} {} metadata\n{}'.format(script.name, key, docstr)) try: params = yload(docstr) return params[key] except KeyError: self.warning('No value "{}" in metadata'.format(key)) except TypeError: self.warning( 'Invalid yaml docstring in "{}". Could not retrieve "{}"'.format( script.name, key ) ) else: self.debug( 'No metadata section in "{}". Using default "{}" value for "{}"'.format( script.name, default, key ) ) return default def _get_collector(self): c = self.peak_hop_collector if self.is_peak_hop else self.multi_collector return c def _assemble_extraction_blob(self): _names, txt = self._assemble_script_blob( kinds=(EXTRACTION, POST_EQUILIBRATION, POST_MEASUREMENT) ) return txt def _assemble_script_blob(self, kinds=None): if kinds is None: kinds = SCRIPT_KEYS okinds = [] bs = [] for s in kinds: sc = getattr(self, "{}_script".format(s)) if sc is not None: bs.append((sc.name, sc.toblob())) okinds.append(s) return assemble_script_blob(bs, kinds=okinds) # =============================================================================== # handlers # =============================================================================== def _runner_changed(self, new): self.debug("Runner runner:{}".format(new)) for s in SCRIPT_NAMES: sc = getattr(self, s) if sc is not None: setattr(sc, "runner", new) # =============================================================================== # defaults # =============================================================================== def _peak_hop_collector_default(self): from pychron.experiment.automated_run.peak_hop_collector import PeakHopCollector c = PeakHopCollector(console_display=self.console_display, automated_run=self) return c def _multi_collector_default(self): from pychron.experiment.automated_run.multi_collector import MultiCollector c = MultiCollector(console_display=self.console_display, automated_run=self) return c # =============================================================================== # property get/set # =============================================================================== @property def elapsed_ms_pumptime(self): return time.time() - self.ms_pumptime_start
# ============= EOF =============================================