# ===============================================================================
# Copyright 2012 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
# ============= standard library imports ========================
import ast
import os
import time
from configparser import ConfigParser
import yaml
from pychron.core.helpers.filetools import fileiter
from pychron.core.yaml import yload
from pychron.paths import paths
from pychron.pychron_constants import MEASUREMENT_COLOR
from pychron.pyscripts.contexts import MeasurementCTXObject
from pychron.pyscripts.decorators import verbose_skip, count_verbose_skip, makeRegistry
from pychron.pyscripts.valve_pyscript import ValvePyScript
from pychron.spectrometer import (
get_spectrometer_config_path,
set_spectrometer_config_name,
)
# Scale factor multiplied into the count-based duration estimates that
# ``sniff``, ``multicollect``, ``baselines`` and ``peak_hop`` add to
# ``_estimated_duration`` when called with ``calc_time=True``.
ESTIMATED_DURATION_FF = 1.0
# Registry applied as the ``@command_register`` decorator on script commands;
# its ``commands`` mapping is read by ``get_command_register``.
command_register = makeRegistry()
[docs]class MeasurementPyScript(ValvePyScript):
"""
MeasurementPyScripts are used to collect isotopic data

Script commands are forwarded to the current ``automated_run`` through
``_automated_run_call``; while no run is attached those calls do nothing
and return None.
"""
# run that the py_* calls are forwarded to; assigned by ``reset``
automated_run = None
# count total most recently set by a measurement command or ``set_ncounts``
ncounts = 0
info_color = MEASUREMENT_COLOR
# multiplier applied to ``ncounts`` by ``multicollect`` and ``baselines``;
# set to 0.25 by ``truncate(style="quick")`` and cleared by ``_reset``
abbreviated_count_ratio = None
# basename and raw text of the last file read by ``load_hops``
hops_name = ""
hops_blob = ""
# ``time.time() + offset`` and the ``offset`` recorded by ``set_time_zero``
_time_zero = None
_time_zero_offset = 0
# passed to the automated run as the ``series`` argument
_series_count = 0
# series value remembered by ``baselines`` and reused while it is truthy
_baseline_series = None
# passed to the automated run as the ``fit_series`` argument
_fit_series_count = 0
def gosub(self, *args, **kw):
    """
    Run a sub-script that shares this script's automated run

    The current ``automated_run`` is injected into the keyword arguments
    before delegating to the parent ``gosub``. If a truthy script object
    comes back, its ``automated_run`` reference is cleared.

    :return: whatever the parent ``gosub`` returned
    """
    kw["automated_run"] = self.automated_run
    subscript = super(MeasurementPyScript, self).gosub(*args, **kw)
    if not subscript:
        return subscript

    # drop the sub-script's reference to the run once it has finished
    subscript.automated_run = None
    return subscript
def reset(self, arun):
    """
    Attach a new automated run to the script and clear per-run state

    :param arun: the run this script will forward its commands to
    :type arun: ``AutomatedRun``
    """
    message = "%%%%%%%%%%%%%%%%%% setting automated run {}".format(arun.runid)
    self.debug(message)

    self.automated_run = arun
    self._reset()
def get_command_register(self):
    """
    Return the inherited command list extended with the commands
    registered in this module's ``command_register``

    :return: list; the module's entries are ``(name, command)`` pairs
    """
    inherited = super(MeasurementPyScript, self).get_command_register()
    measurement_commands = list(command_register.commands.items())
    return inherited + measurement_commands
def truncate(self, style=None):
    """
    Truncate the script

    A ``"quick"`` truncation additionally sets ``abbreviated_count_ratio``
    to 0.25 before delegating to the parent implementation.

    :param style: truncation style, forwarded to the parent ``truncate``
    """
    is_quick = style == "quick"
    if is_quick:
        self.abbreviated_count_ratio = 0.25

    super(MeasurementPyScript, self).truncate(style=style)
def get_variables(self):
    """
    Return the names of the variables this script type provides

    :return: list of str
    """
    names = ("truncated", "eqtime", "use_cdd_warming")
    return list(names)
def increment_series_counts(self, s, f):
    """
    Advance the series counters

    :param s: amount added to ``_series_count``
    :param f: amount added to ``_fit_series_count``
    """
    # data series counter first, then the fit series counter
    self._series_count += s

    self._fit_series_count += f
