Source code for pychron.tx.protocols.valve

# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
# ============= standard library imports ========================
# ============= local library imports  ==========================
from __future__ import absolute_import
import random

from pychron.pychron_constants import EL_PROTOCOL
from pychron.tx.errors import DeviceConnectionErrorCode, InvalidGaugeErrorCode
from pychron.tx.protocols.base_valve import BaseValveProtocol
from pychron.tx.registry import FUNC_REGISTRY


def make_wrapper(func, postprocess):
    def wrapper(*args, **kw):
        """
        handler signature is self, manager, args, sender
        """

        ret = func(*args[1:], **kw)
        if postprocess:
            ret = postprocess(ret)
        return ret

    return wrapper


[docs]class ValveProtocol(BaseValveProtocol): manager_protocol = EL_PROTOCOL def _init_hook(self): for k, v in FUNC_REGISTRY.items(): self.register_service(k, make_wrapper(*v)) services = ( ("GetData", "_get_data"), ("Read", "_read"), ("Set", "_set"), ("Ping", "_ping"), ("GetPressure", "_get_pressure"), ) self._register_services(services) def _get_data(self, data): if isinstance(data, dict): offset = data["value"] else: offset = float(data) v = random.random() + offset return str(v) def _get_device(self, name, protocol=None, owner=None): dev = None if self._application is not None: if protocol is None: protocol = "pychron.hardware.core.i_core_device.ICoreDevice" dev = self._application.get_service(protocol, 'name=="{}"'.format(name)) if dev is None: # possible we are trying to get a flag m = self._manager if m: dev = m.get_flag(name) if dev is None: dev = m.get_mass_spec_param(name) else: if owner: dev.set_owner(str(owner)) return dev def _get_error(self, data): return self._manager.get_error() def _set(self, data): if isinstance(data, dict): dname, value = data["device"], data["value"] else: dname, value = data.split(" ") d = self._get_device(dname, owner=self._addr) # self.debug('Set {name} {value}', name=dname, value=value) if d is not None: # self.info('Set {name} to {value}', name=dname, value=value) result = d.set(value) else: result = DeviceConnectionErrorCode(dname, logger=self) return result def _ping(self, data): if isinstance(data, dict): dname = data["device"] else: dname = data d = self._get_device(dname, owner=self._addr) if d is not None: if hasattr(d, "ping"): result = d.ping() else: result = DeviceConnectionErrorCode(dname, logger=self) return result def _read(self, data): if isinstance(data, dict): data = data["value"] # self.debug('Read {data}', data=data) d = self._get_device(data, owner=self._addr) if d is not None: result = d.get(current=True) else: result = DeviceConnectionErrorCode(data, logger=self) return result def _get_pressure(self, data): """ Get the pressure from ``controller``'s ``gauge`` """ manager = self._manager if isinstance(data, dict): controller, gauge = data["controller"], data["gauge"] else: controller, gauge = data p = None if manager: p = manager.get_pressure(controller, gauge) if p is None: p = InvalidGaugeErrorCode(controller, gauge) return str(p)
# ============= EOF =============================================