Source code for pychron.experiment.automated_run.peak_hop_collector

# ===============================================================================
# Copyright 2013 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
from __future__ import absolute_import
from traits.api import List, Int, Instance

from pychron.core.helpers.color_generators import colornames
from pychron.experiment.automated_run.data_collector import DataCollector
from pychron.experiment.automated_run.hop_util import generate_hops
from six.moves import zip


[docs]class PeakHopCollector(DataCollector): """ Collector class for doing a peak hop measurement. Measure one or more intensities at given mass for ncounts then jump magnet to next new mass. """ hops = List settling_time = 0 ncycles = Int hop_generator = None _was_deflected = False _detectors = None def set_hops(self, hops): self.hops = hops self.debug("make new hop generatior") self.hop_generator = generate_hops(self.hops) def _pre_trigger_hook(self): args = self._do_hop() if args: is_baseline, dets, isos = args self._detectors = dets return True def _iter_hook(self, i): return self._iteration(i, detectors=self._detectors) # args = self._do_hop() # # if args: # is_baseline, dets, isos = args # if not is_baseline: # return self._iteration(i, detectors=dets) def _do_hop(self): """ is it time for a magnet move """ # from pychron.core.ui.gui import invoke_in_main_thread hop = next(self.hop_generator) hop_idx = hop["idx"] cycle = hop["cycle"] is_baseline = hop["is_baseline"] dets = hop["detectors"] isos = hop["isotopes"] defls = hop["deflections"] settle = hop["settle"] count = hop["count"] pdets = hop["protect_detectors"] active_dets = hop["active_detectors"] current_color = colornames[hop_idx] use_dac = False positioning = hop["positioning"] if positioning: if "dac" in positioning: use_dac = True isotope = positioning["dac"] detector = "" else: detector = positioning["detector"] isotope = positioning["isotope"] else: detector = active_dets[0] isotope = isos[0] if count == 0: self.debug("$$$$$$$$$$$$$$$$$ SETTING is_baseline {}".format(is_baseline)) arun = self.automated_run if is_baseline: arun.is_peak_hop = False # remember original settings. return to these values after baseline finished ocounts = self.measurement_script.ncounts arun.measurement_script.increment_series_count(2, 1) ocycles = self.plot_panel.ncycles pocounts = self.plot_panel.ncounts self.debug("START BASELINE MEASUREMENT {} {}".format(isotope, detector)) arun.measurement_script.baselines(count, mass=isotope, detector=detector) self.debug("BASELINE MEASUREMENT COMPLETE") arun.measurement_script.increment_series_count(-2, -1) change = arun.set_magnet_position( isotope, detector, use_dac=use_dac, update_detectors=False, update_labels=False, update_isotopes=True, remove_non_active=False, ) if change: msg = "delaying {} for detectors to settle after peak hop".format( settle ) arun.wait(settle, msg) self.debug(msg) self.plot_panel._ncounts = pocounts self.measurement_script.ncounts = ocounts self.plot_panel.ncycles = ocycles arun.plot_panel.is_peak_hop = True arun.is_peak_hop = True else: # self.debug('c={} pc={} nc={}'.format(cycle, self.plot_panel.ncycles, self.ncycles)) if self.plot_panel.ncycles != self.ncycles: if cycle >= self.plot_panel.ncycles: self.info( "user termination. measurement iteration executed {}/{} cycles".format( cycle, self.ncycles ) ) self.stop() return elif cycle >= self.ncycles: return if count == 0: zd = list(zip(dets, defls)) self.debug("Peak hop Detectors={}".format(dets)) self.debug("Peak hop Deflections={}".format(defls)) self.debug("Peak hop DeflectionsPairs={}".format(zd)) # set deflections # only set deflections deflections were changed or need changing deflect = len([d for d in defls if d is not None]) if deflect or self._was_deflected: self._was_deflected = False for det, defl in zd: # use the measurement script to set the deflections # this way defaults from the config can be used if defl is not None: self._was_deflected = True arun.set_deflection(det, defl) self._protect_detectors(pdets) self.debug( "----------------------- HOP {} {}".format(isotope, detector) ) change = arun.set_magnet_position( isotope, detector, update_detectors=False, update_labels=False, update_isotopes=False, # update_isotopes=not is_baseline, remove_non_active=False, ) self._protect_detectors(pdets, False) arun.update_detector_isotope_pairing(active_dets, isos) if change: g = self.plot_panel.isotope_graph if hop_idx: for d in active_dets: det = arun.get_detector(d) plot = g.get_plot_by_ytitle( "{}{}".format(det.isotope, det.name) ) if not plot: plot = g.get_plot_by_ytitle(det.isotope) if plot: scatter = plot.plots[ "data{}".format(self.fit_series_idx) ][0] scatter.color = current_color scatter.outline_color = current_color else: self.debug( "could not locate det={} iso={}".format( d, det.isotope ) ) try: arun.plot_panel.counts += int(settle) except AttributeError: pass msg = "delaying {} for detectors to settle after peak hop".format( settle ) arun.wait(settle, msg) self.debug(msg) # self.debug('cycle {} count {} {}'.format(cycle, count, id(self))) if self.plot_panel.is_baseline: isotope = "{}bs".format(isotope) dac = arun.get_current_dac() # invoke_in_main_thread(self.plot_panel.trait_set, # current_cycle='{}({:0.6f}) - {} cyc={} cnt={}'.format(isotope, dac, detector, # cycle + 1, count + 1), # current_color=current_color) self.plot_panel.trait_set( current_cycle="{}({:0.6f}) - {} cyc={} cnt={}".format( isotope, dac, detector, cycle + 1, count + 1 ), current_color=current_color, ) return is_baseline, active_dets, isos def _protect_detectors(self, pdets, protect=True): for pd in pdets: self.automated_run.protect_detector(pd, protect)
# ============= EOF =============================================