def reset_series(self):
    """
    Set both series counters back to zero
    """
    self._series_count = self._fit_series_count = 0
# ===============================================================================
# commands
# ===============================================================================
# Script command: delay the measurement script via ``self.sleep``.
#
# When ``duration`` is truthy, ``round(duration)`` is first added to the
# run's ``plot_panel.total_counts``. An AttributeError raised by that update
# (for example when ``automated_run`` is None) is deliberately ignored.
#
# :param duration: passed to ``self.sleep``
# :param message: passed to ``self.sleep`` as ``message``
#
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be determined here whether ``self.sleep`` runs only when ``duration`` is
# truthy or unconditionally -- confirm against the upstream source.
@command_register
def measurement_delay(self, duration=None, message=None):
if duration:
try:
self.automated_run.plot_panel.total_counts += round(duration)
except AttributeError:
# best effort: the count total is only updated when it is reachable
pass
self.sleep(duration, message=message)
@verbose_skip
@command_register
def generate_ic_mftable(
    self, detectors, refiso="Ar40", peak_center_config="", n=1, calc_time=False
):
    """
    Generate an IC MFTable. Use this when doing a Detector Intercalibration.

    Peak centers ``refiso`` on each of ``detectors`` (MFTable saved as
    ic_mftable). The script is canceled if the automated run reports
    failure.

    :param detectors: list of detectors to peak center
    :type detectors: list
    :param refiso: isotope to peak center
    :type refiso: str
    :param peak_center_config: forwarded unchanged to the automated run
    :param n: forwarded to the automated run; also scales the time estimate
    :param calc_time: if True only add ``len(detectors) * 30 * n`` to the
        estimated duration and return
    :type calc_time: bool
    """
    if calc_time:
        self._estimated_duration += (len(detectors) * 30) * n
        return

    succeeded = self._automated_run_call(
        "py_generate_ic_mftable", detectors, refiso, peak_center_config, n
    )
    if not succeeded:
        self.cancel()
@verbose_skip
@command_register
def extraction_gosub(self, *args, **kw):
    """
    Run a sub-script using the ``ExtractionPyScript`` class

    Calls the parent class ``gosub`` directly with ``klass`` forced to
    ``"ExtractionPyScript"``; nothing is returned.
    """
    kw.update(klass="ExtractionPyScript")
    super(MeasurementPyScript, self).gosub(*args, **kw)
@count_verbose_skip
@command_register
def measure_equilibration(self, *args, **kw):
    """
    Alias for ``sniff``. All arguments are passed through unchanged and
    nothing is returned.
    """
    do_sniff = self.sniff
    do_sniff(*args, **kw)
@count_verbose_skip
@command_register
def sniff(self, ncounts=0, calc_time=False, integration_time=1.04, block=True):
    """
    Collect a sniff measurement. Sniffs are the measurement of the
    equilibration gas.

    The script is canceled if the automated run reports failure.

    :param ncounts: Number of counts
    :type ncounts: int
    :param calc_time: if True only add the estimated duration and return
    :type calc_time: bool
    :param integration_time: integration time in seconds. Only used for
        the duration estimate
    :type integration_time: float
    :param block: Is this call blocking or should it return immediately
    :type block: bool
    """
    if calc_time:
        estimate = ncounts * integration_time * ESTIMATED_DURATION_FF
        self._estimated_duration += estimate
        return

    self.ncounts = ncounts
    succeeded = self._automated_run_call(
        "py_sniff",
        ncounts,
        self._time_zero,
        self._time_zero_offset,
        series=self._series_count,
        block=block,
    )
    if not succeeded:
        self.cancel()
