Source code for pychron.spectrometer.thermo.spectrometer.base

# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
from __future__ import absolute_import

import os
import time

from numpy import array, argmin
from traits.api import Int, Property, List, Str, DelegatesTo, Bool, Float

from pychron.core.helpers.strtools import csv_to_floats
from pychron.core.progress import open_progress
from pychron.core.ramper import StepRamper
from pychron.core.ui.gui import invoke_in_main_thread
from pychron.core.yaml import yload
from pychron.paths import paths
from pychron.pychron_constants import (
    QTEGRA_INTEGRATION_TIMES,
    QTEGRA_DEFAULT_INTEGRATION_TIME,
)
from pychron.spectrometer import get_spectrometer_config_path
from pychron.spectrometer.base_spectrometer import BaseSpectrometer


def normalize_integration_time(it):
    """
    Snap ``it`` to the nearest entry of QTEGRA_INTEGRATION_TIMES.

    :param it: requested integration time
    :return: the entry of QTEGRA_INTEGRATION_TIMES with the smallest absolute
             difference from ``it``; 1.0 if the lookup raises a TypeError
             (e.g. ``it`` does not support subtraction)
    """
    try:
        allowed = array(QTEGRA_INTEGRATION_TIMES)
        offsets = abs(allowed - it)
        nearest = argmin(offsets)
        return allowed[nearest]
    except TypeError:
        return 1.0


def calculate_radius(m_e, hv, mfield):
    """
    Evaluate ``sqrt(2 * m_e * hv / mfield**2)``.

    :param m_e: mass/charge
    :param hv: accelerating voltage (V)
    :param mfield: magnet field (H)
    :return: the radius computed from the expression above
    """
    energy_term = 2 * m_e * hv
    field_squared = mfield ** 2
    return (energy_term / field_squared) ** 0.5


