Source code for pychron.dvc.meta_repo

# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
import os
import shutil
from datetime import datetime

# ============= enthought library imports =======================
from traits.api import Bool
from uncertainties import ufloat

from pychron.core.helpers.datetime_tools import ISO_FORMAT_STR
from pychron.core.helpers.filetools import glob_list_directory, add_extension, \
    list_directory
from pychron.dvc import dvc_dump, dvc_load, repository_path, list_frozen_productions
from pychron.dvc.meta_object import IrradiationGeometry, Chronology, Production, cached, Gains, LoadGeometry
from pychron.git_archive.repo_manager import GitRepoManager
from pychron.paths import paths, r_mkdir
from pychron.pychron_constants import INTERFERENCE_KEYS, RATIO_KEYS, DEFAULT_MONITOR_NAME, DATE_FORMAT, NULL_STR


def irradiation_geometry(name):
    p = os.path.join(paths.meta_root, 'irradiation_holders', add_extension(name))
    return IrradiationGeometry(p)


def irradiation_geometry_holes(name):
    geom = irradiation_geometry(name)
    return geom.holes


def irradiation_chronology(name):
    p = os.path.join(paths.meta_root, name, 'chronology.txt')
    return Chronology(p)


def dump_chronology(path, doses):
    if doses is None:
        doses = []

    with open(path, 'w') as wfile:
        for p, s, e in doses:
            if not isinstance(s, str):
                s = s.strftime(ISO_FORMAT_STR)
            if not isinstance(s, str):
                s = s.strftime(ISO_FORMAT_STR)
            if not isinstance(p, str):
                p = '{:0.3f}'.format(p)

            line = '{},{},{}\n'.format(p, s, e)
            wfile.write(line)


def gain_path(name):
    root = os.path.join(paths.meta_root, 'spectrometers')
    if not os.path.isdir(root):
        os.mkdir(root)

    p = os.path.join(root, add_extension('{}.gain'.format(name), '.json'))
    return p


def get_frozen_productions(repo):
    prods = {}
    for name, path in list_frozen_productions(repo):
        prods[name] = Production(path)

    return prods


def get_frozen_flux(repo, irradiation):
    path = repository_path(repo, '{}.json'.format(irradiation))

    fd = {}
    if path:
        fd = dvc_load(path)
        for fi in fd.values():
            fi['j'] = ufloat(*fi['j'], tag='J')
    return fd