@count_verbose_skip
@command_register
def multicollect(self, ncounts=200, integration_time=1.04, calc_time=False):
    """
    Do a multicollection. Measure all detectors setup using
    ``activate_detectors``

    When ``abbreviated_count_ratio`` is set, ``ncounts`` is scaled by it
    before anything else, so the estimate, ``self.ncounts`` and the
    automated run all see the scaled value. The script is canceled if the
    automated run reports failure.

    :param ncounts: Number of counts
    :type ncounts: int
    :param integration_time: integration time in seconds
    :type integration_time: float
    :param calc_time: if True only add the estimated duration and return
    :type calc_time: bool
    """
    ratio = self.abbreviated_count_ratio
    if ratio:
        ncounts *= ratio

    if calc_time:
        estimate = ncounts * integration_time * ESTIMATED_DURATION_FF
        self._estimated_duration += estimate
        return

    self.ncounts = ncounts
    succeeded = self._automated_run_call(
        "py_data_collection",
        self,
        ncounts,
        self._time_zero,
        self._time_zero_offset,
        fit_series=self._fit_series_count,
        series=self._series_count,
        integration_time=integration_time,
    )
    if not succeeded:
        self.cancel()
@count_verbose_skip
@command_register
def baselines(
    self,
    ncounts=1,
    mass=None,
    detector="",
    use_dac=False,
    integration_time=1.04,
    settling_time=4,
    check_conditionals=True,
    calc_time=False,
):
    """
    Measure the baseline for all detectors. Position ion beams using mass
    and detector

    When ``abbreviated_count_ratio`` is set, ``ncounts`` is scaled by it
    first. The series value used is remembered in ``_baseline_series`` and
    reused on later calls while it is truthy. The script is canceled if the
    automated run reports failure.

    :param ncounts: Number of counts
    :type ncounts: int
    :param mass: Mass to measure baseline in amu
    :type mass: float
    :param detector: name of detector
    :type detector: str
    :param use_dac: If True interpret mass as a DAC value instead of amu
    :type use_dac: bool
    :param integration_time: integration time in seconds
    :type integration_time: float
    :param settling_time: delay between magnet positioning and measurement
        in seconds
    :type settling_time: float
    :param check_conditionals: forwarded unchanged to the automated run
    :type check_conditionals: bool
    :param calc_time: if True only add the estimated duration and return
    :type calc_time: bool
    """
    ratio = self.abbreviated_count_ratio
    if ratio:
        ncounts *= ratio

    if calc_time:
        collection = ncounts * integration_time * ESTIMATED_DURATION_FF
        self._estimated_duration += collection + settling_time
        return

    self.ncounts = ncounts

    # prefer the remembered baseline series over the current series count
    series = self._baseline_series or self._series_count

    succeeded = self._automated_run_call(
        "py_baselines",
        ncounts,
        self._time_zero,
        self._time_zero_offset,
        mass,
        detector,
        check_conditionals=check_conditionals,
        use_dac=use_dac,
        fit_series=self._fit_series_count,
        settling_time=settling_time,
        series=series,
        integration_time=integration_time,
    )
    if not succeeded:
        self.cancel()

    self._baseline_series = series
@count_verbose_skip
@command_register
def load_hops(self, p, *args, **kw):
    """
    Load the hops definition from a file

    ``.yaml``/``.yml`` files are parsed with ``yload``. ``.txt`` files hold
    one hop per line, written as a Python tuple literal
    ``("iso:det[:deflection],...", counts, settle)``.

    The file's basename and raw text are stored on ``hops_name`` and
    ``hops_blob``. A warning dialog is shown and None is returned if the
    file does not exist or has an unsupported extension.

    :param p: path. absolute or relative to this scripts root
    :return: hops
    :rtype: list of dicts for ``.txt`` files, the ``yload`` result for yaml
        files, otherwise None
    """

    def hop_factory(line):
        # literal_eval only accepts Python literals, so a hops file cannot
        # execute arbitrary code the way eval() allowed.
        # NOTE(review): lines containing expressions (e.g. ``10*2``) are now
        # rejected -- confirm no existing hops files rely on that.
        pairs, counts, settle = ast.literal_eval(line.strip())

        items = []
        for pair in pairs.split(","):
            parts = pair.split(":")
            # the deflection is optional: "iso:det" or "iso:det:deflection"
            deflection = parts[2] if len(parts) == 3 else None
            items.append((parts[0], parts[1], deflection))

        cup_configuration = [
            {
                "isotope": isotope,
                "detector": det,
                "active": True,
                "deflection": deflection,
                "is_baseline": False,
                "protect": False,
            }
            for isotope, det, deflection in items
        ]
        # the first isotope/detector pair is used for positioning
        return {
            "counts": counts,
            "settle": settle,
            "cup_configuration": cup_configuration,
            "positioning": {
                "detector": cup_configuration[0]["detector"],
                "isotope": cup_configuration[0]["isotope"],
            },
        }

    if not os.path.isfile(p):
        p = os.path.join(self.root, p)

    if not os.path.isfile(p):
        self.warning_dialog("No such file {}".format(p))
        return None

    self.hops_name = os.path.basename(p)
    ext = os.path.splitext(p)[1].lower()

    with open(p, "r") as rfile:
        self.hops_blob = rfile.read()
        # rewind so the same handle can be parsed; avoids a second open()
        rfile.seek(0)

        if ext in (".yaml", ".yml"):
            return yload(rfile)
        if ext == ".txt":
            return [hop_factory(li) for li in fileiter(rfile)]

    # previously this path raised UnboundLocalError at ``return hops``
    self.warning_dialog(
        "Unsupported hops file {}. Use a .yaml, .yml or .txt file".format(p)
    )
    return None
