# Source code for pychron.pipeline.nodes.export

# ===============================================================================
# Copyright 2019 ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import csv
import itertools
import os

from traits.api import HasTraits, Str, Bool, List, Button, Enum
from traitsui.api import VGroup, HGroup, UItem, Item, CheckListEditor, TableEditor
from traitsui.extras.checkbox_column import CheckboxColumn
from traitsui.table_column import ObjectColumn
from uncertainties import nominal_value, std_dev

from pychron.core.confirmation import confirmation_dialog
from pychron.core.helpers.filetools import add_extension, view_file
from pychron.core.helpers.isotope_utils import sort_isotopes
from pychron.core.pychron_traits import BorderHGroup
from pychron.core.ui.strings import SpacelessStr
from pychron.paths import paths
from pychron.pipeline.nodes.base import BaseNode
from pychron.pipeline.state import get_isotope_set
from pychron.pychron_constants import EXTRACTION_ATTRS, META_ATTRS


class CSVExportNode(BaseNode):
    """Base pipeline node that writes analyses to a delimited text file.

    Subclasses implement ``_run_hook`` to emit the rows and provide an
    ``_isotope_klass`` attribute, which ``_set_available_isotopes`` uses to
    build one option object per isotope.
    """
    delimiter = Enum(',', '\t', ':', ';')
    available_isotopes = List
    pathname = SpacelessStr

    def run(self, state):
        """Write ``state.unknowns`` and ``state.references`` to the output file.

        The output path is ``paths.csv_data_dir`` joined with ``pathname``
        passed through ``add_extension(..., '.csv')``. Each non-empty analysis
        list is handed to ``_run_hook``. Once the file has been written the
        user is asked whether it should be opened.
        """
        p = os.path.join(paths.csv_data_dir, add_extension(self.pathname, '.csv'))

        # newline='' is what the csv module expects for files it writes to;
        # without it newline translation can double the row terminator.
        with open(p, 'w', newline='') as wfile:
            writer = csv.writer(wfile, delimiter=self.delimiter)
            for ans in (state.unknowns, state.references):
                if ans:
                    self._run_hook(writer, ans)

        # Ask only after leaving the ``with`` block so the file is flushed and
        # closed before another application is pointed at it.
        if confirmation_dialog('File saved to {}\n\nWould you like to open?'.format(p)):
            view_file(p, application='Excel')

    def _run_hook(self, writer, ans):
        """Write the analyses ``ans`` using ``writer``; subclasses must override."""
        raise NotImplementedError

    def _set_available_isotopes(self):
        """Rebuild ``available_isotopes`` from the unknowns and references.

        Nothing happens when both ``self.unknowns`` and ``self.references``
        are empty. Otherwise the union of both isotope sets is sorted with
        ``sort_isotopes`` and wrapped in ``self._isotope_klass`` instances.
        """
        if self.unknowns or self.references:
            uisokeys = get_isotope_set(self.unknowns)
            risokeys = get_isotope_set(self.references)
            isokeys = list(uisokeys.union(risokeys))
            self.available_isotopes = [self._isotope_klass(name=i) for i in sort_isotopes(isokeys)]


class RawIsot(HasTraits):
    """Per-isotope column options for the raw data export.

    ``time`` and ``intensity`` select which of the two columns are included
    for the isotope called ``name``.
    """
    name = Str
    time = Bool(True)
    intensity = Bool(True)

    @property
    def header(self):
        """Upper-cased titles of the enabled columns, time before intensity."""
        columns = (('{}Time', self.time), ('{}Intensity', self.intensity))
        return [template.format(self.name).upper()
                for template, enabled in columns if enabled]


