Source code for pychron.pyscripts.valve_pyscript

# ===============================================================================
# Copyright 2012 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
import time

from traits.api import Any

from pychron.globals import globalv
from pychron.pychron_constants import NULL_STR, EL_PROTOCOL
from pychron.pyscripts.decorators import verbose_skip, makeRegistry, makeNamedRegistry
from pychron.pyscripts.pyscript import PyScript

command_register = makeRegistry()
named_register = makeNamedRegistry(command_register)


[docs]class ValvePyScript(PyScript): runner = Any allow_lock = False retry_actuation = True cancel_on_failed_actuation = True def get_command_register(self): return list(command_register.commands.items()) def gosub(self, *args, **kw): kw["runner"] = self.runner s = super(ValvePyScript, self).gosub(*args, **kw) if s: s.runner = None return s @verbose_skip @command_register def lock(self, name=None, description=""): if description is None: description = NULL_STR self.console_info("locking {} ({})".format(name, description)) if self.allow_lock: return self._manager_actions( [("lock_valve", (name,), dict(mode="script", description=description))], protocol=EL_PROTOCOL, ) else: self.warning("Valve locking not enabled for this script") @verbose_skip @command_register def unlock(self, name=None, description=""): if description is None: description = NULL_STR self.console_info("unlocking {} ({})".format(name, description)) if self.allow_lock: return self._manager_actions( [ ( "unlock_valve", (name,), dict(mode="script", description=description), ) ], protocol=EL_PROTOCOL, ) else: self.warning("Valve locking not enabled for this script") @verbose_skip @named_register("open") def _m_open( self, name=None, description="", cancel_on_failed_actuation=True, ntries=100 ): self._valve_actuation( "open", name, description, cancel_on_failed_actuation, ntries ) @verbose_skip @command_register def close( self, name=None, description="", cancel_on_failed_actuation=True, ntries=100 ): self._valve_actuation( "close", name, description, cancel_on_failed_actuation, ntries ) def _valve_actuation( self, action, name, description, cancel_on_failed_actuation, ntries ): self.console_info( "{} name={} desc={}".format( action, name or NULL_STR, description or NULL_STR ) ) result = self._manager_actions( [ ( "{}_valve".format(action), (name,), dict(mode="script", description=description), ) ], protocol=EL_PROTOCOL, ) self.debug( "-------------------------- {} {} ({}) result={}".format( action, name, description, result ) ) if result is not None: if not self._finish_valve_change( action, result, name, description, ntries=ntries ): if not globalv.experiment_debug: if cancel_on_failed_actuation: self.cancel() msg = 'Failed to {} valve name="{}", description="{}"'.format( action, name, description ) else: # certain valves can fail to actuate but still you'd want to analyses the gas in the # extraction line. So we dont want to cancel. but we dont want to run another analysis either. # setting "end_at_run_completion" should do it self._failed_actuation_hook() msg = 'Failed to {} valve name="{}", description="{}". Continuing'.format( action, name, description ) from pychron.core.ui.gui import invoke_in_main_thread invoke_in_main_thread(self.warning_dialog, msg) else: self.debug("Experiment debug mode. not canceling") @verbose_skip @command_register def is_open(self, name=None, description=""): self.console_info( "is name={} desc={} open?".format(name or NULL_STR, description or NULL_STR) ) result = self._get_valve_state(name, description) if result: return result[0] is True @verbose_skip @command_register def is_closed(self, name=None, description=""): self.console_info("is {} ({}) closed?".format(name, description)) result = self._get_valve_state(name, description) if result: r = result[0] is False self.debug("is closed {}".format(r)) return r # private def _failed_actuation_hook(self): pass def _finish_valve_change( self, action, result, name, description, retry=1, ntries=100 ): """ :param action: :param result: :param name: :param description: :param retry: :return: """ ok, changed = result[0] # if changed: # time.sleep(0.25) locked = self._manager_actions( [ ( "get_software_lock", (name,), dict(mode="script", description=description), ) ], protocol=EL_PROTOCOL, ) # if action == 'close': # ok = not ok self.debug("action={}, ok={}, locked={}".format(action, ok, locked)) change_ok = True if not ok and not locked[0]: msg = 'Failed to {} valve Name="{}", Description="{}"'.format( action, name or "", description or "" ) self.console_info(msg) change_ok = False if self.retry_actuation and retry < ntries: time.sleep(1) msg = 'Retry actuation. i={} Action="{}", Name="{}", Description="{}"'.format( retry, action, name or "", description or "" ) self.console_info(msg) result = self._manager_actions( [ ( "{}_valve".format(action), (name,), dict(mode="script", description=description), ) ], protocol=EL_PROTOCOL, ) if result is not None: change_ok = self._finish_valve_change( action, result, name, description, retry=retry + 1, ntries=ntries, ) return change_ok # return not cancel # if cancel: # if not globalv.experiment_debug: # # self.warning_dialog(msg) # self.cancel() # else: # self.debug('Experiment debug mode. not canceling') # else: # return True def _get_valve_state(self, name, description): return self._manager_actions( [("get_valve_state", (name,), dict(description=description))], protocol=EL_PROTOCOL, )
# ============= EOF =============================================