# ===============================================================================
# Copyright 2012 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
# ============= standard library imports ========================
from __future__ import absolute_import
from __future__ import print_function
import ast
import os
import time
import yaml
from six.moves.configparser import ConfigParser
from pychron.core.helpers.filetools import fileiter
from pychron.paths import paths
from pychron.pychron_constants import MEASUREMENT_COLOR
from pychron.pyscripts.pyscript import verbose_skip, count_verbose_skip, \
makeRegistry, CTXObject
from pychron.pyscripts.valve_pyscript import ValvePyScript
from pychron.spectrometer import get_spectrometer_config_path, set_spectrometer_config_name
ESTIMATED_DURATION_FF = 1.0
command_register = makeRegistry()
class MeasurementCTXObject(object):
def create(self, yd):
for k in ('baseline', 'multicollect', 'peakcenter', 'equilibration', 'whiff', 'peakhop'):
try:
c = CTXObject()
c.update(yd[k])
setattr(self, k, c)
except KeyError:
pass
[docs]class MeasurementPyScript(ValvePyScript):
"""
MeasurementPyScripts are used to collect isotopic data
"""
automated_run = None
ncounts = 0
info_color = MEASUREMENT_COLOR
abbreviated_count_ratio = None
_time_zero = None
_time_zero_offset = 0
_series_count = 0
_baseline_series = None
_fit_series_count = 0
def gosub(self, *args, **kw):
kw['automated_run'] = self.automated_run
s = super(MeasurementPyScript, self).gosub(*args, **kw)
if s:
s.automated_run = None
return s
[docs] def reset(self, arun):
"""
Reset the script with a new automated run
:param arun: A new ``AutomatedRun``
:type arun: ``AutomatedRun``
"""
self.debug('%%%%%%%%%%%%%%%%%% setting automated run {}'.format(arun.runid))
self.automated_run = arun
self._reset()
def get_command_register(self):
cs = super(MeasurementPyScript, self).get_command_register()
return cs + list(command_register.commands.items())
def truncate(self, style=None):
if style == 'quick':
self.abbreviated_count_ratio = 0.25
super(MeasurementPyScript, self).truncate(style=style)
def get_variables(self):
return ['truncated', 'eqtime', 'use_cdd_warming']
def increment_series_counts(self, s, f):
self._series_count += s
self._fit_series_count += f
def reset_series(self):
self._series_count = 0
self._fit_series_count = 0
# ===============================================================================
# commands
# ===============================================================================
@command_register
def measurement_delay(self, duration=None, message=None):
if duration:
try:
self.automated_run.plot_panel.total_counts += round(duration)
except AttributeError:
pass
self.sleep(duration, message=message)
[docs] @verbose_skip
@command_register
def generate_ic_mftable(self, detectors, refiso='Ar40', peak_center_config='', n=1, calc_time=False):
"""
Generate an IC MFTable. Use this when doing a Detector Intercalibration.
peak centers the ``refiso`` on a list of ``detectors``. MFTable saved as ic_mftable
cancel script if generating mftable fails
:param detectors: list of detectors to peak center
:type detectors: list
:param refiso: isotope to peak center
:type refiso: str
"""
if calc_time:
self._estimated_duration += (len(detectors) * 30)*n
return
if not self._automated_run_call('py_generate_ic_mftable', detectors, refiso, peak_center_config, n):
self.cancel()
@verbose_skip
@command_register
def extraction_gosub(self, *args, **kw):
kw['klass'] = 'ExtractionPyScript'
super(MeasurementPyScript, self).gosub(*args, **kw)
@count_verbose_skip
@command_register
def measure_equilibration(self, *args, **kw):
self.sniff(*args, **kw)
@count_verbose_skip
@command_register
def sniff(self, ncounts=0, calc_time=False, integration_time=1.04, block=True):
"""
collect a sniff measurement. Sniffs are the measurement of the equilibration gas.
:param ncounts: Number of counts
:type ncounts: int
:param integration_time: integration time in seconds
:type integration_time: float
:param block: Is this call blocking or should it return immediately
:type block: bool
"""
if calc_time:
self._estimated_duration += (ncounts * integration_time * ESTIMATED_DURATION_FF)
return
self.ncounts = ncounts
if not self._automated_run_call('py_sniff', ncounts,
self._time_zero, self._time_zero_offset,
series=self._series_count, block=block):
self.cancel()
@count_verbose_skip
@command_register
def multicollect(self, ncounts=200, integration_time=1.04, calc_time=False):
"""
Do a multicollection. Measure all detectors setup using ``activate_detectors``
:param ncounts: Number of counts
:type ncounts: int
:param integration_time: integration time in seconds
:type integration_time: float
"""
if calc_time:
self._estimated_duration += (ncounts * integration_time * ESTIMATED_DURATION_FF)
return
self.ncounts = ncounts
if self.abbreviated_count_ratio:
ncounts *= self.abbreviated_count_ratio
if not self._automated_run_call('py_data_collection',
self,
ncounts,
self._time_zero,
self._time_zero_offset,
fit_series=self._fit_series_count,
series=self._series_count,
integration_time=integration_time):
self.cancel()
@count_verbose_skip
@command_register
def baselines(self, ncounts=1, mass=None, detector='',
use_dac=False,
integration_time=1.04,
settling_time=4, calc_time=False):
"""
Measure the baseline for all detectors. Position ion beams using mass and detector
:param ncounts: Number of counts
:type ncounts: int
:param mass: Mass to measure baseline in amu
:type mass: float
:param detector: name of detector
:type detector: str
:param use_dac: If True interpret mass as a DAC value instead of amu
:type use_dac: bool
:param integration_time: integration time in seconds
:type integration_time: float
:param settling_time: delay between magnet positioning and measurement in seconds
:type settling_time: float
"""
if self.abbreviated_count_ratio:
ncounts *= self.abbreviated_count_ratio
if calc_time:
ns = ncounts
d = ns * integration_time * ESTIMATED_DURATION_FF + settling_time
self._estimated_duration += d
return
self.ncounts = ncounts
if self._baseline_series:
series = self._baseline_series
else:
series = self._series_count
if not self._automated_run_call('py_baselines', ncounts,
self._time_zero,
self._time_zero_offset,
mass,
detector,
use_dac=use_dac,
fit_series=self._fit_series_count,
settling_time=settling_time,
series=series,
integration_time=integration_time):
self.cancel()
self._baseline_series = series
# self._series_count += 2
# self._fit_series_count += 1
@count_verbose_skip
@command_register
def load_hops(self, p, *args, **kw):
"""
load the hops definition from a file
:param p: path. absolute or relative to this scripts root
:return: hops
:rtype: list of tuples
"""
if not os.path.isfile(p):
p = os.path.join(self.root, p)
if os.path.isfile(p):
with open(p, 'r') as rfile:
head, ext = os.path.splitext(p)
if ext in ('.yaml', '.yml'):
hops = yaml.load(rfile)
elif ext in ('.txt',):
def hop_factory(l):
pairs, counts, settle = eval(l)
# isos, dets = zip(*(p.split(':') for p in pairs.split(',')))
# items = (p.split(':') for p in pairs.split(','))
items = []
for p in pairs.split(','):
args = p.split(':')
defl = args[2] if len(args) == 3 else None
items.append((args[0], args[1], defl))
# n = len(isos)
cc = [{'isotope': i, 'detector': d, 'active': True,
'deflection': de, 'is_baseline': False, 'protect': False} for i, d, de in items]
h = {'counts': counts, 'settle': settle,
'cup_configuration': cc,
'positioning': {'detector': cc[0]['detector'], 'isotope': cc[0]['isotope']}}
return h
hops = [hop_factory(li) for li in fileiter(rfile)]
return hops
else:
self.warning_dialog('No such file {}'.format(p))
@count_verbose_skip
@command_register
def define_detectors(self, isotope, det, *args, **kw):
self._automated_run_call('py_define_detectors', isotope, det)
@count_verbose_skip
@command_register
def define_hops(self, hops=None, **kw):
if not hops:
return
self._automated_run_call('py_define_hops', hops)
@count_verbose_skip
@command_register
def peak_hop(self, ncycles=5, hops=None, mftable=None, calc_time=False):
"""
Peak hop ion beams. Hops usually defined in a separate file.
if mftable == 'ic_mftable' use the ic_mftable generated during detector intercalibration.
:param ncycles: int, number of cycles
:param hops: list of tuples, defined in a hops file
:param mftable: str
"""
if not hops:
return
integration_time = 1.1
counts = sum([h['counts'] * integration_time + h['settle'] for h in hops]) * ncycles
if calc_time:
# counts = sum of counts for each hop
self._estimated_duration += (counts * ESTIMATED_DURATION_FF)
return
group = 'signal'
self.ncounts = counts
if not self._automated_run_call('py_peak_hop', ncycles,
counts,
hops,
mftable,
self._time_zero,
self._time_zero_offset,
self._series_count,
fit_series=self._fit_series_count,
group=group):
self.cancel()
# self._series_count += 2
# self._fit_series_count += 1
[docs] @verbose_skip
@command_register
def peak_center(self, detector='', isotope='',
integration_time=1.04, save=True, calc_time=False,
directions='Increase', config_name='default'):
"""
Calculate the peak center for ``isotope`` on ``detector``.
:param detector: str
:param isotope: str
:param integration_time: float
:param save: bool
"""
if calc_time:
n = 31
self._estimated_duration += n * integration_time * 2
return
self._automated_run_call('py_peak_center', detector=detector,
isotope=isotope, integration_time=integration_time,
directions=directions,
save=save, config_name=config_name)
@verbose_skip
@command_register
def get_intensity(self, name):
v = self._automated_run_call('py_get_intensity', detector=name)
# ensure the script always gets a number
return 0 or v
[docs] @verbose_skip
@command_register
def whiff(self, ncounts=0, conditionals=None):
"""
Do a whiff measurement.
Whiff's are quick measurements with conditionals. use them to take action at
the beginning of a measurement. For example do a whiff to determine if intensity
to great.
:param ncounts: int
:param conditionals: list of dicts
"""
self.ncounts = ncounts
ret = self._automated_run_call('py_whiff', ncounts, conditionals,
self._time_zero, self._time_zero_offset,
fit_series=self._fit_series_count,
series=self._series_count)
return ret
@verbose_skip
@command_register
def reset_measurement(self, detectors=None):
if detectors:
self.reset_data()
self.activate_detectors(*detectors)
try:
self.automated_run.plot_panel.total_counts = 0
except AttributeError:
pass
self._reset()
@verbose_skip
@command_register
def reset_data(self):
self._automated_run_call('py_reset_data')
[docs] @verbose_skip
@command_register
def post_equilibration(self, block=False):
"""
Run the post equilibration script.
"""
self._automated_run_call('py_post_equilibration', block=block)
[docs] @verbose_skip
@command_register
def equilibrate(self, eqtime=20, inlet=None, outlet=None,
do_post_equilibration=True, close_inlet=True, delay=3):
"""
equilibrate the extraction line with the mass spectrometer
inlet or outlet can be a single valve name or a list of valve names. ::
'A', ('A','B'), ['A','B'], 'A,B'
:param eqtime: int, equilibration duration in seconds
:param inlet: str, tuple or list, inlet valve
:param outlet: str, tuple or list, ion pump valve
:param do_post_equilibration: bool
:param close_inlet: bool
:param delay: int, delay in seconds between close of outlet and open of inlet
"""
evt = self._automated_run_call('py_equilibration', eqtime=eqtime,
inlet=inlet,
outlet=outlet,
do_post_equilibration=do_post_equilibration,
close_inlet=close_inlet,
delay=delay)
if not evt:
self.cancel()
else:
# wait for inlet to open
evt.wait()
[docs] @verbose_skip
@command_register
def set_fits(self, *fits):
"""
set time vs intensity regression fits for isotopes
:param fits: str, list, or tuple
"""
self._automated_run_call('py_set_fits', fits)
[docs] @verbose_skip
@command_register
def set_baseline_fits(self, *fits):
"""
set baseline fits for detectors
:param fits:
"""
self._automated_run_call('py_set_baseline_fits', fits)
[docs] @verbose_skip
@command_register
def activate_detectors(self, *dets, **kw):
"""
set the active detectors
:param dets: list
"""
peak_center = kw.get('peak_center', False)
if dets:
self._automated_run_call('py_activate_detectors', list(dets), peak_center=peak_center)
@verbose_skip
@command_register
def position_hv(self, pos, detector='AX'):
self._automated_run_call('py_position_hv', pos, detector)
[docs] @verbose_skip
@command_register
def position_magnet(self, pos, detector='AX', use_dac=False, for_collection=True):
"""
:param pos: location to set magnetic field
:type pos: str, float
:param detector: detector to position ``pos``
:type pos: str
:param use_dac: is the ``pos`` a DAC voltage
:type use_dac: bool
examples::
position_magnet(4.54312, use_dac=True) # detector is not relevant
position_magnet(39.962, detector='AX')
position_magnet('Ar40', detector='AX') #Ar40 will be converted to 39.962 use mole weight dict
"""
self._automated_run_call('py_position_magnet', pos, detector, use_dac=use_dac, for_collection=for_collection)
[docs] @verbose_skip
@command_register
def coincidence(self):
"""
Do a coincidence scan. Peak center all active detectors simulatenously.
calculate required deflection corrections to bring all detectors into
coincidence
"""
self._automated_run_call('py_coincidence_scan')
# ===============================================================================
#
# ===============================================================================
# ===============================================================================
# set commands
# ===============================================================================
@verbose_skip
@command_register
def is_last_run(self):
return self._automated_run_call('py_is_last_run')
@verbose_skip
@command_register
def clear_conditionals(self):
self._automated_run_call('py_clear_conditionals')
@verbose_skip
@command_register
def clear_terminations(self):
self._automated_run_call('py_clear_terminations')
@verbose_skip
@command_register
def clear_truncations(self):
self._automated_run_call('py_clear_truncations')
@verbose_skip
@command_register
def clear_actions(self):
self._automated_run_call('py_clear_actions')
@verbose_skip
@command_register
def add_termination(self, attr, teststr, start_count=0, frequency=10, window=0, mapper='', ntrips=1):
self._automated_run_call('py_add_termination',
attr=attr,
teststr=teststr,
start_count=start_count,
frequency=frequency, window=window,
mapper=mapper, ntrips=ntrips)
@verbose_skip
@command_register
def add_cancelation(self, attr, teststr, start_count=0, frequency=10, window=0, mapper='', ntrips=1):
self._automated_run_call('py_add_cancelation',
attr=attr,
teststr=teststr,
start_count=start_count,
frequency=frequency, window=window,
mapper=mapper, ntrips=ntrips)
@verbose_skip
@command_register
def add_truncation(self, attr, teststr, start_count=0, frequency=10, ntrips=1,
abbreviated_count_ratio=1.0):
self._automated_run_call('py_add_truncation',
attr=attr,
teststr=teststr,
start_count=start_count,
frequency=frequency,
abbreviated_count_ratio=abbreviated_count_ratio, ntrips=ntrips)
@verbose_skip
@command_register
def add_action(self, attr, teststr, start_count=0, frequency=10, ntrips=1,
action=None,
resume=False):
self._automated_run_call('py_add_action',
attr=attr, teststr=teststr,
start_count=start_count,
frequency=frequency,
action=action,
resume=resume, ntrips=ntrips)
@verbose_skip
@command_register
def set_ncounts(self, ncounts=0):
try:
ncounts = int(ncounts)
self.ncounts = ncounts
except Exception as e:
print('set_ncounts', e)
[docs] @verbose_skip
@command_register
def set_time_zero(self, offset=0):
"""
set the time_zero value.
add offset to time_zero ::
T_o= ion pump closes
offset seconds after T_o. define time_zero
T_eq= inlet closes
"""
self._time_zero = time.time() + offset
self._time_zero_offset = offset
[docs] @verbose_skip
@command_register
def set_integration_time(self, v):
"""
Set the integration time
:param v: integration time in seconds
:type v: float
"""
self._automated_run_call('py_set_integration_time', v)
@verbose_skip
@command_register
def raw_spectrometer_command(self, command):
self._automated_run_call('py_raw_spectrometer_command', command)
@verbose_skip
@command_register
def set_spectrometer_configuration(self, name):
set_spectrometer_config_name(name)
self._automated_run_call('py_send_spectrometer_configuration')
@verbose_skip
@command_register
def set_isotope_group(self, name):
self._automated_run_call('py_set_isotope_group', name)
@property
def truncated(self):
"""
Property. True if run was truncated otherwise False
:return: bool
"""
return self._automated_run_call(lambda: self.automated_run.truncated) or self.is_truncated()
@property
def eqtime(self):
"""
Property. Equilibration time. Get value from ``AutomatedRun``.
:return: float, int
"""
r = 20
if self.automated_run:
r = self.automated_run.eqtime
if r == -1:
r = 20
cg = self._get_config()
if cg.has_option('Default', 'eqtime'):
r = cg.getfloat('Default', 'eqtime', )
return r
@property
def time_zero_offset(self):
"""
Property. Substract ``time_zero_offset`` from time value for all data points
:return: float, int
"""
if self.automated_run:
return self.automated_run.time_zero_offset
# return self._automated_run_call(lambda: self.automated_run.time_zero_offset)
else:
return 0
@property
def use_cdd_warming(self):
"""
Property. Use CDD Warming. Get value from ``AutomatedRunSpec``
:return: bool
"""
if self.automated_run:
return self.automated_run.spec.use_cdd_warming
# return self._automated_run_call(lambda: self.automated_run.spec.use_cdd_warming)
# private
def _get_deflection_from_file(self, name):
config = self._get_config()
section = 'Deflections'
dets = config.options(section)
for dn in dets:
if dn.lower() == name.lower():
return config.getfloat(section, dn)
def _set_from_file(self, attrs, section, **user_params):
func = self._set_spectrometer_parameter
config = self._get_config()
for attr in attrs:
if attr in user_params:
v = user_params[attr]
else:
v = config.getfloat(section, attr)
if v is not None:
func('Set{}'.format(attr), v)
def _get_config(self):
config = ConfigParser()
try:
p = get_spectrometer_config_path()
except IOError:
p = os.path.join(paths.spectrometer_dir, 'config.cfg')
config.read(p)
return config
def _automated_run_call(self, func, *args, **kw):
if self.automated_run is None:
return
if isinstance(func, str):
func = getattr(self.automated_run, func)
return func(*args, **kw)
def _set_spectrometer_parameter(self, *args, **kw):
self._automated_run_call('py_set_spectrometer_parameter', *args, **kw)
def _get_spectrometer_parameter(self, *args, **kw):
return self._automated_run_call('py_get_spectrometer_parameter', *args, **kw)
def _setup_docstr_context(self):
"""
add a context object to the global script context
e.g access measurement configuration values such as counts using
mx.counts
"""
try:
m = ast.parse(self.text)
try:
yd = yaml.load(ast.get_docstring(m))
if yd:
mx = MeasurementCTXObject()
mx.create(yd)
self._ctx['mx'] = mx
except yaml.YAMLError as e:
self.debug('failed loading docstring context. {}'.format(e))
except AttributeError:
pass
def _reset(self):
self._baseline_series = None
self._series_count = 0
self._fit_series_count = 0
self._time_zero = None
self.abbreviated_count_ratio = None
self.ncounts = 0
# ============= EOF =============================================