Source code for pychron.spectrometer.base_magnet

# ===============================================================================
# Copyright 2014 Jake Ross
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
import os
import time
from math import pi

from numpy import arange, sin
from traits.api import Property, Float, Event, Instance

from pychron.core.yaml import yload
from pychron.paths import paths
from pychron.spectrometer.fieldmixin import FieldMixin
from pychron.spectrometer.spectrometer_device import SpectrometerDevice

[docs]class BaseMagnet(SpectrometerDevice, FieldMixin): dac = Property(Float, depends_on="_dac") mass = Float(enter_set=True, auto_set=False) _dac = Float dacmin = Float(0.0) dacmax = Float(10.0) massmin = Property(Float, depends_on="_massmin") massmax = Property(Float, depends_on="_massmax") _massmin = Float(0.0) _massmax = Float(200.0) settling_time = 0.5 detector = Instance("pychron.spectrometer.base_detector.BaseDetector") dac_changed = Event confirmation_threshold_mass = Float use_deflection_correction = True use_af_demagnetization = False _suppress_mass_update = False # def __init__(self, *args, **kw): # super(BaseMagnet, self).__init__(*args, **kw) # self._lock = threading.Lock() # self._cond = threading.Condition((threading.Lock())) def read_dac(self): raise NotImplementedError def set_dac(self, *args, **kw): raise NotImplementedError # =============================================================================== # persistence # =============================================================================== def load(self): self.field_table_setup()
[docs] def finish_loading(self): """ initialize the mftable read DAC from device :return: """ # self.field_table_setup() d = self.read_dac() if d is not None: self._dac = d # load af demag self._load_af_demag_configuration()
# =============================================================================== # mapping # ===============================================================================
[docs] def map_dac_to_mass(self, dac, detname): """ convert a DAC value (voltage) to mass for a given detector use the mftable :param dac: float, voltage (0-10V) :param detname: str, name of a detector, e.g H1 :return: float, mass """ return self.field_table.map_dac_to_mass(dac, detname)
[docs] def map_mass_to_dac(self, mass, detname): """ convert a mass value from amu to dac for a given detector :param mass: float, amu :param detname: std, name of a detector, e.g. H1 :return: float, dac voltage """ dac = self.field_table.map_mass_to_dac(mass, detname) self.debug("{} map mass to dac {} >> {}".format(detname, mass, dac)) if dac is None: self.warning( "Could not map mass to dac. Returning current DAC {}".format(self._dac) ) dac = self._dac return dac
[docs] def map_dac_to_isotope(self, dac=None, det=None, current=True): """ convert a dac voltage to isotope name for a given detector :param dac: float, voltage :param det: str, detector name :param current: bool, get current hv :return: str, e.g Ar40 """ if dac is None: dac = self._dac if det is None: det = self.detector if det: dac = self.spectrometer.uncorrect_dac(det, dac, current=current) m = self.map_dac_to_mass(dac, if m is not None: return self.spectrometer.map_isotope(m)
[docs] def mass_change(self, m): """ set the self.mass attribute suppress mass change handler :param m: float :return: """ self._suppress_mass_update = True self.trait_set(mass=m) self._suppress_mass_update = False
# =============================================================================== # private # =============================================================================== def _wait_release(self): self._lock.release() # self._cond.notify() def _wait_lock(self, timeout): """ @param timeout: @return: """ with self._cond: current_time = start_time = time.time() while current_time < start_time + timeout: if self._lock.acquire(False): return True else: self._cond.wait(timeout - current_time + start_time) current_time = time.time() return False def _load_af_demag_configuration(self): self.use_af_demagnetization = False p = paths.af_demagnetization if os.path.isfile(p): try: yd = yload(p) except BaseException as e: self.warning_dialog( "AF Demagnetization unavailable. Syntax error in file. Error: {}".format( e ) ) return if not isinstance(yd, dict): self.warning_dialog( "AF Demagnetization unavailable. Syntax error in file" ) return self.use_af_demagnetization = yd.get("enabled", True) self.af_demag_threshold = yd.get("threshold", 1) def _do_af_demagnetization(self, target, setfunc): p = paths.af_demagnetization if os.path.isfile(p): try: yd = yload(p) except BaseException as e: self.warning( "AF Demagnetization unavailable. Syntax error in file. Error: {}".format( e ) ) return period = yd.get("period", None) if period is None: frequency = yd.get("frequency") if frequency is None: self.warning( "AF Demagnetization unavailable. " 'Need to specify "period" or "frequency" in "{}"'.format(p) ) return else: period = 1 / float(frequency) else: frequency = 1 / float(period) duration = yd.get("duration") if duration is None: duration = 5 self.debug("defaulting to duration={}".format(duration)) start_amplitude = yd.get("start_amplitude") if start_amplitude is None: self.warning( "AF Demagnetization unavailable. " 'Need to specify "start_amplitude" in "{}"'.format(p) ) return sx = arange(0.5 * period, duration, period) slope = start_amplitude / float(duration) dacs = slope * sx * sin(frequency * pi * sx) "Doing AF Demagnetization around target={}. " "duration={}, start_amplitude={}, period={}".format( target, duration, start_amplitude, period ) ) for dac in reversed(dacs): self.debug("set af dac raw:{} dac:{}".format(dac, target + dac)) setfunc(target + dac) time.sleep(period) else: self.warning( "AF Demagnetization unavailable. {} not a valid file".format(p) ) def _validate_mass_change(self, cm, m): ct = self.confirmation_threshold_mass move_ok = True if ct and abs(cm - m) > ct: move_ok = False "Requested move greater than threshold. Current={}, Request={}, Threshold={}".format( cm, m, ct ) ) if self.confirmation_dialog( "Requested magnet move is greater than threshold.\n" "Current Mass={}\n" "Requested Mass={}\n" "Threshold={}\n" "Are you sure you want to continue?".format(cm, m, ct) ): move_ok = True return move_ok def _mass_changed(self, old, new): if self._suppress_mass_update: return self.debug("mass changed old={}, new={}".format(old, new)) if self._validate_mass_change(old, new): self._set_mass(new) else: self.mass_change(old) def _set_mass(self, m): if self.detector: self.debug("setting mass {}".format(m)) dac = self.map_mass_to_dac(m, dac = self.spectrometer.correct_dac(self.detector, dac) self.dac = dac # =============================================================================== # property get/set # =============================================================================== def _validate_dac(self, d): return self._validate_float(d) def _get_dac(self): return self._dac def _set_dac(self, v): if v is not None: self.set_dac(v) def _validate_float(self, d): try: return float(d) except (ValueError, TypeError): return d def _validate_massmin(self, d): d = self._validate_float(d) if isinstance(d, float): if d > self.massmax: d = str(d) return d def _get_massmin(self): return self._massmin def _set_massmin(self, v): self._massmin = v def _validate_massmax(self, d): d = self._validate_float(d) if isinstance(d, float): if d < self.massmin: d = str(d) return d def _get_massmax(self): return self._massmax def _set_massmax(self, v): self._massmax = v # =============================================================================== # views # =============================================================================== def traits_view(self): from traitsui.api import View, Item, VGroup, HGroup, Spring, RangeEditor v = View( VGroup( VGroup( Item( "dac", editor=RangeEditor( low_name="dacmin", high_name="dacmax", format="%0.5f" ), ), Item( "mass", editor=RangeEditor( mode="slider", low_name="massmin", high_name="massmax", format="%0.3f", ), ), HGroup( Spring(springy=False, width=48), Item("massmin", width=-40), Spring(springy=False, width=138), Item("massmax", width=-55), show_labels=False, ), show_border=True, label="Control", ) ) ) return v
# ============= EOF =============================================