[docs]class MetaRepo(GitRepoManager): clear_cache = Bool def get_monitor_info(self, irrad, level): age, decay = NULL_STR, NULL_STR positions = self._get_level_positions(irrad, level) # assume all positions have same monitor_age/decay constant. Not strictly true. Potential some ambiquity but # will not be resolved now 8/26/18. if positions: position = positions[0] opt = position.get('options') if opt: age = position.get('monitor_age', NULL_STR) decayd = position.get('decay_constants') if decayd: decay = decayd.get('lambda_k_total', NULL_STR) return str(age), str(decay) def get_molecular_weights(self): p = os.path.join(paths.meta_root, 'molecular_weights.json') return dvc_load(p) def update_molecular_weights(self, wts, commit=False): p = os.path.join(paths.meta_root, 'molecular_weights.json') dvc_dump(wts, p) self.add(p, commit=commit) def add_unstaged(self, *args, **kw): super(MetaRepo, self).add_unstaged(self.path, **kw) def save_gains(self, ms, gains_dict): p = gain_path(ms) dvc_dump(gains_dict, p) if self.add_paths(p): self.commit('Updated gains') def update_script(self, rootname, name, path_or_blob): self._update_text(os.path.join('scripts', rootname.lower()), name, path_or_blob) def update_experiment_queue(self, rootname, name, path_or_blob): self._update_text(os.path.join('experiments', rootname.lower()), name, path_or_blob) def update_level_production(self, irrad, name, prname, note=None): prname = prname.replace(' ', '_') pathname = add_extension(prname, '.json') src = os.path.join(paths.meta_root, irrad, 'productions', pathname) if os.path.isfile(src): self.update_productions(irrad, name, prname, note=note) else: self.warning_dialog('Invalid production name'.format(prname)) def add_production_to_irradiation(self, irrad, name, params, add=True, commit=False): self.debug('adding production {} to irradiation={}'.format(name, irrad)) p = os.path.join(paths.meta_root, irrad, 'productions', add_extension(name, '.json')) prod = Production(p, new=not os.path.isfile(p)) prod.update(params) prod.dump() if add: self.add(p, commit=commit) def add_production(self, irrad, name, obj, commit=False, add=True): p = self.get_production(irrad, name, force=True) p.attrs = attrs = INTERFERENCE_KEYS + RATIO_KEYS kef = lambda x: '{}_err'.format(x) if obj: def values(): return ((k, getattr(obj, k), kef(k), getattr(obj, kef(k))) for k in attrs) else: def values(): return ((k, 0, kef(k), 0) for k in attrs) for k, v, ke, e in values(): setattr(p, k, v) setattr(p, ke, e) p.dump() if add: self.add(p.path, commit=commit) def update_production(self, prod, irradiation=None): ip = self.get_production(prod.name) self.debug('saving production {}'.format(prod.name)) params = prod.get_params() for k, v in params.items(): self.debug('setting {}={}'.format(k, v)) setattr(ip, k, v) ip.note = prod.note self.add(ip.path, commit=False) self.commit('updated production {}'.format(prod.name)) def update_productions(self, irrad, level, production, note=None, add=True): p = os.path.join(paths.meta_root, irrad, 'productions.json') obj = dvc_load(p) obj['note'] = str(note) or '' if level in obj: if obj[level] != production: self.debug('setting production to irrad={}, level={}, prod={}'.format(irrad, level, production)) obj[level] = production dvc_dump(obj, p) if add: self.add(p, commit=False) else: obj[level] = production dvc_dump(obj, p) if add: self.add(p, commit=False) def set_identifier(self, irradiation, level, pos, identifier): p = self.get_level_path(irradiation, level) jd = dvc_load(p) positions = self._get_level_positions(irradiation, level) d = next((p for p in positions if p['position'] == pos), None) if d: d['identifier'] = identifier jd['positions'] = positions dvc_dump(jd, p) self.add(p, commit=False) def get_level_path(self, irrad, level): return os.path.join(paths.meta_root, irrad, '{}.json'.format(level)) def add_level(self, irrad, level, add=True): p = self.get_level_path(irrad, level) lv = dict(z=0, positions=[]) dvc_dump(lv, p) if add: self.add(p, commit=False) def add_chronology(self, irrad, doses, add=True): p = os.path.join(paths.meta_root, irrad, 'chronology.txt') dump_chronology(p, doses) if add: self.add(p, commit=False) def add_irradiation(self, name): p = os.path.join(paths.meta_root, name) if not os.path.isdir(p): os.mkdir(p) def add_position(self, irradiation, level, pos, add=True): p = self.get_level_path(irradiation, level) jd = dvc_load(p) if isinstance(jd, list): positions = jd z = 0 else: positions = jd.get('positions', []) z = jd.get('z', 0) pd = next((p for p in positions if p['position'] == pos), None) if pd is None: positions.append({'position': pos, 'decay_constants': {}}) dvc_dump({'z': z, 'positions': positions}, p) if add: self.add(p, commit=False) def add_irradiation_geometry_file(self, path): try: holder = IrradiationGeometry(path) if not holder.holes: raise BaseException except BaseException: self.warning_dialog('Invalid Irradiation Geometry file. Failed to import') return self.smart_pull() root = os.path.join(paths.meta_root, 'irradiation_holders') if not os.path.isdir(root): os.mkdir(root) name = os.path.basename(path) dest = os.path.join(root, name) shutil.copyfile(path, dest) self.add(dest, commit=False) self.commit('added irradiation geometry file {}'.format(name)) self.push() self.information_dialog('Irradiation Geometry "{}" added'.format(name)) # p = os.path.join(root, add_extension(name)) # def add_irradiation_holder(self, name, blob, commit=False, overwrite=False, add=True): # root = os.path.join(paths.meta_root, 'irradiation_holders') # if not os.path.isdir(root): # os.mkdir(root) # p = os.path.join(root, add_extension(name)) # # if not os.path.isfile(p) or overwrite: # with open(p, 'w') as wfile: # holes = list(iter_geom(blob)) # n = len(holes) # wfile.write('{},0.0175\n'.format(n)) # for idx, (x, y, r) in holes: # wfile.write('{:0.4f},{:0.4f},{:0.4f}\n'.format(x, y, r)) # if add: # self.add(p, commit=commit) def get_load_holders(self): p = os.path.join(paths.meta_root, 'load_holders') return list_directory(p, extension='.txt', remove_extension=True) def add_load_holder(self, name, path_or_txt, commit=False, add=True): p = os.path.join(paths.meta_root, 'load_holders', name) if os.path.isfile(path_or_txt): shutil.copyfile(path_or_txt, p) else: with open(p, 'w') as wfile: wfile.write(path_or_txt) if add: self.add(p, commit=commit) def update_level_z(self, irradiation, level, z): p = self.get_level_path(irradiation, level) obj = dvc_load(p) try: add = obj['z'] != z obj['z'] = z except TypeError: obj = {'z': z, 'positions': obj} add = True dvc_dump(obj, p) if add: self.add(p, commit=False) def remove_irradiation_position(self, irradiation, level, hole): p = self.get_level_path(irradiation, level) jd = dvc_load(p) if jd: if isinstance(jd, list): positions = jd z = 0 else: positions = jd['positions'] z = jd['z'] npositions = [ji for ji in positions if not ji['position'] == hole] obj = {'z': z, 'positions': npositions} dvc_dump(obj, p) self.add(p, commit=False) def update_fluxes(self, irradiation, level, j, e, add=True): p = self.get_level_path(irradiation, level) jd = dvc_load(p) if isinstance(jd, list): positions = jd else: positions = jd.get('positions') if positions: for ip in positions: ip['j'] = j ip['j_err'] = e dvc_dump(jd, p) if add: self.add(p, commit=False) def update_flux(self, irradiation, level, pos, identifier, j, e, mj, me, decay=None, position_jerr=None, analyses=None, options=None, add=True): if options is None: options = {} if decay is None: decay = {} if analyses is None: analyses = [] p = self.get_level_path(irradiation, level) jd = dvc_load(p) if isinstance(jd, list): positions = jd z = 0 else: positions = jd.get('positions', []) z = jd.get('z', 0) npos = {'position': pos, 'j': j, 'j_err': e, 'mean_j': mj, 'mean_j_err': me, 'position_jerr': position_jerr, 'decay_constants': decay, 'identifier': identifier, 'options': options, 'analyses': [{'uuid': ai.uuid, 'record_id': ai.record_id, 'is_omitted': ai.is_omitted()} for ai in analyses]} if positions: added = any((ji['position'] == pos for ji in positions)) npositions = [ji if ji['position'] != pos else npos for ji in positions] if not added: npositions.append(npos) else: npositions = [npos] obj = {'z': z, 'positions': npositions} dvc_dump(obj, p) if add: self.add(p, commit=False) def update_chronology(self, name, doses): p = os.path.join(paths.meta_root, name, 'chronology.txt') dump_chronology(p, doses) self.add(p, commit=False) def get_irradiation_holder_names(self): return glob_list_directory(os.path.join(paths.meta_root, 'irradiation_holders'), extension='.txt', remove_extension=True)
[docs] def get_cocktail_irradiation(self): """ example cocktail.json { "chronology": "2016-06-01 17:00:00", "j": 4e-4, "j_err": 4e-9 } :return: """ p = os.path.join(paths.meta_root, 'cocktail.json') ret = dvc_load(p) nret = {} if ret: lines = ['1.0, {}, {}'.format(ret['chronology'], ret['chronology'])] c = Chronology.from_lines(lines) nret['chronology'] = c nret['flux'] = ufloat(ret['j'], ret['j_err']) return nret
def get_default_productions(self): p = os.path.join(paths.meta_root, 'reactors.json') if not os.path.isfile(p): with open(p, 'w') as wfile: from pychron.file_defaults import REACTORS_DEFAULT wfile.write(REACTORS_DEFAULT) return dvc_load(p) def get_flux_positions(self, irradiation, level): positions = self._get_level_positions(irradiation, level) return positions def get_flux(self, irradiation, level, position): positions = self.get_flux_positions(irradiation, level) return self.get_flux_from_positions(position, positions) def get_flux_from_positions(self, position, positions): j, je, pe, lambda_k = 0, 0, 0, None monitor_name, monitor_material, monitor_age = DEFAULT_MONITOR_NAME, 'sanidine', ufloat(28.201, 0) if positions: pos = next((p for p in positions if p['position'] == position), None) if pos: j, je, pe = pos.get('j', 0), pos.get('j_err', 0), pos.get('position_jerr', 0) dc = pos.get('decay_constants') if dc: # this was a temporary fix and likely can be removed if isinstance(dc, float): v, e = dc, 0 else: v, e = dc.get('lambda_k_total', 0), dc.get('lambda_k_total_error', 0) lambda_k = ufloat(v, e) mon = pos.get('monitor') if mon: monitor_name = mon.get('name', DEFAULT_MONITOR_NAME) sa = mon.get('age', 28.201) se = mon.get('error', 0) monitor_age = ufloat(sa, se, tag='monitor_age') monitor_material = mon.get('material', 'sanidine') fd = {'j': ufloat(j, je, tag='J'), 'position_jerr': pe, 'lambda_k': lambda_k, 'monitor_name': monitor_name, 'monitor_material': monitor_material, 'monitor_age': monitor_age} return fd def get_gains(self, name): g = self.get_gain_obj(name) return g.gains def save_sensitivities(self, sens): ps = [] for k, v in sens.items(): root = os.path.join(paths.meta_root, 'spectrometers') p = os.path.join(root, add_extension('{}.sens'.format(k), '.json')) dvc_dump(v, p) ps.append(p) if self.add_paths(ps): self.commit('Updated sensitivity') def get_sensitivities(self): specs = {} root = os.path.join(paths.meta_root, 'spectrometers') for p in list_directory(root): if p.endswith('.sens.json'): name = p.split('.')[0] p = os.path.join(root, p) obj = dvc_load(p) specs[name] = obj for r in obj: if r['create_date']: r['create_date'] = datetime.strptime(r['create_date'], DATE_FORMAT) return specs def get_sensitivity(self, name): sens = self.get_sensitivities() spec = sens.get(name) v = 1 if spec: v = spec.get('sensitivity', 1) return v @cached('clear_cache') def get_gain_obj(self, name, **kw): p = gain_path(name) return Gains(p) # @cached('clear_cache') def get_production(self, irrad, level, **kw): path = os.path.join(paths.meta_root, irrad, 'productions.json') obj = dvc_load(path) pname = obj.get(level, '') p = os.path.join(paths.meta_root, irrad, 'productions', add_extension(pname, ext='.json')) ip = Production(p) # print 'new production id={}, name={}, irrad={}, level={}'.format(id(ip), pname, irrad, level) return pname, ip # @cached('clear_cache') def get_chronology(self, name, **kw): chron = irradiation_chronology(name) chron.use_irradiation_endtime = self.application.get_boolean_preference( 'pychron.arar.constants.use_irradiation_endtime', False) return chron @cached('clear_cache') def get_irradiation_holder_holes(self, name, **kw): return irradiation_geometry_holes(name) @cached('clear_cache') def get_load_holder_holes(self, name, **kw): p = os.path.join(paths.meta_root, 'load_holders', add_extension(name)) holder = LoadGeometry(p) return holder.holes @property def sensitivity_path(self): return os.path.join(paths.meta_root, 'sensitivity.json') # private def _get_level_positions(self, irrad, level): p = self.get_level_path(irrad, level) obj = dvc_load(p) if isinstance(obj, list): positions = obj else: positions = obj.get('positions', []) return positions def _update_text(self, tag, name, path_or_blob): if not name: self.debug('cannot update text with no name. tag={} name={}'.format(tag, name)) return root = os.path.join(paths.meta_root, tag) if not os.path.isdir(root): r_mkdir(root) p = os.path.join(root, name) if os.path.isfile(path_or_blob): shutil.copyfile(path_or_blob, p) else: with open(p, 'w') as wfile: wfile.write(path_or_blob) self.add(p, commit=False)
# ============= EOF =============================================