# Source code for pychron.pyscripts.extraction_line_pyscript

# ===============================================================================
# Copyright 2011 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import inspect
import re
import time
from threading import Event
from threading import Thread

from traits.api import List, Dict

from pychron.core.ramper import Ramper
from pychron.external_pipette.protocol import IPipetteManager
from pychron.furnace.ifurnace_manager import IFurnaceManager
from pychron.hardware.core.exceptions import TimeoutError
from pychron.hardware.core.i_core_device import ICoreDevice
from pychron.lasers.laser_managers.ilaser_manager import ILaserManager
from pychron.pychron_constants import (
    CLEANUP,
    CRYO_TEMP,
    DURATION,
    EL_PROTOCOL,
    EXTRACTION_COLOR,
    LINE_STR,
    NULL_STR,
    PATTERN,
    POSTCLEANUP,
    PRECLEANUP,
)
from pychron.pyscripts.context_managers import (
    GrainPolygonCTX,
    LightingCTX,
    RecordingCTX,
)
from pychron.pyscripts.decorators import verbose_skip, makeRegistry, calculate_duration
from pychron.pyscripts.valve_pyscript import ValvePyScript

# Matches runs of ASCII letters. ``_make_waitfor_func`` applies ``findall`` to a
# comparison string such as ``x<10`` to find the variable name used in it.
# NOTE: because of the ``*`` quantifier the pattern also matches the empty string.
COMPRE = re.compile(r"[A-Za-z]*")

# make a registry to hold all the commands exposed by ExtractionPyScript
# used when building the context
# see PyScript.get_context and get_command_register
command_register = makeRegistry()

class ExtractionPyScript(ValvePyScript):
    """
    The ExtractionPyScript is used to program the extraction and gettering of
    sample gas.
    """

    info_color = EXTRACTION_COLOR

    # run object that ``_automated_run_call`` forwards calls to
    automated_run = None
    # resource flag stored by ``acquire`` and cleared in ``_cancel_hook``
    _resource_flag = None

    # values collected while the script runs
    extraction_context = Dict
    snapshots = List
    videos = List

    _extraction_positions = List
    _grain_polygons = List

    def set_load_identifier(self, v):
        """Store ``v`` in the script context under ``load_identifier``."""
        self.setup_context(load_identifier=v)

    def set_run_identifier(self, v):
        """Store ``v`` in the script context under ``run_identifier``."""
        self.setup_context(run_identifier=v)