@count_verbose_skip
@command_register
def define_detectors(self, isotope, det, *args, **kw):
    """
    Forward an isotope/detector pair to the automated run's
    ``py_define_detectors``

    :param isotope: forwarded unchanged
    :param det: forwarded unchanged

    Extra positional and keyword arguments are accepted but ignored.
    """
    handler = "py_define_detectors"
    self._automated_run_call(handler, isotope, det)
@count_verbose_skip
@command_register
def define_hops(self, hops=None, **kw):
    """
    Forward a hops definition to the automated run's ``py_define_hops``

    Does nothing when ``hops`` is empty or None. Extra keyword arguments
    are accepted but ignored.

    :param hops: hops definition, e.g. as returned by ``load_hops``
    """
    if hops:
        self._automated_run_call("py_define_hops", hops)
@count_verbose_skip
@command_register
def peak_hop(self, ncycles=5, hops=None, mftable=None, calc_time=False):
    """
    Peak hop ion beams. Hops usually defined in a separate file.

    if mftable == 'ic_mftable' use the ic_mftable generated during detector
    intercalibration. Does nothing when ``hops`` is empty or None. The
    script is canceled if the automated run reports failure.

    :param ncycles: int, number of cycles
    :param hops: list of hop dicts with ``counts`` and ``settle`` entries,
        defined in a hops file
    :param mftable: str
    :param calc_time: bool, if True only add the estimated duration and
        return
    """
    if not hops:
        return

    # fixed integration time used to weight each hop's counts
    integration_time = 1.1
    per_cycle = sum(h["counts"] * integration_time + h["settle"] for h in hops)
    counts = per_cycle * ncycles

    if calc_time:
        self._estimated_duration += counts * ESTIMATED_DURATION_FF
        return

    self.ncounts = counts
    succeeded = self._automated_run_call(
        "py_peak_hop",
        ncycles,
        counts,
        hops,
        mftable,
        self._time_zero,
        self._time_zero_offset,
        self._series_count,
        fit_series=self._fit_series_count,
        group="signal",
    )
    if not succeeded:
        self.cancel()
@verbose_skip
@command_register
def peak_center(
    self,
    detector="",
    isotope="",
    integration_time=1.04,
    save=True,
    calc_time=False,
    directions="Increase",
    config_name="default",
):
    """
    Calculate the peak center for ``isotope`` on ``detector``.

    :param detector: str
    :param isotope: str
    :param integration_time: float
    :param save: bool
    :param calc_time: bool, if True only add ``31 * integration_time * 2``
        to the estimated duration and return
    :param directions: forwarded unchanged to the automated run
    :param config_name: forwarded unchanged to the automated run
    """
    if calc_time:
        npoints = 31
        self._estimated_duration += npoints * integration_time * 2
        return

    options = dict(
        detector=detector,
        isotope=isotope,
        integration_time=integration_time,
        directions=directions,
        save=save,
        config_name=config_name,
    )
    self._automated_run_call("py_peak_center", **options)
