Source code for pychron.dvc.meta_repo
# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
import os
import shutil
from datetime import datetime
from traits.api import Bool
from uncertainties import ufloat
from pychron.core.helpers.datetime_tools import ISO_FORMAT_STR
from pychron.core.helpers.filetools import (
glob_list_directory,
add_extension,
list_directory,
)
from pychron.dvc import dvc_dump, dvc_load, repository_path, list_frozen_productions
from pychron.dvc.meta_object import (
IrradiationGeometry,
Chronology,
Production,
cached,
Gains,
LoadGeometry,
MetaObjectException,
)
from pychron.git_archive.repo_manager import GitRepoManager
from pychron.paths import paths, r_mkdir
from pychron.pychron_constants import (
INTERFERENCE_KEYS,
RATIO_KEYS,
DEFAULT_MONITOR_NAME,
DATE_FORMAT,
NULL_STR,
)
# ============= enthought library imports =======================
def irradiation_geometry(name):
p = os.path.join(paths.meta_root, "irradiation_holders", add_extension(name))
return IrradiationGeometry(p)
def irradiation_geometry_holes(name):
geom = irradiation_geometry(name)
return geom.holes
def irradiation_chronology(name, allow_null=False):
p = os.path.join(paths.meta_root, name, "chronology.txt")
return Chronology(p, allow_null=allow_null)
def dump_chronology(path, doses):
if doses is None:
doses = []
with open(path, "w") as wfile:
for p, s, e in doses:
if not isinstance(s, str):
s = s.strftime(ISO_FORMAT_STR)
if not isinstance(s, str):
s = s.strftime(ISO_FORMAT_STR)
if not isinstance(p, str):
p = "{:0.3f}".format(p)
line = "{},{},{}\n".format(p, s, e)
wfile.write(line)
def gain_path(name):
root = os.path.join(paths.meta_root, "spectrometers")
if not os.path.isdir(root):
os.mkdir(root)
p = os.path.join(root, add_extension("{}.gain".format(name), ".json"))
return p
def get_frozen_productions(repo):
prods = {}
for name, path in list_frozen_productions(repo):
prods[name] = Production(path)
return prods
def get_frozen_flux(repo, irradiation):
path = repository_path(repo, "{}.json".format(irradiation))
fd = {}
if path:
fd = dvc_load(path)
for fi in fd.values():
fi["j"] = ufloat(*fi["j"], tag="J")
return fd
[docs]class MetaRepo(GitRepoManager):
clear_cache = Bool
def get_correlation_ellipses(self):
p = os.path.join(paths.meta_root, "correlation_ellipses.json")
return dvc_load(p)
def get_monitor_info(self, irrad, level):
age, decay = NULL_STR, NULL_STR
positions = self._get_level_positions(irrad, level)
# assume all positions have same monitor_age/decay constant. Not strictly true. Potential some ambiquity but
# will not be resolved now 8/26/18.
if positions:
position = positions[0]
opt = position.get("options")
if opt:
age = position.get("monitor_age", NULL_STR)
decayd = position.get("decay_constants")
if decayd:
decay = decayd.get("lambda_k_total", NULL_STR)
return str(age), str(decay)
def add_unstaged(self, *args, **kw):
super(MetaRepo, self).add_unstaged(self.path, **kw)
def save_gains(self, ms, gains_dict):
p = gain_path(ms)
dvc_dump(gains_dict, p)
if self.add_paths(p):
self.commit("Updated gains")
def update_script(self, rootname, name, path_or_blob):
self._update_text(os.path.join("scripts", rootname.lower()), name, path_or_blob)
def update_experiment_queue(self, rootname, name, path_or_blob):
self._update_text(
os.path.join("experiments", rootname.lower()), name, path_or_blob
)
def update_level_production(self, irrad, name, prname, note=None):
prname = prname.replace(" ", "_")
pathname = add_extension(prname, ".json")
src = os.path.join(paths.meta_root, irrad, "productions", pathname)
if os.path.isfile(src):
self.update_productions(irrad, name, prname, note=note)
else:
self.warning_dialog("Invalid production name".format(prname))
def update_level_monitor(
self, irradiation, level, monitor_name, monitor_material, monitor_age, lambda_k
):
obj, path = self.get_level_obj(irradiation, level)
positions = self._get_level_positions(irradiation, level)
options = {
"monitor_name": monitor_name,
"monitor_material": monitor_material,
"monitor_age": monitor_age,
}
decay_constants = {"lambda_k_total": lambda_k, "lambda_k_total_error": 0}
for p in positions:
p["options"] = options
p["decay_constants"] = decay_constants
obj["positions"] = positions
dvc_dump(obj, path)
self.add(path)
def add_production_to_irradiation(
self, irrad, name, params, add=True, commit=False
):
self.debug("adding production {} to irradiation={}".format(name, irrad))
p = os.path.join(
paths.meta_root, irrad, "productions", add_extension(name, ".json")
)
if isinstance(params, Production):
prod = params
prod.path = p
else:
prod = Production(p, new=not os.path.isfile(p))
prod.update(params)
prod.dump()
if add:
self.add(p, commit=commit)
def add_production(self, irrad, name, obj, commit=False, add=True):
p = self.get_production(irrad, name, force=True)
p.attrs = attrs = INTERFERENCE_KEYS + RATIO_KEYS
kef = lambda x: "{}_err".format(x)
if obj:
def values():
return (
(k, getattr(obj, k), kef(k), getattr(obj, kef(k))) for k in attrs
)
else:
def values():
return ((k, 0, kef(k), 0) for k in attrs)
for k, v, ke, e in values():
setattr(p, k, v)
setattr(p, ke, e)
p.dump()
if add:
self.add(p.path, commit=commit)
def update_production(self, prod, irradiation=None):
ip = self.get_production(prod.name)
self.debug("saving production {}".format(prod.name))
params = prod.get_params()
for k, v in params.items():
self.debug("setting {}={}".format(k, v))
setattr(ip, k, v)
ip.note = prod.note
self.add(ip.path, commit=False)
self.commit("updated production {}".format(prod.name))
def update_productions(self, irrad, level, production, note=None, add=True):
p = os.path.join(paths.meta_root, irrad, "productions.json")
obj = dvc_load(p)
obj["note"] = str(note) or ""
if level in obj:
if obj[level] != production:
self.debug(
"setting production to irrad={}, level={}, prod={}".format(
irrad, level, production
)
)
obj[level] = production
dvc_dump(obj, p)
if add:
self.add(p, commit=False)
else:
obj[level] = production
dvc_dump(obj, p)
if add:
self.add(p, commit=False)
def set_identifier(self, irradiation, level, pos, identifier):
# p = self.get_level_path(irradiation, level)
# jd = dvc_load(p)
jd, p = self.get_level_obj(irradiation, level)
positions = self._get_level_positions(irradiation, level)
d = next((p for p in positions if p["position"] == pos), None)
if d:
d["identifier"] = identifier
jd["positions"] = positions
dvc_dump(jd, p)
self.add(p, commit=False)
def get_level_path(self, irrad, level):
return os.path.join(paths.meta_root, irrad, "{}.json".format(level))
def add_level(self, irrad, level, add=True):
p = self.get_level_path(irrad, level)
lv = dict(z=0, positions=[])
dvc_dump(lv, p)
if add:
self.add(p, commit=False)
def add_chronology(self, irrad, doses, add=True):
p = os.path.join(paths.meta_root, irrad, "chronology.txt")
dump_chronology(p, doses)
if add:
self.add(p, commit=False)
def add_irradiation(self, name):
p = os.path.join(paths.meta_root, name)
if not os.path.isdir(p):
os.mkdir(p)
def add_position(self, irradiation, level, pos, add=True):
p = self.get_level_path(irradiation, level)
jd = dvc_load(p)
if isinstance(jd, list):
positions = jd
z = 0
else:
positions = jd.get("positions", [])
z = jd.get("z", 0)
pd = next((p for p in positions if p["position"] == pos), None)
if pd is None:
positions.append({"position": pos, "decay_constants": {}})
dvc_dump({"z": z, "positions": positions}, p)
if add:
self.add(p, commit=False)
def add_irradiation_geometry_file(self, path):
try:
holder = IrradiationGeometry(path)
if not holder.holes:
raise BaseException
except BaseException:
self.warning_dialog("Invalid Irradiation Geometry file. Failed to import")
return
self.add_geometry_file(path)
[docs] def make_geometry_file(self, name, holes):
"""
holes = [(x,y,r,id), ]
:param holes:
:return:
"""
root = os.path.join(paths.meta_root, "irradiation_holders")
if not os.path.isdir(root):
os.mkdir(root)
path = os.path.join(root, "{}.txt".format(name))
if not os.path.isfile(path):
with open(path, "w") as wfile:
lines = [",".join(map(str, h)) for h in holes]
wfile.writelines(lines)
self.add_geometry_file(path)
def add_geometry_file(self, path):
self.smart_pull()
root = os.path.join(paths.meta_root, "irradiation_holders")
if not os.path.isdir(root):
os.mkdir(root)
name = os.path.basename(path)
dest = os.path.join(root, name)
try:
shutil.copyfile(path, dest)
except shutil.SameFileError:
pass
self.add(dest, commit=False)
self.commit("added irradiation geometry file {}".format(name))
self.push()
self.information_dialog('Irradiation Geometry "{}" added'.format(name))
def get_load_holders(self):
p = os.path.join(paths.meta_root, "load_holders")
return list_directory(p, extension=".txt", remove_extension=True)
def add_load_holder(self, name, path_or_txt, commit=False, add=True):
p = os.path.join(paths.meta_root, "load_holders", name)
if os.path.isfile(path_or_txt):
shutil.copyfile(path_or_txt, p)
else:
with open(p, "w") as wfile:
wfile.write(path_or_txt)
if add:
self.add(p, commit=commit)
def update_level_z(self, irradiation, level, z):
# p = self.get_level_path(irradiation, level)
# obj = dvc_load(p)
obj, p = self.get_level_obj(irradiation, level)
try:
add = obj["z"] != z
obj["z"] = z
except TypeError:
obj = {"z": z, "positions": obj}
add = True
dvc_dump(obj, p)
if add:
self.add(p, commit=False)
def remove_irradiation_position(self, irradiation, level, hole):
# p = self.get_level_path(irradiation, level)
# jd = dvc_load(p)
jd, p = self.get_level_obj(irradiation, level)
if jd:
if isinstance(jd, list):
positions = jd
z = 0
else:
positions = jd["positions"]
z = jd["z"]
npositions = [ji for ji in positions if not ji["position"] == hole]
obj = {"z": z, "positions": npositions}
dvc_dump(obj, p)
self.add(p, commit=False)
def new_flux_positions(self, irradiation, level, positions, add=True):
p = self.get_level_path(irradiation, level)
obj = {"positions": positions, "z": 0}
dvc_dump(obj, p)
if add:
self.add(p, commit=False)
def update_flux_simple(self, irradiation, level, j, e, add=True):
# p = self.get_level_path(irradiation, level)
# jd = dvc_load(p)
jd, p = self.get_level_obj(irradiation, level)
if isinstance(jd, list):
positions = jd
else:
positions = jd.get("positions")
if positions:
for ip in positions:
ip["j"] = j
ip["j_err"] = e
dvc_dump(jd, p)
if add:
self.add(p, commit=False)
def update_flux(
self,
irradiation,
level,
pos,
identifier,
j,
e,
mj=0,
me=0,
mmwsd=0,
decay=None,
position_jerr=None,
analyses=None,
options=None,
add=True,
save_predicted=True,
jd=None,
):
self.info(
"Saving j for {}{}:{} {}, j={} +/-{}".format(
irradiation, level, pos, identifier, j, e
)
)
if options is None:
options = {}
if decay is None:
decay = {}
if analyses is None:
analyses = []
dump = False
if jd is None:
dump = True
p = self.get_level_path(irradiation, level)
jd = dvc_load(p)
if isinstance(jd, list):
positions = jd
z = 0
else:
positions = jd.get("positions", [])
z = jd.get("z", 0)
if not save_predicted:
j, e = jd.get("j", 0), jd.get("j_err", 0)
npos = {
"position": pos,
"j": j,
"j_err": e,
"mean_j": mj,
"mean_j_err": me,
"mean_j_mswd": mmwsd,
"position_jerr": position_jerr,
"decay_constants": decay,
"identifier": identifier,
"options": options,
"analyses": [
{
"uuid": ai.uuid,
"record_id": ai.record_id,
"is_omitted": ai.is_omitted(),
}
for ai in analyses
],
}
if positions:
added = any((ji["position"] == pos for ji in positions))
npositions = [ji if ji["position"] != pos else npos for ji in positions]
if not added:
npositions.append(npos)
else:
npositions = [npos]
obj = {"z": z, "positions": npositions}
if dump:
dvc_dump(obj, p)
else:
jd["z"] = z
jd["positions"] = npositions
if add:
self.add(p, commit=False)
def update_chronology(self, name, doses):
p = os.path.join(paths.meta_root, name, "chronology.txt")
dump_chronology(p, doses)
self.add(p, commit=False)
def get_irradiation_holder_names(self):
return glob_list_directory(
os.path.join(paths.meta_root, "irradiation_holders"),
extension=".txt",
remove_extension=True,
)
[docs] def get_cocktail_irradiation(self):
"""
example cocktail.json
{
"chronology": "2016-06-01 17:00:00",
"j": 4e-4,
"j_err": 4e-9
}
:return:
"""
p = os.path.join(paths.meta_root, "cocktail.json")
ret = dvc_load(p)
nret = {}
if ret:
lines = ["1.0, {}, {}".format(ret["chronology"], ret["chronology"])]
c = Chronology.from_lines(lines)
nret["chronology"] = c
nret["flux"] = ufloat(ret["j"], ret["j_err"])
return nret
def get_default_productions(self):
p = os.path.join(paths.meta_root, "reactors.json")
if not os.path.isfile(p):
with open(p, "w") as wfile:
from pychron.file_defaults import REACTORS_DEFAULT
wfile.write(REACTORS_DEFAULT)
return dvc_load(p)
def get_flux_history(self, irradiation, level, **kw):
greps = ["fit flux for {}{}".format(irradiation, level)]
cs = self.get_commits_from_log(greps, **kw)
return cs
def get_flux_positions(self, irradiation, level):
positions = self._get_level_positions(irradiation, level)
return positions
def get_level_obj(self, irradiation, level):
p = self.get_level_path(irradiation, level)
return dvc_load(p), p
def get_flux(self, irradiation, level, position):
positions = self.get_flux_positions(irradiation, level)
return self.get_flux_from_positions(position, positions)
def get_flux_from_positions(self, position, positions):
j, je, pe, lambda_k = 0, 0, 0, None
monitor_name, monitor_material, monitor_age = (
DEFAULT_MONITOR_NAME,
"sanidine",
ufloat(28.201, 0),
)
if positions:
pos = next((p for p in positions if p["position"] == position), None)
if pos:
j, je, pe = (
pos.get("j", 0),
pos.get("j_err", 0),
pos.get("position_jerr", 0),
)
dc = pos.get("decay_constants")
if dc:
# this was a temporary fix and likely can be removed
if isinstance(dc, float):
v, e = dc, 0
else:
v, e = dc.get("lambda_k_total", 0), dc.get(
"lambda_k_total_error", 0
)
lambda_k = ufloat(v, e)
mon = pos.get("monitor")
if mon:
monitor_name = mon.get("name", DEFAULT_MONITOR_NAME)
sa = mon.get("age", 28.201)
se = mon.get("error", 0)
monitor_age = ufloat(sa, se, tag="monitor_age")
monitor_material = mon.get("material", "sanidine")
fd = {
"j": ufloat(j, je, tag="J"),
"position_jerr": pe,
"lambda_k": lambda_k,
"monitor_name": monitor_name,
"monitor_material": monitor_material,
"monitor_age": monitor_age,
}
return fd
def get_gains(self, name):
g = self.get_gain_obj(name)
return g.gains
def save_sensitivities(self, sens):
ps = []
for k, v in sens.items():
root = os.path.join(paths.meta_root, "spectrometers")
p = os.path.join(root, add_extension("{}.sens".format(k), ".json"))
dvc_dump(v, p)
ps.append(p)
if self.add_paths(ps):
self.commit("Updated sensitivity")
def get_sensitivities(self):
specs = {}
root = os.path.join(paths.meta_root, "spectrometers")
for p in list_directory(root):
if p.endswith(".sens.json"):
name = p.split(".")[0]
p = os.path.join(root, p)
obj = dvc_load(p)
for r in obj:
if r["create_date"]:
r["create_date"] = datetime.strptime(
r["create_date"], DATE_FORMAT
)
specs[name] = obj
return specs
def get_sensitivity(self, name):
sens = self.get_sensitivities()
spec = sens.get(name)
v = 1
if spec:
# get most recent sensitivity
record = spec[-1]
v = record.get("sensitivity", 1)
return v
@cached("clear_cache")
def get_gain_obj(self, name, **kw):
p = gain_path(name)
return Gains(p)
# @cached('clear_cache')
def get_production(self, irrad, level, allow_null=False, **kw):
path = os.path.join(paths.meta_root, irrad, "productions.json")
obj = dvc_load(path)
pname = obj.get(level, "")
p = os.path.join(
paths.meta_root, irrad, "productions", add_extension(pname, ext=".json")
)
ip = Production(p, allow_null=allow_null)
# print 'new production id={}, name={}, irrad={}, level={}'.format(id(ip), pname, irrad, level)
return pname, ip
# @cached('clear_cache')
def get_chronology(self, name, allow_null=False, **kw):
chron = None
try:
chron = irradiation_chronology(name, allow_null=allow_null)
if self.application:
chron.use_irradiation_endtime = self.application.get_boolean_preference(
"pychron.arar.constants.use_irradiation_endtime", False
)
except MetaObjectException:
if name != "NoIrradiation" and not name.startswith("Package"):
self.warning(
'Could not locate the irradiation chronology "{}"'.format(name)
)
return chron
@cached("clear_cache")
def get_irradiation_holder_holes(self, name, **kw):
return irradiation_geometry_holes(name)
@cached("clear_cache")
def get_load_holder_holes(self, name, **kw):
p = os.path.join(paths.meta_root, "load_holders", add_extension(name))
holder = LoadGeometry(p)
return holder.holes
@property
def sensitivity_path(self):
return os.path.join(paths.meta_root, "sensitivity.json")
# private
def _get_level_positions(self, irrad, level):
obj, p = self.get_level_obj(irrad, level)
if isinstance(obj, list):
positions = obj
else:
positions = obj.get("positions", [])
return positions
def _update_text(self, tag, name, path_or_blob):
if not name:
self.debug(
"cannot update text with no name. tag={} name={}".format(tag, name)
)
return
root = os.path.join(paths.meta_root, tag)
if not os.path.isdir(root):
r_mkdir(root)
p = os.path.join(root, name)
if os.path.isfile(path_or_blob):
shutil.copyfile(path_or_blob, p)
else:
with open(p, "w") as wfile:
wfile.write(path_or_blob)
self.add(p, commit=False)
# ============= EOF =============================================