class CSVRawDataExportNode(CSVExportNode):
    """Export the raw time/intensity series of each analysis.

    For every analysis a small metadata table is written, followed by one
    table per measurement stage (equilibration, signal, baseline). Which
    columns appear is controlled by the ``RawIsot`` entries in
    ``available_isotopes``.
    """
    name = 'Save CSV Raw Data'
    _isotope_klass = RawIsot

    def traits_view(self):
        """Build the view: file name and delimiter, then the isotope table."""
        pgrp = BorderHGroup(Item('pathname', springy=True, label='File Name'),
                            Item('delimiter'))

        cols = [ObjectColumn(name='name'),
                CheckboxColumn(name='time'),
                CheckboxColumn(name='intensity')]

        igrp = VGroup(UItem('available_isotopes', editor=TableEditor(columns=cols, sortable=False)))
        return self._view_factory(VGroup(pgrp, igrp))

    def _configure_hook(self):
        """Refresh the isotope options."""
        self._set_available_isotopes()

    def _run_hook(self, writer, analyses):
        """Write the metadata and the three stage tables for each analysis."""

        def writedata(d):
            # Every data row is prefixed with a 1-based counter; a blank row
            # closes the table.
            for i, row in enumerate(d, start=1):
                writer.writerow([i] + [str(ri) for ri in row])
            writer.writerow([])

        for a in analyses:
            a.load_raw_data()
            writer.writerow(['RUNID', 'UUID', 'PROJECT', 'SAMPLE', 'REPOSITORY'])
            writer.writerow([a.record_id, a.uuid, a.project, a.sample, a.repository_identifier])
            header = self._get_header(a)
            for tag in ('equilibration', 'signal', 'baseline'):
                data = self._gather_data(a, tag)
                writer.writerow([tag.upper()])
                writer.writerow(header)
                writedata(data)
            writer.writerow([])

    def _get_header(self, analysis):
        """Return the column titles for ``analysis``, led by ``'COUNTER'``.

        Isotopes that have no entry in ``available_isotopes`` or that have
        both columns disabled contribute nothing.
        """
        header = ['COUNTER']
        for k in analysis.isotope_keys:
            iso = self._get_isotope(k)
            if iso and iso.header:
                header.extend(iso.header)
        return header

    def _gather_data(self, a, tag):
        """Return an iterator of data rows for one measurement stage.

        ``tag`` selects the series: ``'equilibration'`` reads ``iso.sniff``,
        ``'baseline'`` reads ``iso.baseline`` and any other value reads the
        isotope itself. The enabled columns are collected in the same order
        as ``_get_header`` and then transposed into rows.
        """
        data = []
        for i in a.isotope_keys:
            kiso = self._get_isotope(i)
            if kiso:
                iso = a.get_isotope(i)
                if tag == 'equilibration':
                    iso = iso.sniff
                elif tag == 'baseline':
                    iso = iso.baseline

                if kiso.time:
                    data.append(iso.offset_xs)
                if kiso.intensity:
                    data.append(iso.ys)

        # A plain zip() stops at the shortest column and would silently drop
        # the tail of longer series; pad short columns with blanks instead so
        # every point is exported and the columns stay aligned.
        return itertools.zip_longest(*data, fillvalue='')

    def _get_isotope(self, k):
        """Return the option object named ``k`` or None when there is none."""
        return next((i for i in self.available_isotopes if i.name == k), None)


class Isot(HasTraits):
    """Per-isotope field options for the analyses export.

    Each ``<tag>_enabled`` flag selects whether the ``<name>_<tag>`` column is
    included in the export.
    """
    name = Str
    intercept_enabled = Bool(True)
    baseline_enabled = Bool(True)
    blank_enabled = Bool(True)
    bs_corrected_enabled = Bool(True)
    bl_corrected_enabled = Bool(True)
    ic_corrected_enabled = Bool(True)
    ic_decay_corrected_enabled = Bool(True)
    ifc_enabled = Bool(True)
    detector_enabled = Bool(True)

    def values(self):
        """Yield ``(column_name, enabled)`` pairs in a fixed tag order.

        ``column_name`` is ``<name>_<tag>`` and ``enabled`` is the value of
        the corresponding ``<tag>_enabled`` trait.
        """
        tags = ('detector', 'intercept', 'blank', 'baseline', 'bs_corrected',
                'bl_corrected', 'ic_corrected', 'ic_decay_corrected', 'ifc')
        for tag in tags:
            column = '{}_{}'.format(self.name, tag)
            flag = getattr(self, '{}_enabled'.format(tag))
            yield column, flag