@verbose_skip
@command_register
def get_intensity(self, name):
    """
    Get the current intensity for a detector from the automated run

    :param name: detector name, forwarded as ``detector``
    :return: the value from ``py_get_intensity``, or 0 when no value is
        available (e.g. no automated run is attached)
    """
    v = self._automated_run_call("py_get_intensity", detector=name)
    # ensure the script always gets a number. The previous ``0 or v``
    # always evaluated to ``v`` and so let None through.
    return 0 if v is None else v
@verbose_skip
@command_register
def whiff(self, ncounts=0, conditionals=None):
    """
    Do a whiff measurement.

    Whiffs are quick measurements with conditionals. Use them to take
    action at the beginning of a measurement, for example to determine
    whether the intensity is too great.

    :param ncounts: int
    :param conditionals: list of dicts
    :return: the result of the automated run's ``py_whiff``
    """
    self.ncounts = ncounts
    return self._automated_run_call(
        "py_whiff",
        ncounts,
        conditionals,
        self._time_zero,
        self._time_zero_offset,
        fit_series=self._fit_series_count,
        series=self._series_count,
    )
# Script command: reset the measurement state.
#
# When ``detectors`` is truthy the run's data is reset (``reset_data``) and
# the given detectors are activated. The plot panel's ``total_counts`` and
# ``total_seconds`` are zeroed; an AttributeError from that update (for
# example when ``automated_run`` is None) is deliberately ignored. Finally
# the script's own per-run state is cleared with ``_reset``.
#
# :param detectors: iterable of detector names, unpacked into
#     ``activate_detectors``
#
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be determined here whether the ``try`` block and ``self._reset()`` are
# nested under ``if detectors:`` -- confirm against the upstream source.
@verbose_skip
@command_register
def reset_measurement(self, detectors=None):
if detectors:
self.reset_data()
self.activate_detectors(*detectors)
try:
self.automated_run.plot_panel.total_counts = 0
self.automated_run.plot_panel.total_seconds = 0
except AttributeError:
# best effort: the totals are only zeroed when they are reachable
pass
self._reset()
@verbose_skip
@command_register
def reset_data(self):
    """
    Ask the automated run to reset its data (``py_reset_data``)
    """
    handler = "py_reset_data"
    self._automated_run_call(handler)
@verbose_skip
@command_register
def post_equilibration(self, block=False):
    """
    Run the post equilibration script.

    :param block: forwarded to the automated run's
        ``py_post_equilibration``
    :type block: bool
    """
    handler = "py_post_equilibration"
    self._automated_run_call(handler, block=block)
@verbose_skip
@command_register
def equilibrate(
    self,
    eqtime=20,
    inlet=None,
    outlet=None,
    do_post_equilibration=True,
    close_inlet=True,
    delay=3,
):
    """
    Equilibrate the extraction line with the mass spectrometer

    inlet or outlet can be a single valve name or a list of valve names. ::

        'A', ('A','B'), ['A','B'], 'A,B'

    The script is canceled if the automated run reports failure.

    :param eqtime: int, equilibration duration in seconds
    :param inlet: str, tuple or list, inlet valve
    :param outlet: str, tuple or list, ion pump valve
    :param do_post_equilibration: bool
    :param close_inlet: bool
    :param delay: int, delay in seconds between close of outlet and open of
        inlet
    """
    options = dict(
        eqtime=eqtime,
        inlet=inlet,
        outlet=outlet,
        do_post_equilibration=do_post_equilibration,
        close_inlet=close_inlet,
        delay=delay,
    )
    if not self._automated_run_call("py_equilibration", **options):
        self.cancel()
@verbose_skip
@command_register
def set_fits(self, *fits):
    """
    Set time vs intensity regression fits for isotopes

    The positional arguments are forwarded as a single tuple to the
    automated run's ``py_set_fits``.

    :param fits: str, list, or tuple
    """
    handler = "py_set_fits"
    self._automated_run_call(handler, fits)
@verbose_skip
@command_register
def set_baseline_fits(self, *fits):
    """
    Set baseline fits for detectors

    The positional arguments are forwarded as a single tuple to the
    automated run's ``py_set_baseline_fits``.

    :param fits: fits to apply
    """
    handler = "py_set_baseline_fits"
    self._automated_run_call(handler, fits)
