# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
import os
import time
from datetime import datetime, timedelta
from pyface.message_dialog import warning
from pyface.timer.do_later import do_after
from traits.api import Instance, Bool, Int, Str, List, Enum, Float, Time
from traitsui.api import Item, EnumEditor, CheckListEditor
from pychron.core.helpers.iterfuncs import groupby_idx
from pychron.core.helpers.traitsui_shortcuts import okcancel_view
from pychron.globals import globalv
from pychron.pipeline.csv_dataset_factory import CSVDataSetFactory
from pychron.pipeline.nodes.base import BaseNode
from pychron.pychron_constants import ANALYSIS_TYPES
class BaseDVCNode(BaseNode):
dvc = Instance('pychron.dvc.dvc.DVC')
class DVCNode(BaseDVCNode):
"""
Base node for all nodes that need access to a DVC instance or BrowserModel for
retrieving analyses
"""
browser_model = Instance('pychron.envisage.browser.browser_model.BrowserModel')
def get_browser_analyses(self, irradiation=None, level=None):
from pychron.envisage.browser.view import BrowserView
self.browser_model.activated()
# self.browser_model.do_filter()
if irradiation:
self.browser_model.irradiation_enabled = True
self.browser_model.irradiation = irradiation
if level:
self.browser_model.level = level
browser_view = BrowserView(model=self.browser_model)
info = browser_view.edit_traits(kind='livemodal')
records = None
if info.result:
self.browser_model.add_analysis_set()
self.browser_model.dump_browser()
records = self.browser_model.get_analysis_records()
if records:
records = self.dvc.make_analyses(records)
return browser_view.is_append, records
def set_browser_analyses(self):
is_append, analyses = self.get_browser_analyses()
if analyses:
if is_append:
ans = getattr(self, self.analysis_kind)
ans.extend(analyses)
else:
self.trait_set(**{self.analysis_kind: analyses})
return True
[docs]class InterpretedAgeNode(DVCNode):
name = 'Interpreted Ages'
interpreted_ages = List
def configure(self, pre_run=False, **kw):
if not pre_run:
self._manual_configured = True
from pychron.envisage.browser.view import InterpretedAgeBrowserView
self.browser_model.activated()
browser_view = InterpretedAgeBrowserView(model=self.browser_model)
info = browser_view.edit_traits(kind='livemodal')
if info.result:
self.browser_model.dump_browser()
records = self.browser_model.get_interpreted_age_records()
if records:
interpreted_ages = self.dvc.make_interpreted_ages(records)
ias = self.interpreted_ages
ias.extend(interpreted_ages)
# if browser_view.is_append:
# ias = self.interpreted_ages
# ias.extend(interpreted_ages)
# else:
# self.interpreted_ages = interpreted_ages
return True
def run(self, state):
state.interpreted_ages = self.interpreted_ages
state.unknowns = self.interpreted_ages
class DataNode(DVCNode):
name = 'Data'
analysis_kind = None
def configure(self, pre_run=False, **kw):
# print(self, pre_run, getattr(self, self.analysis_kind), self.index)
if pre_run and getattr(self, self.analysis_kind) and self.index == 0:
return True
if not pre_run:
self._manual_configured = True
return self.set_browser_analyses()
[docs]class CSVNode(BaseDVCNode):
path = Str
name = 'CSV Data'
def reset(self):
super(CSVNode, self).reset()
self.path = ''
def configure(self, pre_run=False, **kw):
if not pre_run:
self._manual_configured = True
if not self.path or not os.path.isfile(self.path):
dsf = CSVDataSetFactory(dvc=self.dvc)
dsf.load()
info = dsf.edit_traits()
if info.result:
if dsf.data_path:
self.path = dsf.data_path
# if confirm(None, 'Would you like to create a new CSV dataset?'):
# # open a table editor to enter all the information
# pass
# else:
# # select a file from DVC or native finder
# pass
# msg = '''CSV File Format
# Create/select a file with a column header as the first line.
# The following columns are required:
#
# runid, age, age_err
#
# Optional columns are:
#
# group, aliquot, sample
#
# e.x.
# runid, age, age_error
# Run1, 10, 0.24
# Run2, 11, 0.32
# Run3, 10, 0.40'''
# information(None, msg)
#
# dlg = FileDialog()
# if dlg.open() == OK:
# self.path = dlg.path
return bool(self.path)
def run(self, state):
if not self.unknowns:
if not self.configure():
state.canceled = True
return
unks = self._load_analyses()
if unks:
self.unknowns.extend(unks)
# add our analyses to the state
items = state.unknowns
items.extend(self.unknowns)
def _load_analyses(self):
from pychron.core.csv.csv_parser import CSVColumnParser
par = CSVColumnParser(delimiter=',')
par.load(self.path)
if par.check(('runid', 'age', 'age_err')):
return self._get_items_from_file(par)
else:
warning(None, 'Invalid file format. Minimum columns required are "runid", "age", "age_err"')
def _get_items_from_file(self, parser):
from pychron.processing.analyses.file_analysis import FileAnalysis
try:
ans = [(d.get('group', 0), FileAnalysis(age=float(d['age']),
age_err=float(d['age_err']),
record_id=d['runid'],
sample=d.get('sample', ''),
aliquot=int(d.get('aliquot', 0)))) for d in parser.values()]
items = []
for i, (gid, aa) in enumerate(groupby_idx(ans, 0)):
for _, ai in aa:
ai.group_id = i
items.append(ai)
return items
except (TypeError, ValueError) as e:
warning(None, 'Invalid values in the import file. Error="{}"'.format(e))
[docs]class UnknownNode(DataNode):
name = 'Unknowns'
analysis_kind = 'unknowns'
def set_last_n_analyses(self, n):
db = self.dvc.db
ans = db.get_last_n_analyses(n)
self.unknowns = self.dvc.make_analyses(ans)
def set_last_n_hours_analyses(self, n):
db = self.dvc.db
ans = db.get_last_nhours_analyses(n)
if ans:
self.unknowns = self.dvc.make_analyses(ans)
def pre_run(self, state, configure=True):
# force Unknown node to always configure
return super(UnknownNode, self).pre_run(state, configure=True)
def run(self, state):
# add our analyses to the state
items = getattr(state, self.analysis_kind)
items.extend(self.unknowns)
state.projects = {ai.project for ai in state.unknowns if hasattr(ai, 'project')}
[docs]class ReferenceNode(DataNode):
name = 'References'
analysis_kind = 'references'
def pre_run(self, state, configure=True):
self.unknowns = state.unknowns
refs = state.references
if refs:
self.references = refs
if not self.references:
if configure:
self.configure(pre_run=True)
return self.references
def run(self, state):
pass
[docs]class FluxMonitorsNode(DataNode):
name = 'Flux Monitors'
analysis_kind = 'unknowns'
auto_configure = False
def run(self, state):
items = getattr(state, self.analysis_kind)
self.unknowns = items
class BaseAutoUnknownNode(UnknownNode):
mode = Enum('Normal', 'Window')
hours = Int(12)
mass_spectrometer = Str
available_spectrometers = List
analysis_types = List(['Unknown'])
available_analysis_types = List(ANALYSIS_TYPES)
engine = None
single_shot = False
verbose = Bool
_cached_unknowns = None
_unks_ids = None
_updated = False
_alive = False
def finish_load(self):
self.available_spectrometers = self.dvc.get_mass_spectrometer_names()
if self.available_spectrometers:
self.mass_spectrometer = self.available_spectrometers[0]
self._finish_load_hook()
def configure(self, pre_run=False, *args, **kw):
if pre_run:
info = self.edit_traits()
return info.result
return BaseNode.configure(self, pre_run=pre_run, *args, **kw)
def traits_view(self):
v = okcancel_view(Item('mode', tooltip='Normal: get analyses between now and start of pipeline - hours\n'
'Window: get analyses between now and now - hours'),
Item('hours'),
Item('period', label='Update Period (s)',
tooltip='Default time (s) to delay between "check for new analyses"'),
Item('mass_spectrometer', label='Mass Spectrometer',
editor=EnumEditor(name='available_spectrometers')),
Item('analysis_types', style='custom',
editor=CheckListEditor(name='available_analysis_types',
cols=len(self.available_analysis_types))),
Item('post_analysis_delay', label='Post Analysis Found Delay',
tooltip='Time (min) to delay before next "check for new analyses"'),
Item('verbose'))
return v
def post_run(self, engine, state):
if not self._alive:
self.engine = engine
if not self.single_shot:
self._start_listening()
self._post_run_hook(engine, state)
def reset(self):
super(BaseAutoUnknownNode, self).reset()
self._stop_listening()
def _post_run_hook(self, engine, state):
pass
def _finish_load_hook(self):
if globalv.auto_pipeline_debug:
self.mass_spectrometer = 'jan'
self.period = 15
self.hours = 48
def _start_listening(self):
self._alive = True
self._updated = False
self._iter()
def _stop_listening(self):
self._alive = False
def _iter(self):
raise NotImplementedError
def _load_analyses(self):
td = timedelta(hours=self.hours)
high = datetime.now()
updated = False
if self.mode == 'Normal':
low = self._low - td
else:
low = high - td
with self.dvc.session_ctx(use_parent_session=False):
ats = [a.lower().replace(' ', '_') for a in self.analysis_types]
records = self.dvc.get_analyses_by_date_range(low, high,
analysis_types=ats,
mass_spectrometers=self.mass_spectrometer,
verbose=self.verbose)
if not self._cached_unknowns:
updated = True
ans = self.dvc.make_analyses(records)
else:
ans = []
ais = []
for ri in records:
ca = next((ci for ci in self._cached_unknowns if ci.record_id == ri.record_id), None)
if ca is not None:
ans.append(ca)
else:
ais.append(ri)
if ais:
updated = True
# the database may have updated but the repository not yet updated.
# sleeping X seconds is a potential work around but a little dumb.
# better solution is to save to database after repository is updated
try:
ans.extend(self.dvc.make_analyses(ais))
except BaseException:
time.sleep(10)
try:
ans.extend(self.dvc.make_analyses(ais))
except BaseException:
pass
self._cached_unknowns = ans
return ans, updated
[docs]class ListenUnknownNode(BaseAutoUnknownNode):
name = 'Unknowns (Auto)'
exclude_uuids = List
period = Int(15)
post_analysis_delay = Float(5)
max_period = 10
_between_updates = None
pipeline = None
state = None
_low = None
def clear_data(self):
super(ListenUnknownNode, self).clear_data()
self.pipeline = None
self.state = None
self.skip_configure = False
def reset(self):
super(ListenUnknownNode, self).reset()
self.pipeline = None
self.state = None
self.skip_configure = False
def _start_listening(self):
self._alive = True
self._updated = False
self._iter()
self._status_loop()
def _status_loop(self):
self.active = not self.active
self.visited = not self.active
self.engine.refresh_all_needed = True
do_after(1000, self._status_loop)
def _post_run_hook(self, engine, state):
self.pipeline = engine.pipeline
engine.pipeline.active = True
def traits_view(self):
v = okcancel_view(Item('mode', tooltip='Normal: get analyses between now and start of pipeline - hours\n'
'Window: get analyses between now and now - hours'),
Item('hours'),
Item('period', label='Update Period (s)',
tooltip='Default time (s) to delay between "check for new analyses"'),
Item('mass_spectrometer', label='Mass Spectrometer',
editor=EnumEditor(name='available_spectrometers')),
Item('analysis_types', style='custom',
editor=CheckListEditor(name='available_analysis_types',
cols=len(self.available_analysis_types))),
Item('post_analysis_delay', label='Post Analysis Found Delay',
tooltip='Time (min) to delay before next "check for new analyses"'),
Item('verbose'),
title='Configure',
)
return v
def run(self, state):
if not self._alive:
self._low = datetime.now()
unks, updated = self._load_analyses()
state.unknowns = unks
self.unknowns = unks
self.state = state
self.skip_configure = True
def _finish_load_hook(self):
if globalv.auto_pipeline_debug:
self.mass_spectrometer = 'jan'
self.period = 15
self.hours = 48
def _iter(self, last_update=None):
if not self._alive:
return
unks, updated = self._load_analyses()
if not self._alive:
return
st = None
if updated:
self.state.unknowns = unks
self.unknowns = unks
self.engine.run(post_run=False, pipeline=self.pipeline, state=self.state, configure=False)
self.engine.post_run_refresh(state=self.state)
self.engine.refresh_figure_editors()
self.engine.selected = self.pipeline.nodes[-1]
if not self._alive:
return
st = time.time()
if last_update:
if self._between_updates:
self._between_updates = ((st - last_update) + self._between_updates) / 2.
else:
self._between_updates = st - last_update
period = self._between_updates * 0.75
else:
period = 60 * self.post_analysis_delay
else:
period = self.period
do_after(int(period * 1000), self._iter, st)
class CalendarUnknownNode(BaseAutoUnknownNode):
name = 'Unknowns (Calendar)'
run_time = Time
_ran = False
def run(self, state):
self._low = datetime.now()
super(CalendarUnknownNode, self).run(state)
def _run_time_default(self):
return (datetime.now() + timedelta(minutes=2)).time()
def _post_run_hook(self, engine, state):
self._flash_iter(0)
def _flash_iter(self, cnt):
if not self._alive:
return
self.visited = bool(cnt % 2)
self.engine.update_needed = True
if cnt > 100:
cnt = 0
do_after(1000, self._flash_iter, cnt + 1)
def _iter(self):
if not self._alive:
return
now = datetime.now()
print('now={} run_time={}. hourmatch={}, minutematch={} ran={}'.format(now, self.run_time,
now.hour >= self.run_time.hour,
now.minute >= self.run_time.minute,
self._ran))
if now.hour >= self.run_time.hour:
if now.minute >= self.run_time.minute:
if not self._ran:
self._ran = True
unks, updated = self._load_analyses()
if not self._alive:
return
print('updated={} loaded unks={}'.format(updated, unks))
if unks:
self.engine.rerun_with(unks, post_run=False)
else:
self._ran = False
period = 60 * 10
do_after(1000 * period, self._iter)
def traits_view(self):
v = okcancel_view(Item('run_time'),
Item('hours'),
# Item('period', label='Update Period (s)',
# tooltip='Defauly time (s) to delay between "check for new analyses"'),
Item('mass_spectrometer', label='Mass Spectrometer',
editor=EnumEditor(name='available_spectrometers')),
Item('analysis_types', style='custom',
editor=CheckListEditor(name='available_analysis_types',
cols=len(self.available_analysis_types))),
# Item('post_analysis_delay', label='Post Analysis Found Delay',
# tooltip='Time (min) to delay before next "check for new analyses"'),
Item('verbose'))
return v
# ============= EOF =============================================