# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
from pyface.confirmation_dialog import confirm
from pyface.constant import YES
from traits.api import Float, Str, List, Property, cached_property, Button, Bool, Event
from traitsui.api import Item, EnumEditor, UItem, VGroup, HGroup
from pychron.core.helpers.iterfuncs import partition, groupby_group_id
from pychron.core.pychron_traits import BorderHGroup, BorderVGroup
from pychron.core.ui.check_list_editor import CheckListEditor
from pychron.pipeline.editors.flux_results_editor import FluxPosition
from pychron.pipeline.graphical_filter import GraphicalFilterModel, GraphicalFilterView
from pychron.pipeline.nodes.data import DVCNode
from pychron.pychron_constants import DEFAULT_MONITOR_NAME, NULL_STR, REFERENCE_ANALYSIS_TYPES, BLANKS
def compress_groups(ans):
if ans:
for i, (gid, analyses) in enumerate(groupby_group_id(ans)):
for ai in analyses:
ai.group_id = i
class FindNode(DVCNode):
pass
class BaseFindFluxNode(FindNode):
irradiation = Str
irradiations = Property
samples = Property(depends_on='irradiation, level')
levels = Property(depends_on='irradiation, dirty')
level = Str
monitor_sample_name = Str(DEFAULT_MONITOR_NAME)
dirty = Event
def load(self, nodedict):
self.irradiation = nodedict.get('irradiation', '')
self._load_hook(nodedict)
def _load_hook(self, nodedict):
pass
def _to_template(self, d):
d['irradiation'] = self.irradiation
def _irradiation_changed(self):
try:
self.level = self.levels[0]
except IndexError:
pass
@cached_property
def _get_samples(self):
if self.irradiation and self.level:
return self.dvc.distinct_sample_names(self.irradiation, self.level)
else:
return []
@cached_property
def _get_levels(self):
if self.irradiation and self.dvc:
irrad = self.dvc.get_irradiation(self.irradiation)
if irrad:
return sorted([l.name for l in irrad.levels])
else:
return []
else:
return []
@cached_property
def _get_irradiations(self):
if self.dvc:
irrads = self.dvc.get_irradiations()
return [i.name for i in irrads]
else:
return []
def _fp_factory(self, geom, irradiation, level, identifier, sample, hole_id, fluxes):
pp = next((p for p in fluxes if p['identifier'] == identifier))
j, j_err, mean_j, mean_j_err, model_kind = 0, 0, 0, 0, ''
if pp:
j = pp.get('j', 0)
j_err = pp.get('j_err', 0)
mean_j = pp.get('mean_j', 0)
mean_j_err = pp.get('mean_j_err', 0)
options = pp.get('options')
if options:
model_kind = options.get('model_kind', '')
x, y, r, idx = geom[hole_id - 1]
fp = FluxPosition(identifier=identifier,
irradiation=irradiation,
level=level,
sample=sample, hole_id=hole_id,
saved_j=j or 0,
saved_jerr=j_err or 0,
mean_j=mean_j or 0,
mean_jerr=mean_j_err or 0,
model_kind=model_kind,
x=x, y=y)
return fp
[docs]class FindVerticalFluxNode(BaseFindFluxNode):
select_all_button = Button('Select All')
selected_levels = List
def run(self, state):
state.levels = self.selected_levels
state.irradiation = self.irradiation
def _select_all_button_fired(self):
self.selected_levels = self.levels
def traits_view(self):
v = self._view_factory(Item('irradiation', editor=EnumEditor(name='irradiations')),
UItem('select_all_button'),
UItem('selected_levels',
style='custom',
editor=CheckListEditor(name='levels')),
width=300,
title='Select Irradiation and Level',
resizable=True)
return v
class FindRepositoryAnalysesNode(FindNode):
repositories = List
def run(self, state):
dvc = self.dvc
rs = []
for ri in self.repositories:
ans = dvc.get_repoository_analyses(ri)
rs.extend(ans)
unks, refs = partition(rs, predicate=lambda x: x.analysis_type == 'unknown')
state.unknowns = unks
state.references = refs
self.unknowns = unks
self.references = refs
class FindFluxMonitorMeansNode(BaseFindFluxNode):
name = 'Find Flux Monitors Means'
def _load_hook(self, nodedict):
self.level = nodedict.get('level', '')
self.irradiation = nodedict.get('irradiation', '')
def run(self, state):
if not self.irradiation or not self.level:
self.configure()
if not self.irradiation or not self.level:
state.veto = self
else:
dvc = self.dvc
args = dvc.get_irradiation_geometry(self.irradiation, self.level)
if args:
geom, holder = args
state.geometry = geom
state.holder = holder
ips = dvc.get_flux_monitors(self.irradiation, self.level, self.monitor_sample_name)
fluxes = dvc.get_flux_positions(self.irradiation, self.level)
state.monitor_positions = [self._fp_factory(state.geometry, self.irradiation, self.level,
ip.identifier, ip.sample.name, ip.position, fluxes)
for ip in ips if ip.identifier]
def traits_view(self):
v = self._view_factory(Item('irradiation', editor=EnumEditor(name='irradiations')),
Item('level', editor=EnumEditor(name='levels')),
Item('monitor_sample_name', editor=EnumEditor(name='samples')),
width=300,
title='Select Irradiation and Level')
return v
[docs]class FindFluxMonitorsNode(BaseFindFluxNode):
name = 'Find Flux Monitors'
use_browser = Bool(False)
def run(self, state):
if not self.irradiation or not self.level:
self.configure()
if not self.irradiation or not self.level:
state.veto = self
else:
dvc = self.dvc
args = dvc.get_irradiation_geometry(self.irradiation, self.level)
if args:
geom, holder = args
state.geometry = geom
state.holder = holder
ips = dvc.get_unknown_positions(self.irradiation, self.level, self.monitor_sample_name)
fluxes = dvc.get_flux_positions(self.irradiation, self.level)
state.unknown_positions = [self._fp_factory(state.geometry, self.irradiation, self.level,
ip.identifier, ip.sample.name, ip.position, fluxes)
for ip in ips if ip.identifier]
if self.use_browser:
is_append, monitors = self.get_browser_analyses(irradiation=self.irradiation,
level=self.level)
else:
monitors = self.dvc.find_flux_monitors(self.irradiation, self.level, self.monitor_sample_name)
state.unknowns = monitors
state.irradiation = self.irradiation
state.level = self.level
def _load_hook(self, nodedict):
self.level = nodedict.get('level', '')
if self.level and self.level not in self.levels:
self.dirty = True
def _to_template(self, d):
super(FindFluxMonitorsNode, self)._to_template(d)
d['level'] = self.level
def traits_view(self):
v = self._view_factory(Item('irradiation', editor=EnumEditor(name='irradiations')),
Item('level', editor=EnumEditor(name='levels')),
Item('monitor_sample_name', editor=EnumEditor(name='samples')),
Item('use_browser'),
width=300,
title='Select Irradiation and Level')
return v
[docs]class FindReferencesNode(FindNode):
user_choice = False
threshold = Float
load_name = Str
display_loads = Property(depends_on='limit_to_analysis_loads')
loads = List
analysis_loads = List
limit_to_analysis_loads = Bool(True)
threshold_enabled = Property
analysis_types = List
available_analysis_types = List
extract_device = Str
enable_extract_device = Bool
extract_devices = List
mass_spectrometer = Str
enable_mass_spectrometer = Bool
mass_spectrometers = List
use_graphical_filter = Bool
use_extract_device = Bool
use_mass_spectrometer = Bool
def reset(self):
self.user_choice = None
super(FindReferencesNode, self).reset()
def load(self, nodedict):
self.threshold = nodedict.get('threshold', 10)
self.analysis_types = nodedict.get('analysis_types', [])
self.name = nodedict.get('name', 'Find References')
self.limit_to_analysis_loads = nodedict.get('limit_to_analysis_loads', True)
self.use_graphical_filter = nodedict.get('use_graphical_filter', True)
self.use_extract_device = nodedict.get('use_extract_device', True)
self.use_mass_spectrometer = nodedict.get('use_mass_spectrometer', True)
def finish_load(self):
self.extract_devices = self.dvc.get_extraction_device_names()
self.mass_spectrometers = self.dvc.get_mass_spectrometer_names()
names = [NULL_STR]
dbnames = self.dvc.get_load_names()
if dbnames:
names += dbnames
self.loads = names
def _to_template(self, d):
d = dict()
for key in ('threshold', 'analysis_types', 'limit_to_analysis_loads', 'use_graphical_filter'):
d[key] = getattr(self, key)
def pre_run(self, state, configure=True):
if not state.unknowns:
return
eds = {ai.extract_device for ai in state.unknowns}
self.enable_extract_device = len(eds) > 1
self.extract_device = list(eds)[0]
ms = {ai.mass_spectrometer for ai in state.unknowns}
self.enable_mass_spectrometer = len(ms) > 1
self.mass_spectrometer = list(ms)[0]
ls = {ai.load_name for ai in state.unknowns}
self.analysis_loads = [NULL_STR] + list(ls)
return super(FindReferencesNode, self).pre_run(state, configure=configure)
def run(self, state):
for gid, ans in groupby_group_id(state.unknowns):
if self._run_group(state, gid, list(ans)):
return
compress_groups(state.unknowns)
compress_groups(state.references)
def _run_group(self, state, gid, unknowns):
atypes = [ai.lower().replace(' ', '_') for ai in self.analysis_types]
kw = dict(extract_devices=self.extract_device if self.use_extract_device else '',
mass_spectrometers=self.mass_spectrometer if self.use_mass_spectrometer else '',
make_records=False)
while 1:
if self.load_name and self.load_name != NULL_STR:
refs = self.dvc.find_references_by_load(self.load_name, atypes, **kw)
if refs:
times = sorted([ai.rundate for ai in refs])
else:
times = sorted([ai.rundate for ai in unknowns])
refs = self.dvc.find_references(times, atypes, hours=self.threshold, **kw)
if not refs:
if confirm(None, 'No References Found. Would you like to try different search criteria?') == YES:
if self.configure():
continue
else:
state.canceled = True
return True
else:
if not confirm(None, 'Would you like to search manually?') == YES:
state.canceled = True
return True
else:
break
if refs:
if self.use_graphical_filter:
ed = self.extract_device if self.use_extract_device else ''
ms = self.mass_spectrometer if self.use_mass_spectrometer else ''
unknowns.extend(refs)
model = GraphicalFilterModel(analyses=unknowns,
dvc=self.dvc,
extract_device=ed,
mass_spectrometer=ms,
low_post=times[0],
high_post=times[-1],
threshold=self.threshold,
gid=gid)
model.setup()
model.analysis_types = self.analysis_types
obj = GraphicalFilterView(model=model)
info = obj.edit_traits(kind='livemodal')
if info.result:
refs = model.get_filtered_selection()
else:
refs = None
state.veto = self
if refs:
refs = self.dvc.make_analyses(refs)
state.references = list(refs)
return True
def traits_view(self):
load_grp = BorderHGroup(UItem('load_name', editor=EnumEditor(name='display_loads')),
Item('limit_to_analysis_loads',
tooltip='Limit Loads based on the selected analyses',
label='Limit Loads by Analyses'),
label='Load')
inst_grp = BorderVGroup(HGroup(UItem('use_extract_device'),
Item('extract_device',
enabled_when='enable_extract_device',
editor=EnumEditor(name='extract_devices'),
label='Extract Device')),
HGroup(UItem('use_mass_spectrometer'),
Item('mass_spectrometer',
label='Mass Spectrometer',
enabled_when='enable_mass_spectrometer',
editor=EnumEditor(name='mass_spectrometers'))),
label='Instruments')
filter_grp = BorderVGroup(Item('threshold',
tooltip='Maximum difference between references and unknowns in hours',
enabled_when='threshold_enabled',
label='Threshold (Hrs)'),
Item('use_graphical_filter', label='Graphical Selection'),
VGroup(UItem('analysis_types',
style='custom',
editor=CheckListEditor(name='available_analysis_types', cols=2)),
show_border=True, label='Analysis Types'),
label='Filtering')
v = self._view_factory(VGroup(load_grp,
filter_grp,
inst_grp))
return v
def _available_analysis_types_default(self):
return [(b, b) for b in REFERENCE_ANALYSIS_TYPES]
def _get_display_loads(self):
if self.limit_to_analysis_loads:
return self.analysis_loads
else:
return self.loads
def _get_threshold_enabled(self):
return not self.load_name or self.load_name == NULL_STR
class FindBlanksNode(FindReferencesNode):
def _available_analysis_types_default(self):
return [(b, b) for b in BLANKS]
# ============= EOF =============================================