# Source code for pychron.pyscripts.valve_pyscript

# ===============================================================================
# Copyright 2012 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
from __future__ import absolute_import

import time

from traits.api import Any

from pychron.globals import globalv
from pychron.pychron_constants import NULL_STR
from pychron.pyscripts.pyscript import PyScript, verbose_skip, makeRegistry, \
    makeNamedRegistry

# Dotted path naming the manager protocol; ValvePyScript passes it as
# ``protocol=`` on every ``_manager_actions`` call it makes.
ELPROTOCOL = 'pychron.extraction_line.extraction_line_manager.ExtractionLineManager'

# Registry applied as a decorator to ValvePyScript command methods; its
# ``commands`` mapping is what ``ValvePyScript.get_command_register`` lists.
command_register = makeRegistry()
# Companion decorator that takes an explicit command name
# (used below as ``@named_register('open')``).
named_register = makeNamedRegistry(command_register)


class ValvePyScript(PyScript):
    """Script class providing extraction-line valve commands.

    The commands ``lock``, ``unlock``, ``open``, ``close``, ``is_open`` and
    ``is_closed`` are forwarded through ``_manager_actions`` using the
    ``ELPROTOCOL`` protocol.
    """
    # forwarded to sub-scripts started through ``gosub``
    runner = Any
    # ``lock``/``unlock`` only reach the manager when this is True
    allow_lock = False
    # when True a failed actuation is re-attempted (see _finish_valve_change)
    retry_actuation = True

    def get_command_register(self):
        """Return the registered commands as a list of ``(key, value)`` items."""
        return list(command_register.commands.items())

    def gosub(self, *args, **kw):
        """Run a sub-script, handing it this script's ``runner``.

        The ``runner`` keyword is always overwritten with ``self.runner``.
        If the parent ``gosub`` returns a truthy script object, its
        ``runner`` attribute is reset to ``None`` before it is returned.
        """
        kw['runner'] = self.runner
        s = super(ValvePyScript, self).gosub(*args, **kw)
        if s:
            s.runner = None
        return s

    @verbose_skip
    @command_register
    def lock(self, name=None, description=''):
        """Lock a valve; only forwarded when ``allow_lock`` is True.

        :param name: valve name
        :param description: valve description (``None`` becomes ``NULL_STR``)
        :return: the manager result, or ``None`` when locking is disabled
        """
        return self._change_lock('lock', name, description)

    @verbose_skip
    @command_register
    def unlock(self, name=None, description=''):
        """Unlock a valve; only forwarded when ``allow_lock`` is True.

        :param name: valve name
        :param description: valve description (``None`` becomes ``NULL_STR``)
        :return: the manager result, or ``None`` when locking is disabled
        """
        return self._change_lock('unlock', name, description)

    @verbose_skip
    @named_register('open')
    def _m_open(self, name=None, description=''):
        """Open a valve (registered under the command name ``open``)."""
        self._valve_actuation('open', name, description)

    @verbose_skip
    @command_register
    def close(self, name=None, description=''):
        """Close a valve."""
        self._valve_actuation('close', name, description)

    def _valve_actuation(self, action, name, description):
        """Actuate a valve and cancel the script if the change fails.

        :param action: ``'open'`` or ``'close'``; used to build the manager
            function name ``'<action>_valve'``
        :param name: valve name
        :param description: valve description
        """
        self.console_info('{} name={} desc={}'.format(action,
                                                      name or NULL_STR,
                                                      description or NULL_STR))

        result = self._valve_request('{}_valve'.format(action), name,
                                     mode='script', description=description)

        self.debug('-------------------------- {} {} ({}) result={}'.format(
            action, name, description, result))

        # no result from the manager: nothing to verify
        if result is None:
            return

        if self._finish_valve_change(action, result, name, description):
            return

        if globalv.experiment_debug:
            self.debug('Experiment debug mode. not canceling')
            return

        # imported here, as in the original code, rather than at module level
        from pychron.core.ui.gui import invoke_in_main_thread
        msg = 'Failed to {} valve name="{}", description="{}"'.format(
            action, name, description)
        invoke_in_main_thread(self.warning_dialog, msg)
        self.cancel()

    @verbose_skip
    @command_register
    def is_open(self, name=None, description=''):
        """Return True if the reported valve state is ``True``.

        Returns ``None`` when no state could be obtained.
        """
        self.console_info('is name={} desc={} open?'.format(
            name or NULL_STR, description or NULL_STR))
        result = self._get_valve_state(name, description)
        if result:
            return result[0] is True

    @verbose_skip
    @command_register
    def is_closed(self, name=None, description=''):
        """Return True if the reported valve state is ``False``.

        Returns ``None`` when no state could be obtained.
        """
        self.console_info('is {} ({}) closed?'.format(name, description))
        result = self._get_valve_state(name, description)
        if result:
            r = result[0] is False
            self.debug('is closed {}'.format(r))
            return r

    # private
    def _valve_request(self, func, name, **kw):
        """Send a single ``(func, (name,), kw)`` action to the manager.

        :param func: name of the manager function to invoke
        :param name: valve name, passed as the only positional argument
        :param kw: keyword arguments for the manager function
        :return: whatever ``_manager_actions`` returns
        """
        return self._manager_actions([(func, (name,), kw)],
                                     protocol=ELPROTOCOL)

    def _change_lock(self, action, name, description):
        """Shared implementation of ``lock`` and ``unlock``.

        :param action: ``'lock'`` or ``'unlock'``
        :param name: valve name
        :param description: valve description (``None`` becomes ``NULL_STR``)
        :return: the manager result, or ``None`` when ``allow_lock`` is False
        """
        if description is None:
            description = NULL_STR

        # yields 'locking ...' / 'unlocking ...'
        self.console_info('{}ing {} ({})'.format(action, name, description))
        if not self.allow_lock:
            self.warning('Valve locking not enabled for this script')
            return None

        return self._valve_request('{}_valve'.format(action), name,
                                   mode='script', description=description)

    def _finish_valve_change(self, action, result, name, description, retry=1):
        """Check an actuation result, retrying the actuation on failure.

        :param action: ``'open'`` or ``'close'``
        :param result: value returned by ``_manager_actions`` for the
            actuation; its first element is unpacked as ``(ok, changed)``
        :param name: valve name
        :param description: valve description
        :param retry: number of the current attempt; retries stop once it
            reaches 100
        :return: True if the actuation succeeded or the valve is software
            locked, False otherwise
        """
        while True:
            ok, _changed = result[0]

            locked = self._valve_request('get_software_lock', name,
                                         mode='script',
                                         description=description)
            self.debug('action={}, ok={}, locked={}'.format(action, ok, locked))

            if ok:
                return True

            # A missing or empty lock reply is treated as "not locked" so the
            # failure is reported instead of raising on ``locked[0]``.
            if locked and locked[0]:
                return True

            msg = 'Failed to {} valve Name="{}", Description="{}"'.format(
                action, name or '', description or '')
            self.console_info(msg)

            if not (self.retry_actuation and retry < 100):
                return False

            # pause one second before trying the actuation again
            time.sleep(1)
            msg = 'Retry actuation. i={} Action="{}", Name="{}", Description="{}"'.format(
                retry, action, name or '', description or '')
            self.console_info(msg)

            result = self._valve_request('{}_valve'.format(action), name,
                                         mode='script',
                                         description=description)
            if result is None:
                return False

            retry += 1

    def _get_valve_state(self, name, description):
        """Ask the manager for the state of a valve.

        :param name: valve name
        :param description: valve description
        :return: whatever ``_manager_actions`` returns for ``get_valve_state``
        """
        return self._valve_request('get_valve_state', name,
                                   description=description)
# ============= EOF =============================================