# ===============================================================================
# Copyright 2011 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# ============= enthought library imports =======================
# ============= standard library imports ========================
from __future__ import absolute_import
import codecs
import time
from threading import Lock, RLock
# ============= local library imports ==========================
import binascii
from pychron.headless_config_loadable import HeadlessConfigLoadable
def prep_str(s):
    """
    Return a copy of ``s`` that is safe to print in a log line.

    Characters inside the printable ASCII range 0x20-0x7E are kept as they
    are. Every other character is replaced by its code point, written in
    decimal (zero-padded to at least two digits) between square brackets,
    so a carriage return becomes ``[13]`` and a tab becomes ``[09]``.

    :param s: text to sanitize; ``None`` is treated as the empty string
    :return: the sanitized string
    """
    if s is None:
        return ''

    def render(char):
        # keep printable ASCII untouched, spell everything else out
        code = ord(char)
        if 0x20 <= code <= 0x7E:
            return char
        return '[{:02d}]'.format(code)

    return ''.join(render(char) for char in s)
def remove_eol_func(re):
    """
    Strip trailing whitespace (which includes end-of-line characters)
    from ``re``.

    ``bytes`` input is converted to ``str`` before stripping: it is decoded
    as UTF-8 when possible, otherwise every byte is rendered as its decimal
    value between square brackets (the two bytes 0xFF 0x01 become
    ``'[255][1]'``).

    :param re: ``str``, ``bytes`` or ``None``; any other object must
        provide an ``rstrip()`` method
    :return: the stripped value, or ``None`` when ``re`` is ``None``
    """
    if re is None:
        return None

    if isinstance(re, bytes):
        try:
            re = re.decode('utf-8')
        except UnicodeDecodeError:
            # A hex-codec decode used to be attempted here first. It could
            # never succeed: bytes that fail UTF-8 decoding contain at least
            # one byte >= 0x80, which is not a hexadecimal digit, so the hex
            # codec always raised. Render the raw byte values directly.
            re = ''.join('[{}]'.format(b) for b in re)

    return re.rstrip()
def process_response(re, replace=None, remove_eol=True):
    """
    Clean up a raw response.

    :param re: the response; ``None`` is passed through unchanged
    :param replace: optional ``(old, new)`` tuple; when given, every
        occurrence of ``old`` in the response is replaced by ``new``.
        Values that are not a tuple are ignored.
    :param remove_eol: when true the response is first passed through
        ``remove_eol_func``
    :return: the processed response, or ``None`` when there is no response
    """
    if remove_eol:
        re = remove_eol_func(re)

    # A missing response (None) has no ``replace`` method; return it as is
    # instead of raising AttributeError.
    if re is not None and isinstance(replace, tuple):
        re = re.replace(replace[0], replace[1])

    return re
class Communicator(HeadlessConfigLoadable):
    """
    Base class for all communicators, e.g. SerialCommunicator, EthernetCommunicator

    ``close``, ``ask``, ``tell``, ``write`` and ``read`` are no-op
    placeholders in this class and return ``None``; presumably concrete
    communicators override them.
    """
    # class-level defaults
    _lock = None
    simulation = True
    write_terminator = chr(13)  # '\r'
    handle = None
    scheduler = None
    address = None

    def __init__(self, *args, **kw):
        """
        Forward all arguments to the parent initializer and create the
        re-entrant lock exposed through the ``lock`` property.
        """
        super(Communicator, self).__init__(*args, **kw)
        self._lock = RLock()

    def load(self, config, path):
        """
        Read the ``verbose`` and ``write_terminator`` options from the
        ``Communications`` section of ``config`` using ``set_attribute``.

        :param config: configuration object handed to ``set_attribute``
        :param path: not used by this implementation
        :return: always ``True``
        """
        self.set_attribute(config, 'verbose', 'Communications', 'verbose',
                           default=False, optional=True, cast='boolean')
        self.set_attribute(config, 'write_terminator', 'Communications', 'write_terminator',
                           default=chr(13),
                           optional=True)

        # The terminator may be spelled in the config as the text 'chr(N)';
        # translate the known spellings into the actual character.
        # 'chr(13)' is included so that writing the default explicitly
        # yields '\r' rather than the literal text.
        for spelling, terminator in (('chr(10)', chr(10)),
                                     ('chr(0)', chr(0)),
                                     ('chr(13)', chr(13))):
            if self.write_terminator == spelling:
                self.write_terminator = terminator
                break

        return True

    def close(self):
        """
        No-op placeholder.
        """
        pass

    def delay(self, ms):
        """
        sleep ms milliseconds
        """
        time.sleep(ms / 1000.0)

    def ask(self, *args, **kw):
        """
        No-op placeholder; returns ``None``.
        """
        pass

    def tell(self, *args, **kw):
        """
        No-op placeholder; returns ``None``.
        """
        pass

    def write(self, *args, **kw):
        """
        No-op placeholder; returns ``None``.
        """
        pass

    def read(self, *args, **kw):
        """
        No-op placeholder; returns ``None``.
        """
        pass

    def log_tell(self, cmd, info=None):
        """
        Log command as an INFO message

        :param cmd: the command; trailing whitespace is removed and
            non-printable characters are spelled out before logging
        :param info: optional prefix, prepended whenever it is not ``None``
        """
        cmd = remove_eol_func(cmd)
        ncmd = prep_str(cmd)
        if ncmd:
            cmd = ncmd

        if info is not None:
            msg = '{} {}'.format(info, cmd)
        else:
            msg = cmd

        self.info(msg)

    def log_response(self, cmd, re, info=None):
        """
        Log command and response as an INFO message

        :param cmd: the command; trailing whitespace is removed and
            non-printable characters are spelled out before logging
        :param re: the response; shortened to 97 characters plus ``...``
            when longer than 100. ``None`` (no response) is logged as is.
        :param info: optional prefix, prepended when it is non-empty
        """
        cmd = remove_eol_func(cmd)
        ncmd = prep_str(cmd)
        if ncmd:
            cmd = ncmd

        # a missing response has no length; skip truncation instead of
        # raising TypeError
        if re is not None and len(re) > 100:
            re = '{}...'.format(re[:97])

        if info:
            msg = '{} {} ===>> {}'.format(info, cmd, re)
        else:
            msg = '{} ===>> {}'.format(cmd, re)

        self.info(msg)

    @property
    def lock(self):
        """
        The ``RLock`` created in ``__init__``.
        """
        return self._lock
# ============= EOF ====================================