# ===============================================================================
# Copyright 2013 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
from __future__ import absolute_import
from traits.api import List, Int, Instance
from pychron.core.helpers.color_generators import colornames
from pychron.experiment.automated_run.data_collector import DataCollector
from pychron.experiment.automated_run.hop_util import generate_hops
from six.moves import zip
[docs]class PeakHopCollector(DataCollector):
"""
Collector class for doing a peak hop measurement. Measure one or more intensities at given mass for ncounts then
jump magnet to next new mass.
"""
hops = List
settling_time = 0
ncycles = Int
hop_generator = None
_was_deflected = False
_detectors = None
def set_hops(self, hops):
self.hops = hops
self.debug('make new hop generatior')
self.hop_generator = generate_hops(self.hops)
def _pre_trigger_hook(self):
args = self._do_hop()
if args:
is_baseline, dets, isos = args
self._detectors = dets
return True
def _iter_hook(self, i):
return self._iteration(i, detectors=self._detectors)
# args = self._do_hop()
# if args:
# is_baseline, dets, isos = args
# if not is_baseline:
# return self._iteration(i, detectors=dets)
def _do_hop(self):
"""
is it time for a magnet move
"""
# from pychron.core.ui.gui import invoke_in_main_thread
hop = next(self.hop_generator)
hop_idx = hop['idx']
cycle = hop['cycle']
is_baseline = hop['is_baseline']
dets = hop['detectors']
isos = hop['isotopes']
defls = hop['deflections']
settle = hop['settle']
count = hop['count']
pdets = hop['protect_detectors']
active_dets = hop['active_detectors']
current_color = colornames[hop_idx]
use_dac = False
positioning = hop['positioning']
if positioning:
if 'dac' in positioning:
use_dac = True
isotope = positioning['dac']
detector = ''
else:
detector = positioning['detector']
isotope = positioning['isotope']
else:
detector = active_dets[0]
isotope = isos[0]
if count == 0:
self.debug('$$$$$$$$$$$$$$$$$ SETTING is_baseline {}'.format(is_baseline))
arun = self.automated_run
if is_baseline:
arun.is_peak_hop = False
# remember original settings. return to these values after baseline finished
ocounts = self.measurement_script.ncounts
arun.measurement_script.increment_series_count(2, 1)
ocycles = self.plot_panel.ncycles
pocounts = self.plot_panel.ncounts
self.debug('START BASELINE MEASUREMENT {} {}'.format(isotope, detector))
arun.measurement_script.baselines(count, mass=isotope, detector=detector)
self.debug('BASELINE MEASUREMENT COMPLETE')
arun.measurement_script.increment_series_count(-2, -1)
change = arun.set_magnet_position(isotope, detector,
use_dac=use_dac,
update_detectors=False, update_labels=False,
update_isotopes=True,
remove_non_active=False)
if change:
msg = 'delaying {} for detectors to settle after peak hop'.format(settle)
arun.wait(settle, msg)
self.debug(msg)
self.plot_panel._ncounts = pocounts
self.measurement_script.ncounts = ocounts
self.plot_panel.ncycles = ocycles
arun.plot_panel.is_peak_hop = True
arun.is_peak_hop = True
else:
# self.debug('c={} pc={} nc={}'.format(cycle, self.plot_panel.ncycles, self.ncycles))
if self.plot_panel.ncycles != self.ncycles:
if cycle >= self.plot_panel.ncycles:
self.info(
'user termination. measurement iteration executed {}/{} cycles'.format(cycle, self.ncycles))
self.stop()
return
elif cycle >= self.ncycles:
return
if count == 0:
zd = list(zip(dets, defls))
self.debug('Peak hop Detectors={}'.format(dets))
self.debug('Peak hop Deflections={}'.format(defls))
self.debug('Peak hop DeflectionsPairs={}'.format(zd))
# set deflections
# only set deflections deflections were changed or need changing
deflect = len([d for d in defls if d is not None])
if deflect or self._was_deflected:
self._was_deflected = False
for det, defl in zd:
# use the measurement script to set the deflections
# this way defaults from the config can be used
if defl is not None:
self._was_deflected = True
arun.set_deflection(det, defl)
self._protect_detectors(pdets)
self.debug('----------------------- HOP {} {}'.format(isotope, detector))
change = arun.set_magnet_position(isotope, detector,
update_detectors=False,
update_labels=False,
update_isotopes=False,
# update_isotopes=not is_baseline,
remove_non_active=False)
self._protect_detectors(pdets, False)
arun.update_detector_isotope_pairing(active_dets, isos)
if change:
g = self.plot_panel.isotope_graph
if hop_idx:
for d in active_dets:
det = arun.get_detector(d)
plot = g.get_plot_by_ytitle('{}{}'.format(det.isotope, det.name))
if not plot:
plot = g.get_plot_by_ytitle(det.isotope)
if plot:
scatter = plot.plots['data{}'.format(self.fit_series_idx)][0]
scatter.color = current_color
scatter.outline_color = current_color
else:
self.debug('could not locate det={} iso={}'.format(d, det.isotope))
try:
arun.plot_panel.counts += int(settle)
except AttributeError:
pass
msg = 'delaying {} for detectors to settle after peak hop'.format(settle)
arun.wait(settle, msg)
self.debug(msg)
# self.debug('cycle {} count {} {}'.format(cycle, count, id(self)))
if self.plot_panel.is_baseline:
isotope = '{}bs'.format(isotope)
dac = arun.get_current_dac()
# invoke_in_main_thread(self.plot_panel.trait_set,
# current_cycle='{}({:0.6f}) - {} cyc={} cnt={}'.format(isotope, dac, detector,
# cycle + 1, count + 1),
# current_color=current_color)
self.plot_panel.trait_set(current_cycle='{}({:0.6f}) - {} cyc={} cnt={}'.format(isotope, dac, detector,
cycle + 1, count + 1),
current_color=current_color)
return is_baseline, active_dets, isos
def _protect_detectors(self, pdets, protect=True):
for pd in pdets:
self.automated_run.protect_detector(pd, protect)
# ============= EOF =============================================