class CSVAnalysesExportNode(CSVExportNode):
    """Export one CSV row per analysis.

    A row holds the selected metadata attributes followed by the enabled
    per-isotope fields (see ``Isot``). Every isotope field except the
    detector is written as a value column followed by an ``error`` column.
    """
    name = 'Save CSV Analyses'
    available_meta_attributes = List
    selected_meta_attributes = List

    available_ratios = List

    select_all_meta = Button('Select All')
    unselect_all_meta = Button('Unselect All')
    _isotope_klass = Isot

    def traits_view(self):
        """Build the view: file name, metadata check list and isotope table."""
        cols = [ObjectColumn(name='name', editable=False),
                CheckboxColumn(name='detector_enabled', label='Detector'),
                CheckboxColumn(name='intercept_enabled', label='Intercept'),
                CheckboxColumn(name='baseline_enabled', label='Baseline'),
                CheckboxColumn(name='blank_enabled', label='Blank'),
                CheckboxColumn(name='bs_corrected_enabled', label='Baseline Corrected'),
                CheckboxColumn(name='bl_corrected_enabled', label='Blank Corrected'),
                CheckboxColumn(name='ic_corrected_enabled', label='IC Corrected'),
                CheckboxColumn(name='ic_decay_corrected_enabled', label='IC+Decay Corrected'),
                CheckboxColumn(name='ifc_enabled', label='Interference Corrected')]

        pgrp = HGroup(Item('pathname', springy=True, label='File Name'),
                      show_border=True)
        mgrp = VGroup(HGroup(UItem('select_all_meta'),
                             UItem('unselect_all_meta')),
                      UItem('selected_meta_attributes',
                            style='custom',
                            editor=CheckListEditor(cols=4,
                                                   name='available_meta_attributes'),
                            width=200),
                      show_border=True)
        igrp = VGroup(UItem('available_isotopes',
                            editor=TableEditor(columns=cols, sortable=False)),
                      show_border=True)
        return self._view_factory(VGroup(pgrp, mgrp, igrp))

    def _run_hook(self, writer, analyses):
        """Write the header row followed by one row per analysis."""
        header = self._get_header()
        writer.writerow(header)
        for ai in analyses:
            row = self._get_row(header, ai)
            writer.writerow(row)

    def _configure_hook(self):
        """Refresh the isotope options and reset the metadata selection."""
        self._set_available_isotopes()

        temps = ('lab_temperature', 'east_diffuser_temperature',
                 'east_return_temperature', 'outside_temperature')
        self.available_meta_attributes = list(('rundate', 'timestamp') +
                                              META_ATTRS + EXTRACTION_ATTRS + temps)
        self._select_all_meta_fired()

    def _unselect_all_meta_fired(self):
        """Clear the metadata selection."""
        self.selected_meta_attributes = []

    def _select_all_meta_fired(self):
        """Select every available metadata attribute."""
        self.selected_meta_attributes = self.available_meta_attributes

    def _get_header(self):
        """Return the header row.

        The selected metadata attributes come first. The isotope columns
        follow grouped by field (all detectors, then all intercepts, ...)
        in the order produced by ``Isot.values``. Each enabled field other
        than the detector is followed by an ``'error'`` column.
        """
        header = self.selected_meta_attributes[:]

        # One bucket per field yielded by Isot.values(), in the same order.
        vargs = [], [], [], [], [], [], [], [], []
        for i in self.available_isotopes:
            for vs, (name, enabled) in zip(vargs, i.values()):
                if enabled:
                    vs.append(name)
                    if not name.endswith('detector'):
                        vs.append('error')

        for va in vargs:
            header.extend(va)

        return header

    def _get_row(self, header, ai):
        """Return the row of values for analysis ``ai`` matching ``header``.

        Isotope columns are recognised by their ``_<tag>`` suffix; the text
        before the first underscore is used as the isotope name. Cells are
        left blank when the analysis does not have the isotope or when a
        metadata value cannot be retrieved.
        """

        def get_intercept(iso):
            return iso.uvalue

        def get_baseline_corrected(iso):
            return iso.get_baseline_corrected_value()

        def get_blank(iso):
            return iso.blank.uvalue

        def get_baseline(iso):
            return iso.baseline.uvalue

        def get_blank_corrected(iso):
            return iso.get_non_detector_corrected_value()

        def get_ic_corrected(iso):
            return iso.get_ic_corrected_value()

        def get_ic_decay_corrected(iso):
            return iso.get_ic_decay_corrected_value()

        def get_ifc(iso):
            return iso.get_interference_corrected_value()

        # Suffix -> getter table, built once rather than once per column.
        getters = (('intercept', get_intercept),
                   ('blank', get_blank),
                   ('baseline', get_baseline),
                   ('bs_corrected', get_baseline_corrected),
                   ('bl_corrected', get_blank_corrected),
                   ('ic_corrected', get_ic_corrected),
                   ('ic_decay_corrected', get_ic_decay_corrected),
                   ('ifc', get_ifc))

        row = []
        for attr in header:
            # 'error' entries are header placeholders only; the error value is
            # emitted together with the value column that precedes it.
            if attr == 'error':
                continue

            for tag, func in getters:
                if attr.endswith(tag):
                    args = attr.split('_')
                    iso = ai.get_isotope(args[0])
                    vs = ('', '')
                    if iso is not None:
                        v = func(iso)
                        vs = nominal_value(v), std_dev(v)

                    row.extend(vs)
                    break
            else:
                # No isotope field matched: either a detector column or a
                # metadata attribute.
                if attr.endswith('detector'):
                    args = attr.split('_')
                    iso = ai.get_isotope(args[0])
                    det = ''
                    if iso is not None:
                        det = iso.detector
                    row.append(det)
                else:
                    try:
                        v = nominal_value(ai.get_value(attr))
                    except Exception:
                        # Best effort: leave the cell blank when the value is
                        # unavailable. Exception rather than BaseException so
                        # KeyboardInterrupt and SystemExit still propagate.
                        v = ''

                    row.append(v)

        return row
# ============= EOF =============================================