# Source code for pychron.hardware.core.scanable_device

# ===============================================================================
# Copyright 2012 Jake Ross
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
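# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.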
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
from traits.api import Event, Property, Any, Bool, Float, Str, Instance, List
from traitsui.api import HGroup, VGroup, Item, spring, ButtonEditor

# ============= standard library imports ========================
import os
import time
from threading import Lock

# ============= local library imports  ==========================
from pychron.core.helpers.datetime_tools import generate_datetimestamp
from pychron.database.data_warehouse import DataWarehouse
from pychron.graph.plot_record import PlotRecord
from pychron.hardware.core.alarm import Alarm
from pychron.hardware.core.viewable_device import ViewableDevice
from pychron.managers.data_managers.csv_data_manager import CSVDataManager
from pychron.paths import paths

class ScanableDevice(ViewableDevice):
    """Device that can periodically call one of its own methods ("scan").

    The method named by ``scan_func`` is invoked from a timer.  Each returned
    value is stored in ``current_scan_value`` and, depending on the
    ``graph_scan_data`` / ``record_scan_data`` flags, pushed to ``graph``
    and/or written through ``data_manager``.
    """

    scan_button = Event
    scan_label = Property(depends_on="_scanning")

    alarms = List(Alarm)

    is_scanable = Bool(False)
    scan_func = Any
    scan_lock = None
    timer = None
    scan_period = Float(1000, enter_set=True, auto_set=False)
    scan_width = Float(5, enter_set=True, auto_set=False)
    scan_units = "ms"
    record_scan_data = Bool(False)
    graph_scan_data = Bool(False)
    scan_path = Str
    auto_start = Bool(False)
    scan_root = Str
    scan_name = Str

    graph = Instance("pychron.graph.graph.Graph")
    graph_ytitle = Str
    graph_klass = None
    data_manager = Instance(CSVDataManager)

    # multiplier converting ``scan_period`` expressed in ``scan_units`` to ms
    time_dict = dict(ms=1, s=1000, m=60000, h=3600000)

    _scanning = Bool(False)
    _auto_started = False

    # number of scans that returned None since the counter was last reset;
    # defaulted here so the first miss in _scan_ has a value to compare
    _no_response_counter = 0

    def is_scanning(self):
        """Return True while a scan is flagged as running."""
        return self._scanning

    def _scan_path_changed(self):
        # keep the read-only directory/name fields in sync with scan_path
        self.scan_root = os.path.split(self.scan_path)[0]
        self.scan_name = os.path.basename(self.scan_path)

    # ===============================================================================
    # streaming interface
    # ===============================================================================
    def setup_scan(self):
        """Load the scan settings from the ``Scan`` section of the configuration."""
        self.debug("setup scan")

        # should get scan settings from the config file not the initialization.xml
        config = self.get_configuration()
        if not config.has_section("Scan"):
            return

        enabled = self.config_get(
            config, "Scan", "enabled", cast="boolean", optional=True, default=True
        )
        self.is_scanable = enabled
        if not enabled:
            return

        self.set_attribute(
            config,
            "auto_start",
            "Scan",
            "auto_start",
            cast="boolean",
            default=False,
        )
        self.set_attribute(config, "scan_period", "Scan", "period", cast="float")
        self.set_attribute(config, "scan_width", "Scan", "width", cast="float")
        self.set_attribute(config, "scan_units", "Scan", "units")
        self.set_attribute(
            config, "record_scan_data", "Scan", "record", cast="boolean"
        )
        self.set_attribute(config, "graph_scan_data", "Scan", "graph", cast="boolean")

        func = self.config_get(config, "Scan", "function", optional=True)
        if func:
            self.scan_func = func

    def setup_alarms(self):
        """Append one ``Alarm`` per option in the ``Alarms`` configuration section."""
        self.debug("setup alarms")
        config = self.get_configuration()
        if config.has_section("Alarms"):
            for opt in config.options("Alarms"):
                self.alarms.append(
                    Alarm(name=opt, alarm_str=config.get("Alarms", opt))
                )

    def _scan_hook(self, *args, **kw):
        """Extension point called with the scan value after each successful scan."""
        pass

    def _scan_(self, *args, **kw):
        """Run one scan: call ``scan_func`` and dispatch the value it returns.

        ``*args``/``**kw`` are accepted (and ignored) so that whatever
        ``scan`` forwards cannot raise a ``TypeError`` here.
        """
        if not self.scan_func:
            return

        try:
            v = getattr(self, self.scan_func)()
        except AttributeError as e:
            self.debug("exception {}".format(e))
            return

        if v is None:
            # scan func must return a value or we will stop the scan
            # since the timer runs on the main thread any long comms
            # timeouts slow user interaction
            # NOTE(review): the counter is not reset by a successful read, so
            # the misses that trigger a stop need not be consecutive.
            if self._no_response_counter > 3:
                # timer defaults to None; scan() may be called without start_scan()
                if self.timer is not None:
                    self.timer.Stop()
                self.info(
                    "no response. stopping scan func={}".format(self.scan_func)
                )
                self._scanning = False
                self._no_response_counter = 0
            else:
                self._no_response_counter += 1
            return

        # self.debug('current scan value={}'.format(v))
        self.current_scan_value = str(v)
        # self.debug('current scan func={}, value ={}'.format(self.scan_func, v))

        x = None
        if self.graph_scan_data:
            if isinstance(v, tuple):
                x = self.graph.record_multiple(v)
            elif isinstance(v, PlotRecord):
                # NOTE(review): assumes PlotRecord exposes a ``data`` sequence
                # parallel to ``plotids`` -- confirm against PlotRecord.
                for pi, d in zip(v.plotids, v.data):
                    if isinstance(d, tuple):
                        x = self.graph.record_multiple(d, plotid=pi)
                    else:
                        x = self.graph.record(d, plotid=pi)
                v = v.as_data_tuple()
            else:
                x = self.graph.record(v)
                v = (v,)

        if self.record_scan_data:
            self.debug("recording scan data")
            if x is None:
                x = time.time()

            # the row is built by tuple concatenation, so normalise the value
            # here as well: when graphing is disabled it has not been
            # converted to a tuple yet
            if isinstance(v, PlotRecord):
                row = v.as_data_tuple()
            elif isinstance(v, tuple):
                row = v
            else:
                row = (v,)

            ts = generate_datetimestamp()
            self.data_manager.write_to_frame(
                (ts, "{:<8s}".format("{:0.2f}".format(x))) + row
            )

        self._scan_hook(v)

    def scan(self, *args, **kw):
        """Timer entry point: run ``_scan_`` while holding ``scan_lock``."""
        if self.scan_lock is None:
            self.scan_lock = Lock()

        with self.scan_lock:
            self._scan_(*args, **kw)

    def start_scan(self, period=None):
        """Start (or restart) the scan timer.

        :param period: delay between triggers in milliseconds. When ``None``
            it is computed from ``scan_period`` and ``scan_units``.
        :return:
        """
        if self.timer is not None:
            self.timer.Stop()
            self.timer.wait_for_completion()

        self._scanning = True
        self.info("Starting scan")

        d = self.scan_width * 60  # * 1000/self.scan_period
        # print self.scan_width, self.scan_period
        if self.graph_scan_data:
            self.info("Graph recording enabled")
            self.debug("scan width ={}".format(d))
            self.graph.set_scan_widths(d)

        if self.record_scan_data:
            self.info("Recording scan enabled")
            dm = self.data_manager
            dm.delimiter = "\t"

            dw = DataWarehouse(root=paths.device_scan_dir)
            dw.build_warehouse()

            # NOTE(review): the frame is assumed to be named after the device;
            # confirm ``base_frame_name`` against CSVDataManager.new_frame.
            dm.new_frame(base_frame_name=self.name, directory=dw.get_current_dir())
            self.scan_path = dm.get_current_path()

        if period is None:
            period = self.scan_period * self.time_dict[self.scan_units]

        from pychron.core.helpers.timer import Timer

        self.timer = Timer(period, self.scan)
        self.info("Scan started func={} period={}".format(self.scan_func, period))

    def stop_scan(self):
        """Stop the scan timer and close the data manager's file."""
        self.info("Stoppiing scan")
        self._scanning = False
        if self.timer is not None:
            self.timer.Stop()

        if self.data_manager:
            self.data_manager.close_file()

        self._auto_started = False
        self.info("Scan stopped")

    def _get_scan_label(self):
        # label shown on the scan button
        return "Start" if not self._scanning else "Stop"

    def _scan_button_fired(self):
        self.debug("scan button fired. scanning {}".format(self._scanning))
        if self._scanning:
            self.stop_scan()
        else:
            self.start_scan()

    def _scan_period_changed(self):
        # restart a running scan so the new period takes effect
        if self._scanning:
            self.stop_scan()
            self.start_scan()

    def _data_manager_default(self):
        return CSVDataManager()

    def _graph_default(self):
        from pychron.graph.time_series_graph import TimeSeriesStreamGraph

        klass = self.graph_klass
        if not klass:
            klass = TimeSeriesStreamGraph

        g = klass()
        self.graph_builder(g)
        return g

    def graph_builder(self, g, **kw):
        """Add a single plot with one series to ``g``.

        :param g: graph object to populate
        :param kw: extra keyword arguments forwarded to ``g.new_plot``
        """
        g.new_plot(padding=[50, 5, 5, 35], zoom=True, pan=True, **kw)
        g.set_y_title(self.graph_ytitle)
        g.set_x_title("Time")
        g.new_series()

    def get_additional_tabs(self):
        """Return a 1-tuple holding the "Scan" tab group for the device view."""
        g = VGroup(
            Item("graph", show_label=False, style="custom"),
            VGroup(
                Item("scan_func", label="Function", style="readonly"),
                HGroup(
                    Item("scan_period", label="Period ({})".format(self.scan_units)),
                    spring,
                ),
                Item("current_scan_value", style="readonly"),
            ),
            VGroup(
                HGroup(
                    Item(
                        "scan_button",
                        editor=ButtonEditor(label_value="scan_label"),
                        show_label=False,
                    ),
                    spring,
                ),
                Item(
                    "scan_root",
                    style="readonly",
                    label="Scan directory",
                    visible_when="object.record_scan_data",
                ),
                Item(
                    "scan_name",
                    label="Scan name",
                    style="readonly",
                    visible_when="object.record_scan_data",
                ),
                visible_when="object.is_scanable",
            ),
            label="Scan",
        )
        return (g,)
        # v = super(ScanableDevice, self).current_state_view()
        # v.content.content.content.append(g)
        # return v


# ============= EOF =============================================