@verbose_skip
@command_register
def activate_detectors(self, *dets, **kw):
    """
    Set the active detectors

    Does nothing when no detectors are given.

    :param dets: detector names, forwarded to the automated run as a list
    :param kw: only ``peak_center`` is read (default False) and forwarded
    """
    peak_center = kw.get("peak_center", False)
    if not dets:
        return

    self._automated_run_call(
        "py_activate_detectors", list(dets), peak_center=peak_center
    )
@verbose_skip
@command_register
def position_hv(self, pos, detector="AX"):
    """
    Forward a position request to the automated run's ``py_position_hv``

    :param pos: forwarded unchanged
    :param detector: forwarded unchanged
    :type detector: str
    """
    handler = "py_position_hv"
    self._automated_run_call(handler, pos, detector)
@verbose_skip
@command_register
def position_magnet(self, pos, detector="AX", use_dac=False, for_collection=True):
    """
    Position the magnet via the automated run's ``py_position_magnet``

    :param pos: location to set magnetic field
    :type pos: str, float
    :param detector: detector to position ``pos``
    :type detector: str
    :param use_dac: is the ``pos`` a DAC voltage
    :type use_dac: bool
    :param for_collection: forwarded unchanged to the automated run
    :type for_collection: bool

    examples::

        position_magnet(4.54312, use_dac=True) # detector is not relevant
        position_magnet(39.962, detector='AX')
        position_magnet('Ar40', detector='AX') #Ar40 will be converted to 39.962 use mole weight dict
    """
    options = dict(use_dac=use_dac, for_collection=for_collection)
    self._automated_run_call("py_position_magnet", pos, detector, **options)
@verbose_skip
@command_register
def coincidence(self):
    """
    Do a coincidence scan. Peak center all active detectors simultaneously.

    Calculate required deflection corrections to bring all detectors into
    coincidence
    """
    handler = "py_coincidence_scan"
    self._automated_run_call(handler)
# ===============================================================================
#
# ===============================================================================
# ===============================================================================
# set commands
# ===============================================================================
@verbose_skip
@command_register
def is_last_run(self):
    """
    Return the result of the automated run's ``py_is_last_run``

    :return: the automated run's answer, or None when no run is attached
    """
    answer = self._automated_run_call("py_is_last_run")
    return answer
@verbose_skip
@command_register
def clear_conditionals(self):
    """
    Call the automated run's ``py_clear_conditionals``
    """
    handler = "py_clear_conditionals"
    self._automated_run_call(handler)
@verbose_skip
@command_register
def clear_terminations(self):
    """
    Call the automated run's ``py_clear_terminations``
    """
    handler = "py_clear_terminations"
    self._automated_run_call(handler)
@verbose_skip
@command_register
def clear_truncations(self):
    """
    Call the automated run's ``py_clear_truncations``
    """
    handler = "py_clear_truncations"
    self._automated_run_call(handler)
@verbose_skip
@command_register
def clear_actions(self):
    """
    Call the automated run's ``py_clear_actions``
    """
    handler = "py_clear_actions"
    self._automated_run_call(handler)
@verbose_skip
@command_register
def add_termination(
    self, attr, teststr, start_count=0, frequency=10, window=0, mapper="", ntrips=1
):
    """
    Register a termination with the automated run (``py_add_termination``)

    Every argument is forwarded unchanged as a keyword argument of the
    same name.
    """
    options = dict(
        attr=attr,
        teststr=teststr,
        start_count=start_count,
        frequency=frequency,
        window=window,
        mapper=mapper,
        ntrips=ntrips,
    )
    self._automated_run_call("py_add_termination", **options)
@verbose_skip
@command_register
def add_cancelation(
    self, attr, teststr, start_count=0, frequency=10, window=0, mapper="", ntrips=1
):
    """
    Register a cancelation with the automated run (``py_add_cancelation``)

    Every argument is forwarded unchanged as a keyword argument of the
    same name.
    """
    options = dict(
        attr=attr,
        teststr=teststr,
        start_count=start_count,
        frequency=frequency,
        window=window,
        mapper=mapper,
        ntrips=ntrips,
    )
    self._automated_run_call("py_add_cancelation", **options)
