Source code for pychron.spectrometer.field_table

# ===============================================================================
# Copyright 2014 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
import csv
import hashlib
import os
import shutil

import six
from numpy import asarray, array, nonzero, polyval
from scipy.optimize import leastsq, brentq
from traits.api import HasTraits, List, Str, Dict, Bool, Property, CFloat

from pychron.core.helpers.filetools import add_extension, backup
from pychron.loggable import Loggable
from pychron.paths import paths
from pychron.pychron_constants import NULL_STR, STARTUP_MESSAGE_POSITION
from pychron.spectrometer import set_mftable_name, get_mftable_name


def get_detector_name(det):
    if not isinstance(det, (str, six.text_type)):
        det = det.name
    return det


def least_squares(func, xs, ys, initial_guess):
    xs, ys = asarray(xs), asarray(ys)

    def errfunc(p, x, v):
        return func(p, x) - v

    ret, info = leastsq(errfunc, initial_guess, args=(xs, ys))
    return ret


def format_dac(dac):
    return "{:0.5f}".format(dac) if dac != NULL_STR else ""


class FieldItem(HasTraits):
    isotope = Str

    def to_csv(self, keys, fmt):
        return [self.isotope] + [fmt(getattr(self, k)) for k in keys]


[docs]class FieldTable(Loggable): """ map a voltage to a mass """ items = List molweights = Dict spectrometer_name = Str use_local_archive = Bool # use_db_archive = Bool path = Property mass_cal_func = "parabolic" # path = Property(depends_on='_path_dirty') # _path_dirty = Event def __init__(self, bind=True, *args, **kw): super(FieldTable, self).__init__(*args, **kw) # self.db = None self._mftable = None self._detectors = None self._test_path = None if bind: self.bind_preferences() def initialize(self, molweights): self.molweights = molweights p = self.path if not os.path.isfile(p): self.warning_dialog( "No Magnet Field Table. Create {}".format(p), position=STARTUP_MESSAGE_POSITION, ) else: self.load_table(load_items=True) def bind_preferences(self): from apptools.preferences.preference_binding import bind_preference prefid = "pychron.spectrometer" bind_preference( self, "use_local_archive", "{}.use_local_mftable_archive".format(prefid) ) # bind_preference(self, 'use_db_archive', # '{}.use_db_mftable_archive'.format(prefid)) def backup(self): backup(self.path, paths.mftable_backup_dir) def map_dac_to_mass(self, dac, detname): detname = get_detector_name(detname) d = self._get_mftable() _, xs, ys, p = d[detname] if self.polynominal_mass_func: def func(x, *args): c = list(p) c[-1] -= dac return polyval(c, x) try: mass = brentq(func, 0, 200) return mass except ValueError as e: self.debug( "DAC does not map to an isotope. DAC={}, Detector={}".format( dac, detname ) ) else: try: idx = ys.index(dac) return xs[idx] except IndexError: self.debug( "DAC does not map to an isotope. DAC={}, Detector={}".format( dac, detname ) ) def map_mass_to_dac(self, mass, detname): if isinstance(mass, str): mass = self.molweights[mass] self.debug('Mapping mass to dac mass func: "{}"'.format(self.mass_cal_func)) detname = get_detector_name(detname) d = self._get_mftable() _, xs, ys, p = d[detname] if self.polynominal_mass_func: self.debug("{} map mass coeffs = {}".format(detname, p)) dac = polyval(p, mass) else: self.debug("using discrete mass mapping") dac = self.get_dac(detname, mass) return dac def get_dac(self, det, mass): det = get_detector_name(det) d = self._get_mftable() dac = next((d for i, m, d in zip(*d[det][:3]) if abs(m - mass) < 0.15), None) return dac # isotope = next((i for i, m in self.molweights.iteritems() if abs(m-mass)<1e-5), None) # if isotope is not None: # isos, xs, ys = map(array, d[det][:3]) # isos, mws, dacs = map(array, d[det][:3]) # refindex = min(nonzero()) # refindex = min(nonzero(isos == isotope)[0]) # return ys[refindex] @property def polynominal_mass_func(self): return self.mass_cal_func in ("linear", "parabolic", "cubic")
[docs] def update_field_table( self, det, isotope, dac, message="", save=True, report=False, update_others=True ): """ dac needs to be in axial units update_others. If false only """ det = get_detector_name(det) self.info( "update mftable {} {} {} message={}".format(det, isotope, dac, message) ) d = self._get_mftable() # isos, xs, ys = map(array, d[det][:3]) isos, xs, ys = d[det][:3] isos = array(isos) try: refindex = min(nonzero(isos == isotope)[0]) delta = dac - ys[refindex] # need to calculate all ys # using simple linear offset # ys += delta for k, (isoks, mws, dacs, _) in d.items(): if not update_others and k != det: continue ndacs = [yi + delta if yi != NULL_STR else NULL_STR for yi in dacs] xs, ny = self._clean_dacs(mws, ndacs) initial_guess = self._get_initial_guess(ny, xs) if initial_guess: p = least_squares(polyval, xs, ny, initial_guess) else: p = None d[k] = isoks, mws, ndacs, p if save: self.dump(isos, d, message) if report: self.print_table(isos, d) except ValueError: import traceback e = traceback.format_exc() print(e) self.debug("Magnet update field table {}".format(e))
def set_path_name(self, name): if name and self.path != name: self.path = name self.info("Using MFTable {}".format(self.path)) self.load_table() def get_table(self): mt = self._get_mftable() return mt def load(self): pass def save(self): detectors = self._detectors p = self.path p = "{}.temp".format(p) fmt = lambda x: "{:0.5f}".format(x) with open(p, "w") as f: writer = csv.writer(f) writer.writerow([self.mass_cal_func]) writer.writerow(["iso"] + detectors) for fi in self.items: writer.writerow(fi.to_csv(detectors, fmt)) self._set_mftable_hash(p) self._add_to_archive(p, message="manual modification") def dump(self, isos, d, message): detectors = self._detectors p = self.path self.debug("dump mftable to {}".format(p)) with open(p, "w", newline="") as f: writer = csv.writer(f) writer.writerow([self.mass_cal_func]) writer.writerow(["iso"] + detectors) for i, iso in enumerate(isos): a = [iso] for hi in detectors: iso, xs, ys, _ = d[hi] fdac = format_dac(ys[i]) a.append(fdac) writer.writerow(a) self._set_mftable_hash(p) self._add_to_archive(p, message) @property def detector_names(self): return self._detectors # @property # def mftable_path(self): # return os.path.join(paths.spectrometer_dir, 'mftable.csv') def print_table(self, isos, d): detectors = self.detector_names for i, iso in enumerate(isos): a = [iso] for hi in detectors: iso, xs, ys, _ = d[hi] a.append(format_dac(ys[i])) print(a) @property def mftable_archive_path(self): return os.path.join( paths.spectrometer_dir, "{}_mftable_archive".format(self.spectrometer_name) )
[docs] def load_table(self, path=None, load_items=False): """ mftable format- first line is a header followed by Isotope, Dac_i, Dac_j,.... Dac_i is the magnet dac setting to center Isotope on detector i example:: iso, H2, H1, AX, L1, L2, CDD Ar40,5.78790,5.895593,6.00675,6.12358,6.24510,6.35683 Ar39,5.89692,5.788276,5.89692,5.89692,5.89692,5.89692 Ar36,5.56072,5.456202,5.56072,5.56072,5.56072,5.56072 """ if path is None: path = self.path if not os.path.isfile(path): self.debug('Using mftable located at "{}" does not exist'.format(path)) return self.debug('Using mftable located at "{}"'.format(path)) mws = self.molweights self._set_mftable_hash(path) items = [] with open(path, "U") as f: reader = csv.reader(f) table = [] mass_func = None line0 = next(reader) if line0[0].strip() == "iso": detline = line0 else: mass_func = line0[0].strip() detline = next(reader) detectors = [d.strip() for d in detline[1:]] if mass_func is None: self.warning("Using default ") self.mass_cal_func = mass_func for line in reader: iso = line[0] try: mw = mws[iso] except KeyError as e: self.warning('"{}" not in molweights {}'.format(iso, mws)) continue # dacs = map(float, line[1:]) dacs = [] for di in line[1:]: try: di = float(di) except ValueError: di = NULL_STR dacs.append(di) if load_items: fi = FieldItem(isotope=iso) for di, v in zip(detectors, dacs): fi.add_trait(di, CFloat(v)) items.append(fi) row = [iso, mw] + dacs table.append(row) if load_items: self._report_mftable(detectors, items) self.items = items table = list(zip(*table)) isos, mws = list(table[0]), list(table[1]) d = {} for i, k in enumerate(detectors): ys = table[2 + i] xs, dacs = self._clean_dacs(mws, ys) initial_guess = self._get_initial_guess(dacs, xs) if initial_guess: try: c = least_squares( polyval, xs, dacs, initial_guess=initial_guess ) except TypeError as e: self.warning("load mftable {}".format(e)) c = (0, 0, ys[0]) else: c = None d[k] = (isos, mws, ys, c) self._mftable = d # self._mftable={k: (isos, mws, table[2 + i], ) # for i, k in enumerate(detectors)} self._detectors = detectors
def _clean_dacs(self, xx, dacs): """ return xs, clean_dacs @param dacs: @return: """ xs, dacs = list(zip(*((xx[i], y) for i, y in enumerate(dacs) if y != NULL_STR))) return xs, dacs def _get_initial_guess(self, y, x): initial_guess = None mass_func = self.mass_cal_func self.debug("get initial guess {}".format(mass_func)) if mass_func == "linear": initial_guess = [0, y[0]] elif mass_func == "parabolic": initial_guess = [0, 0, y[0]] elif mass_func == "cubic": initial_guess = [0, 0, 0, y[0]] return initial_guess def _report_mftable(self, detectors, items): self.debug("============ MFtable ===========") self.debug( "{:<8s} {}".format( "Isotope", "".join(["{:<7s}".format(di) for di in detectors]) ) ) for it in items: dd = [getattr(it, di) for di in detectors] vs = ["{:0.4f}".format(di) if di != NULL_STR else NULL_STR for di in dd] self.debug("{:<8s} {}".format(it.isotope, " ".join(vs))) self.debug("================================") def _get_mftable(self): if not self._mftable or not self._check_mftable_hash(): self.debug("using mftable at {}".format(self.path)) self.load_table() return self._mftable def _check_mftable_hash(self): """ return True if mftable externally modified """ # p = paths.mftable current_hash = self._make_hash(self.path) return self._mftable_hash != current_hash def _make_hash(self, p): if p and os.path.isfile(p): with open(p, "rb") as rfile: return hashlib.md5(rfile.read()) def _set_mftable_hash(self, p): self._mftable_hash = self._make_hash(p) def _add_to_archive(self, p, message): # if self.use_db_archive: # if self.db: # self.info('db archiving mftable') # with open(p, 'r') as rfile: # self.db.add_mftable(self.spectrometer_name, rfile.read()) # else: # self.debug('no db instance available for archiving') if self.use_local_archive: try: from pychron.git_archive.git_archive import GitArchive except ImportError: self.warning( "GitPython >=0.3.2RC1 required for local MFTable Archiving" ) return archive = GitArchive(self.mftable_archive_path) # copy dest = os.path.join(self.mftable_archive_path, os.path.basename(p)) shutil.copyfile(p, dest) archive.add(dest, msg=message) archive.close() self.info("locally archiving mftable") def _set_path(self, name): set_mftable_name(name) self.dirty = True # @cached_property def _get_path(self): p = self._test_path if not p: name = get_mftable_name() p = os.path.join(paths.mftable_dir, add_extension(name, ".csv")) return p def _name_to_path(self, name): if name: name = os.path.join(paths.mftable_dir, add_extension(name, ".csv")) return name or ""
# # def _set_path(self, v): # self._path = self._name_to_path(v) # # def _get_path(self): # if self._path: # p = self._path # else: # p = paths.mftable # return p # ============= EOF =============================================