[docs] def get_extraction_positions(self, clear=True): """ Returns a list of x,y,z tuples each tuple represents where the extraction occurred if clear is True (default) ``self._extraction_positions`` set to an empty list :return: list of x,y,z tuples :rtype: list of tuples """ ret = self._extraction_positions if clear: self._extraction_positions = [] return ret
def get_grain_polygons(self): m = self._grain_polygons if not m: m = self._extraction_action(("get_grain_polygon_blob", (), {})) return m
[docs] def get_response_blob(self): """ Get the extraction device's response blob :return: response blob. binary string representing time v measured output :rtype: str """ result = self._extraction_action(("get_response_blob", (), {})) return result or b""
[docs] def get_output_blob(self): """ Get the extraction device's output blob :return: output blob: binary string representing time v percent output :rtype: str """ result = self._extraction_action(("get_output_blob", (), {})) return result or b""
[docs] def get_setpoint_blob(self): """ Get the extraction device's setpoint blob :return: setpoint blob: binary string representing time v requested setpoint :rtype: str """ result = self._extraction_action(("get_setpoint_blob", (), {})) return result or b""
[docs] def output_achieved(self): """ Return a formated string with the extraction "heating" results:: Requested Output= 100.000 Achieved Output= 99.012 :return: Formatted string with results :rtype: str """ request = self.extract ach = self._extraction_action(("get_achieved_output", (), {})) try: request = float(request) except (ValueError, TypeError): request = 0 try: ach = float(ach) except (ValueError, TypeError): ach = 0 return ach, request
def get_active_pid_parameters(self): result = self._extraction_action(("get_active_pid_parameters", (), {})) return result or "" def get_command_register(self): cm = super(ExtractionPyScript, self).get_command_register() return list(command_register.commands.items()) + cm
[docs] def set_default_context(self, **kw): """ provide default values for all the properties exposed in the script """ for attr in ("analysis_type", "extract_device"): if attr not in kw: kw[attr] = "" self.setup_context( position="", pattern="", extract_value=0, extract_units="", tray="", ramp_rate=0, ramp_duration=0, duration=0, delay_after=0, cleanup=0, pre_cleanup=0, post_cleanup=0, cryo_temperature=0, light_value=0, beam_diameter=None, load_identifier="default_load", run_identifier="default_runid", **kw )
# ========================================================================== # commands # ========================================================================== @verbose_skip @command_register def store_manometer_pressure(self, idx=0): result = self._manager_action( ("get_manometer_pressure", (), {"idx": idx}), protocol=EL_PROTOCOL ) self.extraction_context["manometer_pressure{}".format(idx)] = result return result @verbose_skip @command_register def get_manometer_pressure(self, idx=0): result = self._manager_action( ("get_manometer_pressure", (), {"idx": idx}), protocol=EL_PROTOCOL ) return result @verbose_skip @command_register def get_pressure(self, controller, gauge): result = self._manager_action( ("get_pressure", (controller, gauge), {}), protocol=EL_PROTOCOL ) return result @verbose_skip @command_register def set_cryo(self, value, device_name=None, block=False, delay=1): result = self._manager_action( ( "set_cryo", (value,), {"block": block, "device_name": device_name, "delay": delay}, ), protocol=EL_PROTOCOL, ) self.debug("set cyro result={}".format(result)) return result @verbose_skip @command_register def get_cryo_temp(self, value=1): result = self._manager_action( ("get_cryo_temp", (value,), {}), protocol=EL_PROTOCOL ) return result @calculate_duration @command_register def begin_heating_interval( self, duration, min_rise_rate=None, check_time=60, check_delay=60, check_period=1, temperature=None, timeout=300, tol=10, name=None, calc_time=False, ): duration = float(duration) if calc_time: self._estimated_duration += duration if self._cancel: return def wait(dur, flag, n): if not min_rise_rate: self._sleep(dur) else: st = time.time() self._sleep(check_delay, "Heating check delay") t1 = time.time() r1 = self._extraction_action(("get_process_value",)) self._sleep(check_time, "Checking rise rate") t2 = time.time() r2 = self._extraction_action(("get_process_value",)) rr = (r2 - r1) / (t2 - t1) if rr < min_rise_rate: self.warning( "Failed to heat. 
Rise Rate={:0.1f}. Min Rise Rate={:0.1f}".format( rr, min_rise_rate ) ) self.cancel() flag.set() else: if temperature: self._set_extraction_state( "Waiting to reach temperature {}".format(temperature) ) st = time.time() while 1: sti = time.time() if sti - st < timeout: self._set_extraction_state( "Failed to reach temperature {}".format(r2) ) self.warning( "Failed to reach temperature {}".format(r2) ) self.cancel() break r2 = self._extraction_action(("get_process_value",)) if r2: if abs(r2 - temperature) < tol: self._set_extraction_state( "Reached Temperature {}".format(r2) ) break else: self.warning("Failed to get response.") self.cancel() break time.sleep(max(0, check_period - (time.time() - sti))) self._sleep(dur, "Time at Temperature") else: rem = dur - (time.time - st) self._sleep( rem, ) if not self._cancel: self.console_info("{} finished".format(n)) flag.set() t, f = None, None if name is None: name = "Interval {}".format(self._interval_stack.qsize() + 1) if not self.testing_syntax: f = Event() self.console_info( "BEGIN HEATING INTERVAL {} waiting for {}".format(name, duration) ) t = Thread(name=name, target=wait, args=(duration, f, name)) t.start() self._interval_stack.put((t, f, name)) def _set_extraction_state(self, msg, color="red", flash=False): self._manager_action( ("set_extract_state", (msg,), {"color": color, "flash": flash}) ) @verbose_skip @command_register def set_response_recorder_period(self, p): self._extraction_action(("set_response_recorder_period", (p,), {})) @verbose_skip @command_register def start_response_recorder(self): self._extraction_action(("start_response_recorder",)) @verbose_skip @command_register def stop_response_recorder(self): self._extraction_action(("stop_response_recorder",)) @verbose_skip @command_register def check_reached_setpoint(self): self._extraction_action(("check_reached_setpoint",)) @verbose_skip @command_register def wake(self): self._extraction_action("wake") self._manager_actions("wake") @verbose_skip 
@command_register def pause(self, delay=2): self._sleep(delay, paused=True)
[docs] @verbose_skip @command_register def waitfor( self, func_or_tuple, start_message="", end_message="", check_period=1, timeout=0, func_kw=None, ): """ tuple format: (device_name, function_name, comparison, ...) addition tuple elements are passed to function_name comparison :: x<10 10<x<20 callable can of form ``func() or func(ti) or func(ti, i)`` where ``ti`` is the current relative time (relative to start of waitfor) and ``i`` is a counter :param func_or_tuple: wait for function to return True :type func_or_tuple: callable, tuple :param start_message: Message to display at start :type start_message: str :param end_message: Message to display at end :type end_message: str :param check_period: Delay between checks in seconds :type check_period: int, float :param timeout: Cancel waiting after ``timeout`` seconds :type timeout: int, float """ include_time = False include_time_and_count = False if isinstance(func_or_tuple, tuple): func = self._make_waitfor_func(*func_or_tuple, func_kw=func_kw) else: func = func_or_tuple args = inspect.getfullargspec(func).args if len(args) == 1: include_time = True elif len(args) == 2: include_time_and_count = True if not func: self.debug("no waitfor function") self.cancel() self.console_info("waitfor started. {}".format(start_message)) st = time.time() i = 0 while 1: if self.is_canceled(): self.console_info("waitfor canceled") return ct = time.time() - st if timeout and ct > timeout: self.warning("waitfor timed out after {}s".format(timeout)) self.cancel() return if include_time: args = (ct,) elif include_time_and_count: args = (ct, i) i += 1 else: args = tuple() if func(*args): self.console_info("waitfor ended. {}".format(end_message)) break time.sleep(check_period)
@verbose_skip @command_register def power_map(self, cx, cy, padding, bd, power): pass # @verbose_skip # @command_register # def degas(self, lumens=0, duration=0): # self._extraction_action([('do_machine_vision_degas', (lumens, duration), {})]) @verbose_skip @command_register def autofocus(self, set_zoom=True): self._extraction_action(("do_autofocus", (), {"set_zoom": set_zoom})) @verbose_skip @command_register def set_light(self, value=""): self._extraction_action(("set_light", (value,), {}))
[docs] @verbose_skip @command_register def snapshot(self, name="", prefix="", view_snapshot=False, pic_format=".jpg"): """ if name not specified use RID_Position e.g 12345-01A_3 """ if not name: pos = "_".join(self.position) name = "{}_{}".format(self.run_identifier, pos) name = "{}{}".format(prefix, name) snap = self._extraction_action( ("take_snapshot", (name, pic_format), {"view_snapshot": view_snapshot}) ) if snap: self.snapshots.append(snap)
@command_register def grain_polygon(self): return GrainPolygonCTX(self) @command_register def lighting(self, value=75): return LightingCTX(self, value) @command_register def video_recording(self, name="video"): return RecordingCTX(self, name) @verbose_skip @command_register def start_video_recording(self, name="video"): self._extraction_action(("start_video_recording", (), {"name": name})) @verbose_skip @command_register def stop_video_recording(self, save_db=True): ps = self._extraction_action(("stop_video_recording",)) if save_db: if ps: self.videos.append(ps) @verbose_skip @command_register def set_x(self, value, velocity=""): self._set_axis("x", value, velocity) @verbose_skip @command_register def set_y(self, value, velocity=""): self._set_axis("y", value, velocity) @verbose_skip @command_register def set_z(self, value, velocity=""): self._set_axis("z", value, velocity) @verbose_skip @command_register def set_xy(self, value, velocity=""): self._set_axis("xy", value, velocity) @verbose_skip @command_register def set_motor_lock(self, name="", value=""): if name and value is not "": l = "YES" if value else "NO" self.console_info("set motor lock to {}".format(name, l)) self._extraction_action(("set_motor_lock", (name, value), {})) @verbose_skip @command_register def set_motor(self, name="", value=""): self.console_info('setting motor "{}" to {}'.format(name, value)) if name is not "" and value is not "": if value is not None: self._extraction_action(("set_motor", (name, value), {})) @verbose_skip @command_register def get_value(self, name): try: print(name, self.get_context()[name]) return self.get_context()[name] except KeyError: self.warning("no name {} in context".format(name)) pass @verbose_skip @command_register def move_to_position(self, position="", autocenter=True, block=True): if position == "": position = self.position if position: position_ok = True if isinstance(position, (list, tuple)): position_ok = all(position) else: position_ok = False if 
position_ok: ed = self.extract_device self.console_info("{} move to position {}".format(ed, position)) success = self._extraction_action( ( "move_to_position", (position,), {"autocenter": autocenter, "block": block}, ) ) if not success:"{} move to position failed".format(ed)) self.cancel() else: self.console_info("move to position succeeded") return True else: self.console_info("move not required. position is None") return True @verbose_skip @command_register def dump_sample(self): success = self._extraction_action(("dump_sample", (), {"block": True})) if not success:"{} dump sample failed".format(self.extract_device)) self.cancel() else: self.console_info("dump sample succeeded") return True @verbose_skip @command_register def drop_sample(self, position=""): success = self._extraction_action( [("drop_sample", (position,), {"block": True})] ) if not success:"{} drop sample failed".format(self.extract_device)) self.cancel() else: self.console_info("drop sample succeeded") return True @verbose_skip @command_register def execute_pattern(self, pattern="", block=True, duration=None): if pattern == "": pattern = self.pattern st = time.time() # set block=True to wait for pattern completion self._extraction_action( ("execute_pattern", (pattern,), {"block": block, "duration": duration}) ) return time.time() - st @verbose_skip @command_register def set_tray(self, tray=""): if tray == "": tray = self.tray self.console_info("set tray to {}".format(tray)) result = self._extraction_action(("set_stage_map", (tray,), {})) return result
[docs] @verbose_skip @command_register def load_pipette(self, identifier, timeout=300): """ this is a non blocking command. it simply sends a command to apis to start one of its runscripts. it is the ExtractionPyScripts responsiblity to handle the waiting. use the waitfor command to wait for signals from apis. """ from pychron.external_pipette.apis_manager import InvalidPipetteError if self.analysis_type == "blank": cmd = "load_blank_non_blocking" else: cmd = "load_pipette_non_blocking" try: # bug _manager_action only with except tuple of len 1 for args ret = self._extraction_action( (cmd, (identifier,), {"timeout": timeout}), name="externalpipette", protocol=IPipetteManager, ) return ret except InvalidPipetteError as e: self.cancel(protocol=IPipetteManager) e = str(e) self.warning(e) return e
[docs] @verbose_skip @command_register def extract_pipette(self, identifier="", timeout=300): """ this is an atomic command. use the apis_controller config file to define the isolation procedures. """ from pychron.external_pipette.apis_manager import InvalidPipetteError if identifier == "": identifier = self.extract_value cmd = "load_blank" if self.analysis_type == "blank" else "load_pipette" try: # bug _manager_action only with except tuple of len 1 for args result = self._extraction_action( (cmd, (identifier,), {"timeout": timeout, "script": self}), name="externalpipette", protocol=IPipetteManager, ) return result except (TimeoutError, InvalidPipetteError) as e: self.cancel(protocol=IPipetteManager) e = str(e) self.warning(e) return e
@verbose_skip @command_register def set_pid_parameters(self, v): self._extraction_action(("set_pid_parameters", (v,), {})) @verbose_skip @command_register def warmup(self, block=False): self._extraction_action( ( "warmup", (), { "block": block, }, ) ) @verbose_skip @command_register def extract(self, power="", units="", block=None): if power == "": power = self.extract_value if units == "": units = self.extract_units ed = self.extract_device ed = ed.replace("_", " ") # get current position and add as an extraction position pos = self._extraction_action(("get_position", (), {})) self._extraction_positions.append(pos) msg = "{} ON! {}({})".format(ed, power, units) self._set_extraction_state(msg) self.console_info("extract sample to {} ({})".format(power, units)) self._extraction_action(("extract", (power,), {"units": units, "block": block})) @verbose_skip @command_register def end_extract(self): self._set_extraction_state(False) self._extraction_action(("end_extract",)) @verbose_skip @command_register def acquire_grain_polygon_blob(self): result = self._extraction_action(("acquire_grain_polygon",)) if result: self._grain_polygons.append(result) @verbose_skip @command_register def start_grain_polygon(self): self._extraction_action(("start_measure_grain_polygon",)) @verbose_skip @command_register def stop_grain_polygon(self): self._extraction_action(("stop_measure_grain_polygon",)) @verbose_skip @command_register def fire_laser(self): self._extraction_action(("fire_laser",)) @verbose_skip @command_register def ramp(self, start=0, setpoint=0, duration=0, rate=0, period=1): args = start, setpoint, duration, rate, period self.debug( "ramp parameters start={}, " "setpoint={}, duration={}, rate={}, period={}".format(*args) ) def func(i, ramp_step): if self._cancel: return self.console_info("ramp step {}. 
setpoint={}".format(i, ramp_step)) if not self._extraction_action(("set_laser_power", (ramp_step,), {})): return if self._cancel: return return True st = time.time() rmp = Ramper() rmp.ramp(func, start, setpoint, duration, rate, period) return time.time() - st @verbose_skip @command_register def acquire(self, name=None, clear=False): if self.runner is None: self.debug("+++++++++++++++++++++++ Runner is None") return self.console_info("acquire {}".format(name)) r = self.runner.get_resource(name) if not clear: if r.isSet(): self.console_info("waiting for access") # if self.manager: # msg = 'Waiting for Resource Access. "{}"'.format(name) # self.manager.set_extract_state(msg, color='red') msg = 'Waiting for Resource Access. "{}"'.format(name) self._set_extraction_state(msg) while r.isSet(): if self._cancel: break self._sleep(1) if not self.runner.reset_connection(): self.cancel() break if not self._cancel: self._resource_flag = r self.runner.acquire(name) self.console_info("{} acquired".format(name)) self._set_extraction_state(False) # if self.manager: # self.manager.set_extract_state(False) @verbose_skip @command_register def wait(self, name=None, criterion=0): if self.runner is None: self.debug("+++++++++++++++++++++++ Runner is None") return self.console_info("waiting for {} = {}".format(name, criterion)) r = self.runner.get_resource(name) cnt = 0 resp = if resp is not None: while resp != criterion: time.sleep(1) # only verbose every 10s resp = % 10 == 0) if resp is None: continue cnt += 1 if cnt > 100: cnt = 0 self.console_info("finished waiting") @verbose_skip @command_register def release(self, name=None): self.console_info("release {}".format(name)) if self.runner is None: self.debug("+++++++++++++++++++++++ Runner is None") return if not self.runner.release(name): self.console_info("Could not release {}".format(name)) @verbose_skip @command_register def set_resource(self, name=None, value=1): if self.runner is None: self.debug("+++++++++++++++++++++++ Runner 
is None") return r = self.runner.get_resource(name) if r is not None: r.set(value) else: self.console_info("Could not set {}".format(name)) @verbose_skip @command_register def get_resource_value(self, name=None): if self.runner is None: self.debug("+++++++++++++++++++++++ Runner is None") return r = self.runner.get_resource(name) resp = None if r is not None: if hasattr(r, "get"): resp = r.get() else: resp = r.isSet() else: self.console_info("Could not get {}".format(name)) self.debug("Get Resource Value {}={}".format(name, resp)) return resp @verbose_skip @command_register def enable(self): ed = self.extract_device ed = ed.replace("_", " ") self._set_extraction_state("{} Enabled".format(ed), flash=False) # self.manager.set_extract_state('{} Enabled'.format(ed)) return self._manager_action( ("enable_device", (), {}), protocol=ILaserManager, name=self.extract_device ) @verbose_skip @command_register def disable(self): return self._disable() @verbose_skip @command_register def prepare(self): return self._extraction_action(("prepare", (), {})) @verbose_skip @command_register def set_intensity_scalar(self, v): return self._automated_run_call("py_set_intensity_scalar", v) @verbose_skip @command_register def get_device(self, name): return self._get_device(name) # ========================================================================== # properties # ========================================================================== @property def duration(self): return self._get_property(DURATION) @property def cleanup(self): return self._get_property(CLEANUP) @property def pre_cleanup(self): return self._get_property(PRECLEANUP) @property def post_cleanup(self): return self._get_property(POSTCLEANUP) @property def cryo_temperature(self): return self._get_property(CRYO_TEMP) @property def pattern(self): return self._get_property(PATTERN) @property def analysis_type(self): at = self._get_property("analysis_type") self.debug( "getting analysis type for {}. 
" "analysis_type={}".format(self.run_identifier, at) ) return at @property def extract_device(self): return self._get_property("extract_device") @property def tray(self): return self._get_property("tray") # return self.get_context()['tray'] @property def position(self): """ if position is 0 return None """ # pos = self.get_context()['position'] pos = self._get_property("position") if pos: return pos @property def extract_value(self): return self._get_property("extract_value") # return self.get_context()['extract_value'] @property def extract_units(self): return self._get_property("extract_units") # return self.get_context()['extract_units'] @property def beam_diameter(self): return self._get_property("beam_diameter") # return self.get_context()['beam_diameter'] @property def run_identifier(self): return self._get_property("run_identifier") @property def load_identifier(self): return self._get_property("load_identifier") @property def light_value(self): return self._get_property("light_value") # =============================================================================== # private # =============================================================================== def _failed_actuation_hook(self): self._automated_run_call("set_end_after") def _check_responding(self, rr, st): self._extraction_action(("check_responding", (rr, st), {})) def _abort_hook(self): self.disable() # def _cancel_hook(self): # self.disable() def _get_device(self, name): app = self._get_application() if app is not None: return app.get_service_by_name(ICoreDevice, name) else: self.warning("_get_device - No application") def _make_waitfor_func(self, name, funcname, comp, func_kw=None): if func_kw is None: func_kw = {} dev = self._get_device(name) if dev: devfunc = getattr(dev, funcname) m = COMPRE.findall(comp) if m: k = m[0] def func(*a): print("devfunc", devfunc(**func_kw)) print("eval", eval(comp, {k: devfunc(**func_kw)})) return eval(comp, {k: devfunc(**func_kw)}) return func else: 
self.warning("invalid comparison. valid e.g.=x<10 comp={}".format(comp)) else: self.warning('no device available named "{}"'.format(name)) def _extraction_action(self, *args, **kw): if "name" not in kw or kw["name"] is None: kw["name"] = self.extract_device if "protocol" not in kw or kw["protocol"] is None: kw["protocols"] = ILaserManager, IFurnaceManager if kw["name"] in ( "Extract Device", "ExtractDevice", "extract device", "extractdevice", NULL_STR, LINE_STR, ): self.debug("no extraction action") return return self._manager_action(*args, **kw) def _extraction_actions(self, *args, **kw): if "name" not in kw or kw["name"] is None: kw["name"] = self.extract_device if "protocol" not in kw or kw["protocol"] is None: kw["protocols"] = ILaserManager, IFurnaceManager if kw["name"] in ( "Extract Device", "ExtractDevice", "extract device", "extractdevice", NULL_STR, LINE_STR, ): self.debug("no extraction action") return return self._manager_actions(*args, **kw) def _disable(self, protocol=None): self.debug("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% disable") self._set_extraction_state(False) # if self.manager: # self.manager.set_extract_state(False) return self._extraction_action(("disable_device", (), {}), protocol=protocol) def _set_axis(self, name, value, velocity): kw = dict(block=True) if velocity: kw["velocity"] = value success = self._extraction_action(("set_{}".format(name), (value,), kw)) if not success: self.console_info("{} move to position failed".format(self.extract_device)) else: self.console_info("move to position suceeded") return True def _cancel_hook(self, **kw): if self._resource_flag: self._resource_flag.clear() # disable the extract device self._disable(**kw) # stop patterning self._stop_pattern(**kw) self.stop_grain_polygon() def _stop_pattern(self, protocol=None): self._extraction_action(("stop_pattern", (), {}), protocol=protocol) def _automated_run_call(self, func, *args, **kw): if self.automated_run is None: return if isinstance(func, str): func = 
getattr(self.automated_run, func) return func(*args, **kw)
# ============= EOF ====================================