@verbose_skip
@command_register
def add_truncation(
    self,
    attr,
    teststr,
    start_count=0,
    frequency=10,
    ntrips=1,
    abbreviated_count_ratio=1.0,
):
    """
    Register a truncation with the automated run (``py_add_truncation``)

    Every argument is forwarded unchanged as a keyword argument of the
    same name.
    """
    options = dict(
        attr=attr,
        teststr=teststr,
        start_count=start_count,
        frequency=frequency,
        abbreviated_count_ratio=abbreviated_count_ratio,
        ntrips=ntrips,
    )
    self._automated_run_call("py_add_truncation", **options)
@verbose_skip
@command_register
def add_action(
    self,
    attr,
    teststr,
    start_count=0,
    frequency=10,
    ntrips=1,
    action=None,
    resume=False,
):
    """
    Register an action with the automated run (``py_add_action``)

    Every argument is forwarded unchanged as a keyword argument of the
    same name.
    """
    options = dict(
        attr=attr,
        teststr=teststr,
        start_count=start_count,
        frequency=frequency,
        action=action,
        resume=resume,
        ntrips=ntrips,
    )
    self._automated_run_call("py_add_action", **options)
@verbose_skip
@command_register
def set_ncounts(self, ncounts=0):
    """
    Store ``ncounts`` on the script as an int

    Any exception raised while converting or storing the value is printed
    and otherwise ignored, leaving ``self.ncounts`` as it was.

    :param ncounts: value convertible with ``int()``
    """
    try:
        self.ncounts = int(ncounts)
    except Exception as err:
        print("set_ncounts", err)
@verbose_skip
@command_register
def set_time_zero(self, offset=0):
    """
    Set the time_zero value to the current time plus ``offset`` ::

        T_o= ion pump closes
        offset seconds after T_o. define time_zero

        T_eq= inlet closes

    The offset itself is remembered in ``_time_zero_offset``.

    :param offset: seconds added to the current time
    """
    now = time.time()
    self._time_zero = now + offset
    self._time_zero_offset = offset
@verbose_skip
@command_register
def set_integration_time(self, v):
    """
    Set the integration time

    :param v: integration time in seconds
    :type v: float
    """
    handler = "py_set_integration_time"
    self._automated_run_call(handler, v)
@verbose_skip
@command_register
def raw_spectrometer_command(self, command):
    """
    Forward ``command`` unchanged to the automated run's
    ``py_raw_spectrometer_command``

    :param command: the command to forward
    """
    handler = "py_raw_spectrometer_command"
    self._automated_run_call(handler, command)
@verbose_skip
@command_register
def set_spectrometer_configuration(self, name):
    """
    Select a spectrometer configuration by name

    Records the name with ``set_spectrometer_config_name`` and then asks
    the automated run to clear its cached configuration and send the
    spectrometer configuration, in that order.

    :param name: configuration name
    """
    set_spectrometer_config_name(name)

    steps = (
        "py_clear_cached_configuration",
        "py_send_spectrometer_configuration",
    )
    for step in steps:
        self._automated_run_call(step)
@verbose_skip
@command_register
def set_isotope_group(self, name):
    """
    Forward ``name`` to the automated run's ``py_set_isotope_group``

    :param name: isotope group name
    """
    handler = "py_set_isotope_group"
    self._automated_run_call(handler, name)
@property
def truncated(self):
    """
    Property. True if run was truncated otherwise False

    The automated run's ``truncated`` flag is consulted first; if it is
    falsy (or no run is attached) the script's own ``is_truncated()`` is
    returned.

    :return: bool
    """
    run_truncated = self._automated_run_call(lambda: self.automated_run.truncated)
    if run_truncated:
        return run_truncated

    return self.is_truncated()
@property
def eqtime(self):
    """
    Property. Equilibration time. Get value from ``AutomatedRun``.

    Defaults to 20 when no run is attached. A run value of -1 falls back
    to the ``eqtime`` option of the ``Default`` section in the
    spectrometer config, or to 20 if that option is absent.

    :return: float, int
    """
    run = self.automated_run
    value = run.eqtime if run else 20
    if value != -1:
        return value

    # -1: fall back to the configuration file, then to the default
    value = 20
    config = self._get_config()
    if config.has_option("Default", "eqtime"):
        value = config.getfloat("Default", "eqtime")
    return value
@property
def time_zero_offset(self):
    """
    Property. Subtract ``time_zero_offset`` from time value for all data
    points

    Read from the automated run; 0 when no run is attached.

    :return: float, int
    """
    run = self.automated_run
    return run.time_zero_offset if run else 0