class ThermoSpectrometer(BaseSpectrometer):
    """
    Spectrometer controlled through text commands sent with ``self.ask``
    (e.g. "GetData", "GetIntegrationTime", "SetParameter", "GetDeflections").
    """

    integration_time = Float
    integration_times = List(QTEGRA_INTEGRATION_TIMES)
    magnet_dac = DelegatesTo("magnet", prefix="dac")
    magnet_dacmin = DelegatesTo("magnet", prefix="dacmin")
    # delegate to the magnet's dacmax (was mistakenly delegating to "dacmin")
    magnet_dacmax = DelegatesTo("magnet", prefix="dacmax")
    current_hv = DelegatesTo("source")

    sub_cup_configurations = List
    sub_cup_configuration = Property(depends_on="_sub_cup_configuration")
    _sub_cup_configuration = Str

    dc_start = Int(0)
    dc_stop = Int(500)
    dc_step = Int(50)
    dc_stepmin = Int(1)
    dc_stepmax = Int(1000)
    dc_threshold = Int(3)
    dc_npeak_centers = Int(3)

    send_config_on_startup = Bool

    max_deflection = Int(500)

    _debug_values = None
    _test_connect_command = "GetIntegrationTime"

    def hardware_names(self):
        """
        :return: dict mapping configuration keys to the parameter names used
                 when talking to the spectrometer
        """
        return {
            "ion_repeller": "Ion Repeller Set",
            "electron_energy": "Electron Energy Set",
            "y_symmetry": "Y-Symmetry Set",
            "z_symmetry": "Z-Symmetry Set",
            "extraction_lens": "Extraction Lens Set",
            "z_focus": "Z-Focus Set",
            "hv": "HV",
            "trap.voltage": "Trap Voltage Readback",
            "trap.current": "Trap Current Readback",
        }

    def make_deflection_dict(self):
        """
        :return: dict of detector name -> deflection read from the spectrometer
        """
        names = self.detector_names
        values = self.read_deflection_word(names)
        return dict(zip(names, values))

    def make_configuration_dict(self):
        """
        :return: dict of hardware parameter name -> current value.
                 "Electron Energy Set" is rounded to two decimals
        """
        keys = list(self.hardware_names().values())
        # NOTE(review): get_parameter_word is not defined in this class (only
        # read_parameter_word is); presumably inherited -- confirm
        values = self.get_parameter_word(keys)
        d = dict(zip(keys, values))
        key = "Electron Energy Set"
        if key in d:
            d[key] = float("{:0.2f}".format(d[key]))
        # previously the dict was built but never returned
        return d

    def make_gains_dict(self):
        """
        :return: dict of detector name -> gain, as returned by each detector's get_gain()
        """
        return {di.name: di.get_gain() for di in self.detectors}

    def get_detector_active(self, dname):
        """
        return True if dname is in the list of intensity keys

        e.g.
            keys, signals = get_intensities
            return dname in keys

        :param dname: str, detector name
        :return: bool
        """
        # only the keys are needed; index instead of unpacking so the number
        # of values returned by get_intensities does not matter
        keys = self.get_intensities()[0]
        return dname in keys

    def test_intensity(self):
        """
        test if intensity is changing.

        make up to four measurements, each separated by one integration time
        (0.1 s in simulation). as soon as a measurement differs from the
        previous one the test passes. if all four measurements are the same
        the test fails

        :return: (bool, str) passed flag and error message (always "")
        """
        ret, err = True, ""
        # initial read; its values are not used in the comparison
        self.get_intensities()
        it = 0.1 if self.simulation else self.integration_time

        pv = None
        for _attempt in range(4):
            time.sleep(it)
            _, v, _, _ = self.get_intensities()
            # compare element-wise so both plain lists and arrays are supported
            same = pv is not None and (
                len(pv) == len(v) and all(a == b for a, b in zip(pv, v))
            )
            if pv is None or same:
                pv = v
            else:
                break
        else:
            # never saw a change
            ret = False

        return ret, err

    def set_gains(self, history=None):
        """
        call set_gain() on every detector

        :param history: optional object with create_date and username
                        attributes; only used for the debug message
        :return: dict of detector name -> gain
        """
        if history:
            self.debug(
                "setting gains to {}, user={}".format(
                    history.create_date, history.username
                )
            )

        for di in self.detectors:
            di.set_gain()

        return {di.name: di.gain for di in self.detectors}

    def load_current_detector_gains(self):
        """
        load the detector gains from the spectrometer
        """
        for di in self.detectors:
            di.get_gain()

    def read_integration_time(self):
        """
        :return: response to the "GetIntegrationTime" command
        """
        return self.ask("GetIntegrationTime")

    def set_integration_time(self, it, force=False):
        """
        :param it: float, integration time
        :param force: set integration even if "it" is not different than self.integration_time
        :return: float, integration time (normalized to an allowed value)
        """
        it = normalize_integration_time(it)
        if self.integration_time != it or force:
            self.debug("setting integration time = {}".format(it))
            name = "SetIntegrationTime"
            self.set_parameter(name, it)
            # update the trait quietly (trait_setq)
            self.trait_setq(integration_time=it)

            # this is a hail mary to potentially make qtegra happier post setting integration time
            self.debug("sleeping 2 seconds after setting integration time")
            time.sleep(2)

        return it

    def set_parameter(self, name, v, post_delay=None):
        """
        build and send a set command

        names that do not start with "Set" are sent as
        "SetParameter <hardware name>,<v>"; names that start with "Set" are
        sent as "<name> <v>"

        :param name: str, parameter or command name
        :param v: value to send
        :param post_delay: optional number of seconds to sleep after sending
        """
        if not name.startswith("Set"):
            mk = self.hardware_names()
            cmd = "SetParameter {},{}".format(mk.get(name, name), v)
        elif name == "HV":
            # NOTE(review): unreachable -- "HV" does not start with "Set" so it
            # is handled by the branch above. left untouched so the command
            # sent to the spectrometer does not change; confirm intent
            cmd = "SetHV {}".format(v)
        else:
            cmd = "{} {}".format(name, v)

        self.ask(cmd)
        if post_delay:
            time.sleep(post_delay)

    def get_hardware_name(self, k):
        """
        :param k: str, configuration key
        :return: hardware parameter name or None if k is unknown
        """
        d = self.hardware_names()
        return d.get(k)

    def get_parameter(self, cmd):
        """
        use ``self.source.read_<cmd.lower()>`` if the source has it, otherwise
        send "GetParameter <cmd>"

        :param cmd: str, parameter name
        :return: value returned by the reader or by ask
        """
        reader_name = "read_{}".format(cmd.lower())
        if hasattr(self.source, reader_name):
            return getattr(self.source, reader_name)()
        else:
            return self.ask("GetParameter {}".format(cmd))

    def set_deflection(self, name, value):
        """
        :param name: str, detector name
        :param value: deflection assigned to the detector
        """
        det = self.get_detector(name)
        if det:
            det.deflection = value
        else:
            self.warning(
                'Could not find detector "{}". Deflection Not Possible'.format(name)
            )

    def get_deflection(self, name, current=False):
        """
        get deflection by detector name

        :param name: str, detector name
        :param current: bool, if True query qtegra
        :return: float, 0 if the detector is not found
        """
        deflection = 0
        det = self.get_detector(name)
        if det:
            if current:
                det.read_deflection()
            deflection = det.deflection
        else:
            self.warning('Failed getting deflection for detector ="{}"'.format(name))

        return deflection

    def read_deflection_word(self, keys):
        """
        :param keys: list of str, detector names
        :return: list of floats parsed from the "GetDeflections" response,
                 empty list if the response can not be parsed
        """
        x = self.ask(
            "GetDeflections {}".format(",".join(keys)), verbose=False, quiet=True
        )
        x = self._parse_word(x)
        return x

    def read_parameter_word(self, keys):
        """
        :param keys: list of str, parameter names
        :return: list of floats parsed from the "GetParameters" response,
                 empty list if the response can not be parsed
        """
        x = self.ask(
            "GetParameters {}".format(",".join(keys)), verbose=True, quiet=False
        )
        x = self._parse_word(x)
        return x

    def get_configuration_value(self, key):
        """
        look up a value in the cached configuration

        :param key: str, either "section.option" or a bare option name. a bare
                    name is searched for in every section; if several
                    sections contain it the last match is returned
        :return: the configured value, 0 if not found
        """
        config = self._get_cached_config()
        ret = 0
        if config is not None:
            if "." in key:
                section, key = key.split(".")
                try:
                    opt = config[section]
                    ret = opt.get(key, opt.get(key.lower(), 0))
                except KeyError:
                    pass
            else:
                for d in config.values():
                    try:
                        ret = d[key]
                    except KeyError:
                        try:
                            ret = d[key.lower()]
                        except KeyError:
                            pass
        return ret

    def set_debug_configuration_values(self):
        """
        in simulation, populate self._debug_values from the "source" section
        of the cached configuration
        """
        if self.simulation:
            config = self._get_cached_config()
            if config is not None:
                d = config["source"]
                keys = (
                    "ElectronEnergy",
                    "YSymmetry",
                    "ZSymmetry",
                    "ZFocus",
                    "IonRepeller",
                    "ExtractionLens",
                )
                ds = [0] + [d[k.lower()] for k in keys]
                self._debug_values = ds

    # ===============================================================================
    # load
    # ===============================================================================
    def load_configurations(self):
        """
        load configurations from Qtegra

        querying Qtegra is currently disabled (see commented code); only
        molecular_weight is set

        :return:
        """
        # self.sub_cup_configurations = ['A', 'B', 'C']
        # self._sub_cup_configuration = 'B'
        #
        # scc = self.ask('GetSubCupConfigurationList Argon', verbose=False)
        # if scc:
        #     if 'ERROR' not in scc:
        #         self.sub_cup_configurations = scc.split('\r')
        #
        # n = self.ask('GetActiveSubCupConfiguration')
        # if n:
        #     if 'ERROR' not in n:
        #         self._sub_cup_configuration = n

        self.molecular_weight = "Ar40"

    def load(self):
        """
        load detectors
        load setupfiles/spectrometer/config.cfg file
        load magnet
        load deflections coefficients

        :return:
        """
        config = super(ThermoSpectrometer, self).load()

        pd = "Protection"
        if config.has_section(pd):
            self.magnet.use_beam_blank = self.config_get(
                config, pd, "use_beam_blank", cast="boolean", default=False
            )
            self.magnet.use_detector_protection = self.config_get(
                config, pd, "use_detector_protection", cast="boolean", default=False
            )
            self.magnet.beam_blank_threshold = self.config_get(
                config, pd, "beam_blank_threshold", cast="float", default=0.1
            )

            # self.magnet.detector_protection_threshold = self.config_get(config, pd,
            #                                                     'detector_protection_threshold',
            #                                                     cast='float', default=0.1)
            ds = self.config_get(config, pd, "detectors")
            if ds:
                ds = ds.split(",")
                self.magnet.protected_detectors = ds
                for di in ds:
                    self.info(
                        'Making protection available for detector "{}"'.format(di)
                    )

        if config.has_section("Deflections"):
            if config.has_option("Deflections", "max"):
                v = config.getint("Deflections", "max")
                # a max of 0 keeps the current max_deflection
                if v:
                    self.max_deflection = v

        self.debug("Detectors {}".format(self.detectors))
        for d in self.detectors:
            d.load_deflection_coefficients()

    def start(self):
        """
        send the configuration (with a confirmed ramp) if send_config_on_startup is set
        """
        self.debug(
            "********** Spectrometer start. send configuration: {}".format(
                self.send_config_on_startup
            )
        )
        if self.send_config_on_startup:
            self.send_configuration(use_ramp=True, ramp_confirm=True)

    def settle(self):
        """
        sleep for two integration times
        """
        time.sleep(self.integration_time * 2)

    # ===============================================================================
    # signals
    # ===============================================================================
    def read_intensities(self, tagged=True, *args, **kw):
        """
        send "GetData" and parse the comma separated response

        :param tagged: bool, if True the response alternates key,signal;
                       otherwise the response holds only signals and a fixed
                       list of keys is used
        :return: (keys, signals, None, True). keys and signals are empty lists
                 if there is no response or the response contains "ERROR"
        """
        keys = []
        signals = []

        datastr = self.ask("GetData", verbose=False, quiet=True, use_error_mode=False)
        if datastr:
            if "ERROR" not in datastr:
                data = datastr.split(",")
                if tagged:
                    keys = data[::2]
                    signals = data[1::2]
                else:
                    keys = ["H2", "H1", "AX", "L1", "L2", "CDD"]
                    signals = data

                signals = [float(s) for s in signals]

        return keys, signals, None, True

    def get_intensity(self, dkeys):
        """
        dkeys: str or tuple of strs

        :return: signal for dkeys, or a list of signals if dkeys is a tuple or
                 list. 0 is used for keys that are not present. None if
                 get_intensities returns None
        """
        data = self.get_intensities()
        if data is not None:
            keys, signals, _, _ = data

            def func(k):
                return signals[keys.index(k)] if k in keys else 0

            if isinstance(dkeys, (tuple, list)):
                return [func(key) for key in dkeys]
            else:
                return func(dkeys)

    def update_config(self, **kw):
        """
        write options to the spectrometer configuration file and clear the cached config

        :param kw: section name -> iterable of (option, value) pairs
        """
        p = get_spectrometer_config_path()
        config = self.get_configuration_writer(p)
        for k, v in kw.items():
            if not config.has_section(k):
                config.add_section(k)

            for option, value in v:
                config.set(k, option, value)

        with open(p, "w") as wfile:
            config.write(wfile)

        self.clear_cached_config()

    def _get_source_parameter_value(self, k, hardware_name):
        """
        read a source parameter using ``self.source.read_<k.lower()>``;
        fall back to get_parameter(hardware_name) on AttributeError
        """
        self.debug("get source parameter k={}, hw={}".format(k, hardware_name))
        try:
            ret = getattr(self.source, "read_{}".format(k.lower()))()
        except AttributeError:
            ret = self.get_parameter(hardware_name)
        return ret

    def verify_configuration(self, **kw):
        """
        compare the deflection, source and trap values in the cached
        configuration with the current values. each mismatch is logged as a
        warning

        :return: bool, True if no mismatch was found
        """
        self.debug("========= Verifying configuration =========")
        readout_comp, defl_comp = self._load_configuration_comparisons()
        mismatch = False

        if self.microcontroller:
            hardware_names = self.hardware_names()
            config = self._get_cached_config()
            if config is not None:
                for k, v in config["deflection"].items():
                    comp = defl_comp.get(k, {})
                    if comp.get("compare", True):
                        current = self.get_deflection(k, current=True)
                        dev = self._get_config_dev(current, v, comp)
                        if dev:
                            self.warning(
                                "verify failed {}. current={}, config={}".format(
                                    k, current, v
                                )
                            )
                            mismatch = True

                for k, v in config["source"].items():
                    try:
                        mk = hardware_names[k]
                    except KeyError:
                        self.debug(
                            "--- Not checking {}. Not in hardware_names".format(k)
                        )
                        self.debug("hardware names: {}".format(hardware_names))
                        continue

                    comp = readout_comp.get(k, {})
                    if comp.get("compare", True):
                        current = self._get_source_parameter_value(k, mk)
                        try:
                            current = float(current)
                        except (ValueError, TypeError):
                            # TypeError covers a None readback; same handling
                            # as in _send_configuration
                            self.warning(
                                "invalid float value {}, {}".format(mk, current)
                            )
                            continue

                        dev = self._get_config_dev(current, v, comp)
                        if dev:
                            self.warning(
                                "verify failed {}. current={}, config={}".format(
                                    mk, current, v
                                )
                            )
                            mismatch = True

                trap = config["trap"]
                for tag in ("voltage", "current"):
                    v = trap.get(tag)
                    if v is not None:
                        comp = readout_comp.get("Trap{}".format(tag.capitalize()), {})
                        if comp.get("compare", True):
                            current = getattr(self.source, "trap_{}".format(tag))
                            dev = self._get_config_dev(current, v, comp)
                            if dev:
                                self.warning(
                                    "verify failed trap {}. current={}, config={}".format(
                                        tag, current, v
                                    )
                                )
                                mismatch = True

        self.debug("========= Verify complete ===========")
        return not mismatch

    # ===============================================================================
    # private
    # ===============================================================================
    def _parse_word(self, word):
        """
        :param word: str, comma separated values
        :return: list of floats, empty list if word can not be parsed
        """
        try:
            x = csv_to_floats(word)
        except (AttributeError, ValueError):
            x = []
        return x

    def _get_simulation_data(self):
        """
        :return: (keys, signals, None) fixed values used for simulation
        """
        signals = [1, 100, 3, 0.01, 0.01, 0.01, 38, 38.5]  # + random(6)
        keys = ["H2", "H1", "AX", "L1", "L2", "CDD", "L2(CDD)", "AX(CDD)"]
        return keys, signals, None

    def _get_config_dev(self, current, v, comp):
        """
        test if the current value deviates from the configured value

        if comp defines a truthy "percent_tol" the relative difference
        abs(current - v) / abs(v) is compared with it. otherwise, or if v is 0,
        the absolute difference is compared with comp["tolerance"]
        (default 0.01)

        :param current: current value
        :param v: configured value
        :param comp: dict of comparison options ("compare", "percent_tol", "tolerance")
        :return: bool, True if the difference is greater than the tolerance.
                 always False if comp["compare"] is False
        """
        dev = False
        if comp.get("compare", True):
            tol = comp.get("percent_tol")
            if not tol:
                tol = comp.get("tolerance", 0.01)
                delta = abs(current - v)
                dev = delta > tol
                self.debug("abs tolerance={}, delta={}".format(tol, delta))
            else:
                try:
                    # divide by the magnitude of v; with a signed divisor a
                    # negative v gave a negative delta that never exceeded tol
                    delta = abs(current - v) / abs(float(v))
                    dev = delta > tol
                    self.debug("percent tolerance={}, delta={}".format(tol, delta))
                except ZeroDivisionError:
                    self.warning("zero division exception")
                    tol = comp.get("tolerance", 0.01)
                    delta = abs(current - v)
                    dev = delta > tol
                    self.debug("abs tolerance={}, delta={}".format(tol, delta))

        return dev

    def _load_configuration_comparisons(self):
        """
        load the comparison options from readout.yaml in paths.spectrometer_dir

        :return: (readouts, deflections) dicts keyed by the "name" of each
                 entry. both are empty if the file yields nothing
        """
        path = os.path.join(paths.spectrometer_dir, "readout.yaml")
        readouts = {}
        deflections = {}
        yt = yload(path)
        if yt:
            readouts, deflections = yt
            readouts = {r["name"]: r for r in readouts}
            deflections = {r["name"]: r for r in deflections}

        return readouts, deflections

    def _send_configuration(self, use_ramp=True, ramp_confirm=False):
        """
        send the deflection, source and trap values from the cached
        configuration to the spectrometer and update the mftable.

        unless force_send_configuration is set, a value is only sent if it
        deviates from the current value (see _get_config_dev)

        :param use_ramp: bool, ramp the trap current (also requires
                         "use_ramp" in the trap configuration, default True)
        :param ramp_confirm: bool, ask for confirmation before ramping
        """
        self.debug("======== Sending configuration ========")
        if self.force_send_configuration:
            readout_comp, defl_comp = {}, {}
        else:
            readout_comp, defl_comp = self._load_configuration_comparisons()

        def not_setting(k, c, v):
            self.debug("Not setting {:<20s} current={}, config={}".format(k, c, v))

        if self.microcontroller:
            hardware_names = self.hardware_names()
            config = self._get_cached_config()
            if config is not None:
                for k, v in config["deflection"].items():
                    if not self.force_send_configuration:
                        comp = defl_comp.get(k, {})
                        if comp.get("compare", True):
                            current = self.get_deflection(k, current=True)
                            dev = self._get_config_dev(current, v, comp)
                            if not dev:
                                not_setting(k, current, v)
                                continue

                    cmd = "SetDeflection"
                    v = "{},{}".format(k, v)
                    self.set_parameter(cmd, v, post_delay=0.05)

                for k, v in config["source"].items():
                    try:
                        mk = hardware_names[k]
                    except KeyError:
                        self.debug(
                            "--- Not setting {}. Not in hardware_names".format(k)
                        )
                        self.debug("hardware names: {}".format(hardware_names))
                        continue

                    if not self.force_send_configuration:
                        comp = readout_comp.get(k, {})
                        if comp.get("compare", True):
                            current = self._get_source_parameter_value(k, mk)
                            try:
                                current = float(current)
                            except (ValueError, TypeError):
                                self.warning("invalid value {}, {}".format(k, current))
                                continue

                            dev = self._get_config_dev(current, v, comp)
                            if not dev:
                                not_setting(mk, current, v)
                                continue

                    self.set_parameter(mk, v, post_delay=0.05)

                trap = config["trap"]
                for tag, func in (
                    ("voltage", self.source.read_trap_voltage),
                    ("current", self.source.read_trap_current),
                ):
                    # v is reset to None below when the value should not be sent
                    v = trap.get(tag)
                    ttag = "Trap{}".format(tag.capitalize())
                    self.debug("send trap {} {}".format(tag, v))
                    if v is not None:
                        if not self.force_send_configuration:
                            comp = readout_comp.get(ttag, {})
                            if comp.get("compare", True):
                                current = func()
                                try:
                                    current = float(current)
                                    dev = self._get_config_dev(current, v, comp)
                                    if not dev:
                                        not_setting(ttag, current, v)
                                        v = None
                                except (ValueError, TypeError):
                                    self.warning(
                                        "invalid value {}, {}".format(ttag, current)
                                    )
                            else:
                                v = None

                        if v is not None:
                            if tag == "current":
                                step = trap.get("ramp_step", 1)
                                period = trap.get("ramp_period", 1)
                                tol = trap.get("ramp_tolerance", 10)
                                use_ramp = use_ramp and trap.get("use_ramp", True)
                                self._ramp_trap_current(
                                    v, step, period, use_ramp, tol, ramp_confirm
                                )
                            else:
                                setattr(self.source, "trap_{}".format(tag), v)

                # set the mftable
                magnet = config["magnet"]
                mftable_name = magnet.get("mftable")
                if mftable_name:
                    self.debug("updating mftable name {}".format(mftable_name))
                    self.magnet.field_table.path = mftable_name
                    self.magnet.field_table.load_table(load_items=True)

                self.debug("======== Configuration Finished ========")
                self.source.sync_parameters()

    def _ramp_trap_current(
        self, v, step, period, use_ramp=False, tol=10, confirm=False
    ):
        """
        step the trap current from its current readback to v using a StepRamper

        :param v: target trap current
        :param step: ramp step
        :param period: ramp period
        :param use_ramp: bool
        :param tol: the ramp is only done if abs(v - current) >= tol
        :param confirm: bool, ask the user before ramping. a progress dialog
                        is shown if the user accepts
        :return: True if the ramp was done or the trap current is already
                 within tol of v, otherwise None
        """
        # NOTE(review): nothing is done when use_ramp is False, i.e. the trap
        # current is not set at all -- confirm this is intended
        if use_ramp:
            self.debug("ramping trap current")
            current = self.source.read_trap_current()
            if current is None:
                self.debug("could not read current trap. skipping ramp")
                return

            if abs(v - current) >= tol:
                ok = True
                show_progress = False
                if confirm:
                    ok = self.confirmation_dialog(
                        "Would you like to ramp up the "
                        "Trap current from {} to {}".format(current, v)
                    )
                    show_progress = True

                if ok:
                    r = StepRamper()
                    steps = abs(v - current) / step
                    if show_progress:
                        prog = open_progress(int(steps))

                    def func(x):
                        # called by the ramper for each step value x.
                        # presumably a truthy return keeps the ramp going -- confirm
                        self.source.trap_current = x
                        if show_progress:
                            invoke_in_main_thread(
                                prog.change_message, "Set Trap Current {}".format(x)
                            )
                            if not prog.accepted and not prog.canceled:
                                return True
                        else:
                            return True

                    self.debug(
                        "current={}, target={}, step={}, period={}".format(
                            current, v, step, period
                        )
                    )
                    r.ramp(func, current, v, step, period)
                    if show_progress:
                        prog.close()
                    return True
            else:
                self.debug("trap current is up-to-date")
                return True

    # ===============================================================================
    # defaults
    # ===============================================================================
    def _integration_time_default(self):
        self.default_integration_time = QTEGRA_DEFAULT_INTEGRATION_TIME
        return QTEGRA_DEFAULT_INTEGRATION_TIME

    # ===============================================================================
    # property get/set
    # ===============================================================================
    def _get_sub_cup_configuration(self):
        return self._sub_cup_configuration

    def _set_sub_cup_configuration(self, v):
        # store the name and send it to the spectrometer
        self._sub_cup_configuration = v
        self.ask("SetSubCupConfiguration {}".format(v))
# if __name__ == '__main__': # s = Spectrometer() # ss = ArgusSource() # ss.current_hv = 4505 # s.source = ss # corrected = s.get_hv_correction(100,current=False) # uncorrected = s.get_hv_correction(corrected, uncorrect=True, current=False) # # print corrected, uncorrected # ============= EOF =============================================