@property
def use_cdd_warming(self):
    """
    Property. Use CDD Warming. Get value from ``AutomatedRunSpec``

    :return: bool. False when no automated run is attached (previously
        this case fell through and returned None)
    """
    if self.automated_run:
        return self.automated_run.spec.use_cdd_warming

    # explicit default, mirroring ``time_zero_offset``
    return False
# private
def _get_deflection_from_file(self, name):
    """
    Look up a detector's deflection in the spectrometer config

    Option names in the ``Deflections`` section are compared to ``name``
    case-insensitively; the first match wins.

    :param name: detector name
    :return: float, or None when no option matches
    """
    section = "Deflections"
    config = self._get_config()

    match = next(
        (opt for opt in config.options(section) if opt.lower() == name.lower()),
        None,
    )
    if match is not None:
        return config.getfloat(section, match)
def _set_from_file(self, section, **user_params):
    """
    Set every spectrometer parameter listed in a config section

    For each option in ``section`` the value from ``user_params`` is used
    when one is given under the same name, otherwise the value from the
    config file.

    :param section: config section to iterate
    :param user_params: per-option overrides
    """
    set_parameter = self._set_spectrometer_parameter
    config = self._get_config()

    for option, file_value in config.items(section):
        set_parameter(option, user_params.get(option, file_value))
def _get_config(self):
    """
    Read the spectrometer configuration file

    The path comes from ``get_spectrometer_config_path``; if that raises
    IOError, ``config.cfg`` in ``paths.spectrometer_dir`` is used instead.

    :return: ``ConfigParser`` loaded from that path
    """
    try:
        path = get_spectrometer_config_path()
    except IOError:
        path = os.path.join(paths.spectrometer_dir, "config.cfg")

    parser = ConfigParser()
    parser.read(path)
    return parser
def _automated_run_call(self, func, *args, **kw):
    """
    Call ``func`` if an automated run is attached

    :param func: a callable, or the name of a method on the automated run
    :param args: positional arguments for the call
    :param kw: keyword arguments for the call
    :return: the call's result, or None when ``automated_run`` is None
    """
    run = self.automated_run
    if run is None:
        return None

    target = getattr(run, func) if isinstance(func, str) else func
    return target(*args, **kw)
def _set_spectrometer_parameter(self, *args, **kw):
    """
    Forward all arguments to the automated run's
    ``py_set_spectrometer_parameter``; nothing is returned
    """
    handler = "py_set_spectrometer_parameter"
    self._automated_run_call(handler, *args, **kw)
def _get_spectrometer_parameter(self, *args, **kw):
    """
    Forward all arguments to the automated run's
    ``py_get_spectrometer_parameter``

    :return: the automated run's result, or None when no run is attached
    """
    handler = "py_get_spectrometer_parameter"
    value = self._automated_run_call(handler, *args, **kw)
    return value
def _setup_docstr_context(self):
    """
    Add a context object to the global script context

    The script's module docstring is parsed as yaml; when it yields a
    truthy value a ``MeasurementCTXObject`` is created from it and stored
    under ``mx``, e.g. access measurement configuration values such as
    counts using ``mx.counts``.

    A yaml error is logged with ``debug``; an AttributeError raised
    anywhere in the process is silently ignored.
    """
    try:
        module = ast.parse(self.text)
        try:
            doc_values = yload(ast.get_docstring(module))
            if doc_values:
                context = MeasurementCTXObject()
                context.create(doc_values)
                self._ctx["mx"] = context
        except yaml.YAMLError as err:
            self.debug("failed loading docstring context. {}".format(err))
    except AttributeError:
        pass
def _reset(self):
    """
    Restore the per-run state to its initial values

    Clears the baseline series, both series counters, the time zero, the
    abbreviated count ratio and the count total.
    """
    initial_state = (
        ("_baseline_series", None),
        ("_series_count", 0),
        ("_fit_series_count", 0),
        ("_time_zero", None),
        ("abbreviated_count_ratio", None),
        ("ncounts", 0),
    )
    for attr, value in initial_state:
        setattr(self, attr, value)
# ============= EOF =============================================