Source code for pychron.hardware.core.communicators.serial_communicator

# ===============================================================================
# Copyright 2011 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
# =============enthought library imports=======================

# =============standard library imports ========================

import codecs
import glob
import os
import sys
import time

import serial

# =============local library imports  ==========================
from .communicator import Communicator, process_response, prep_str, remove_eol_func


def get_ports():
    if sys.platform == "win32":
        ports = ["COM{}".format(i + 1) for i in range(256)]
    else:
        usb = glob.glob("/dev/tty.usb*")
        furpi = glob.glob("/dev/furpi.*")
        pychron = glob.glob("/dev/pychron.*")
        slab = glob.glob("/dev/tty.SLAB*")
        if sys.platform == "darwin":
            keyspan = glob.glob("/dev/tty.U*")
        else:
            keyspan = glob.glob("/dev/ttyU*")
        ports = keyspan + usb + furpi + pychron + slab

    return ports


[docs]class SerialCommunicator(Communicator): """ Base Class for devices that communicate using a rs232 serial port. Using Keyspan serial converter is the best option for a Mac class is built on top of pyserial. Pyserial is used to create a handle and this class uses the handle to read and write. handles are created when a serial device is opened setup args are loaded using load(). this method should be overwritten to load specific items. """ # char_write = False _auto_find_handle = False _auto_write_handle = False baudrate = None port = None bytesize = None parity = None stopbits = None timeout = None id_query = "" id_response = "" read_delay = None read_terminator = None read_terminator_position = None clear_output = False echos_command = False _config = None _comms_report_attrs = ( "port", "baudrate", "bytesize", "parity", "stopbits", "timeout", ) @property def address(self): return self.port def test_connection(self): return self.handle is not None def reset(self): handle = self.handle try: isopen = handle.isOpen() orate = handle.getBaudrate() if isopen: handle.close() handle.setBaudrate(0) handle.open() time.sleep(0.1) handle.close() handle.setBaudrate(orate) if isopen: handle.open() except Exception: self.warning("failed to reset connection") def close(self): if self.handle: self.debug("closing handle {}".format(self.handle)) self.handle.close() def load_comdict(self, port, baudrate=9600, bytesize=8, parity=None, stopbits=1): self.baudrate = baudrate self.port = port self.set_parity(parity) self.set_stopbits(stopbits) self.bytesize = bytesize def load(self, config, path): self.config_path = path self._config = config self.set_attribute(config, "port", "Communications", "port") self.set_attribute( config, "baudrate", "Communications", "baudrate", cast="int", optional=True ) self.set_attribute( config, "bytesize", "Communications", "bytesize", cast="int", optional=True ) self.set_attribute( config, "timeout", "Communications", "timeout", cast="float", optional=True ) self.set_attribute( config, "clear_output", "Communications", "clear_output", cast="boolean", optional=True, ) parity = self.config_get(config, "Communications", "parity", optional=True) self.set_parity(parity) stopbits = self.config_get(config, "Communications", "stopbits", optional=True) self.set_stopbits(stopbits) self.set_attribute( config, "read_delay", "Communications", "read_delay", cast="float", optional=True, default=25, ) self.set_attribute( config, "read_terminator", "Communications", "terminator", optional=True, default=None, ) self.set_attribute( config, "read_terminator_position", "Communications", "terminator_position", optional=True, default=None, cast="int", ) self.set_attribute( config, "write_terminator", "Communications", "write_terminator", optional=True, default=b"\r", ) if self.write_terminator == "CRLF": self.write_terminator = b"\r\n" if self.read_terminator == "CRLF": self.read_terminator = b"\r\n" if self.read_terminator == "ETX": self.read_terminator = chr(3) def set_parity(self, parity): if parity: self.parity = getattr(serial, "PARITY_%s" % parity.upper()) def set_stopbits(self, stopbits): if stopbits: if stopbits in ("1", 1): stopbits = "ONE" elif stopbits in ("2", 2): stopbits = "TWO" self.stopbits = getattr(serial, "STOPBITS_{}".format(stopbits.upper())) def tell(self, cmd, is_hex=False, info=None, verbose=True, **kw): """ """ if self.handle is None: if verbose: info = "no handle" self.log_tell(cmd, info) return with self._lock: self._write(cmd, is_hex=is_hex) if verbose: self.log_tell(cmd, info) def read(self, nchars=None, *args, **kw): """ """ with self._lock: if nchars is not None: r = self._read_nchars(nchars) else: r = self._read_terminator(*args, **kw) return r def ask( self, cmd, is_hex=False, verbose=True, delay=None, replace=None, remove_eol=True, info=None, nbytes=None, handshake_only=False, handshake=None, read_terminator=None, terminator_position=None, nchars=None, ): """ """ if self.handle is None: if verbose: x = prep_str(cmd.strip()) self.info("no handle {}".format(x)) return if not self.handle.isOpen(): return with self._lock: if self.clear_output: self.handle.flushInput() self.handle.flushOutput() cmd = self._write(cmd, is_hex=is_hex) if cmd is None: return if is_hex: if nbytes is None: nbytes = 8 re = self._read_hex(nbytes=nbytes, delay=delay) elif handshake is not None: re = self._read_handshake(handshake, handshake_only, delay=delay) elif nchars is not None: re = self._read_nchars(nchars) else: re = self._read_terminator( delay=delay, terminator=read_terminator, terminator_position=terminator_position, ) if remove_eol and not is_hex: re = remove_eol_func(re) if verbose: pre = process_response(re, replace, remove_eol=not is_hex) self.log_response(cmd, pre, info) return re
[docs] def open(self, **kw): """ Use pyserial to create a handle connected to port wth baudrate default handle parameters baudrate=9600 bytesize=EIGHTBITS parity= PARITY_NONE stopbits= STOPBITS_ONE timeout=None """ port = kw.get("port") if port is None: port = self.port if port is None: self.warning("Port not set") return False # #on windows device handles probably handled differently if sys.platform == "darwin": port = "/dev/tty.{}".format(port) kw["port"] = port for key in ["baudrate", "bytesize", "parity", "stopbits", "timeout"]: v = kw.get(key) if v is None: v = getattr(self, key) if v is not None: kw[key] = v pref = kw.pop("prefs", None) if pref is not None: pref = pref.serial_preference self._auto_find_handle = pref.auto_find_handle self._auto_write_handle = pref.auto_write_handle self.simulation = True if self._validate_address(port): try_connect = True while try_connect: try: self.debug("Connection parameters={}".format(kw)) self.handle = serial.Serial(**kw) try_connect = False self.simulation = False except serial.serialutil.SerialException: try_connect = False self.debug_exception() elif self._auto_find_handle: self._find_handle(**kw) self.debug("Serial device: {}".format(self.handle)) return self.handle is not None # connected is true if handle is not None
# private def _get_report_value(self, key): c, value = super(SerialCommunicator, self)._get_report_value(key) if self.handle: value = getattr(self.handle, key) return c, value def _find_handle(self, **kw): found = False self.simulation = False self.info("Trying to find correct port") port = None for port in get_ports(): self.info("trying port {}".format(port)) kw["port"] = port try: self.handle = serial.Serial(**kw) except serial.SerialException: continue r = self.ask(self.id_query) # use id_response as a callable to do device specific # checking if callable(self.id_response): if self.id_response(r): found = True self.simulation = False break if r == self.id_response: found = True self.simulation = False break if not found: # update the port if self._auto_write_handle and port: # port in form # /dev/tty.USAXXX1.1 p = os.path.split(port)[-1] # remove tty. p = p[4:] self._config.set( "Communication", "port", ) self.write_configuration(self._config, self.config_path) self.handle = None self.simulation = True def _validate_address(self, port): """ use glob to check the avaibable serial ports valid ports start with /dev/tty.U or /dev/tty.usbmodem """ valid = get_ports() if port in valid: return True else: msg = "{} is not a valid port address".format(port) self.warning(msg) if not valid: self.warning("No valid ports") else: self.warning("======== Valid Ports ========") for v in valid: self.warning(v) self.warning("=============================") def _write(self, cmd, is_hex=False): """ use the serial handle to write the cmd to the serial buffer return True if there is an exception writing cmd """ if not self.simulation: # want to write back the original cmd # use command locally command = cmd if not isinstance(command, bytes): command = bytes(command, "utf-8") if is_hex: command = codecs.decode(command, "hex") else: wt = self.write_terminator if wt is not None: if not isinstance(wt, bytes): wt = bytes(wt, "utf-8") command += wt cmd = command try: self.handle.write(command) except ( serial.serialutil.SerialException, OSError, IOError, ValueError, ) as e: self.warning("Serial Communicator write execption: {}".format(e)) return return cmd def _read_nchars(self, nchars, timeout=1, delay=None): return self._read_loop(lambda r: self._get_nchars(nchars, r), delay, timeout) def _read_hex(self, nbytes=8, timeout=1, delay=None): return self._read_loop(lambda r: self._get_nbytes(nbytes, r), delay, timeout) def _read_handshake(self, handshake, handshake_only, timeout=1, delay=None): def hfunc(r): terminated = False ack, r = self._check_handshake(handshake) if handshake_only and ack: r = handshake[0] terminated = True elif ack and r is not None: terminated = True return r, terminated return self._read_loop(hfunc, delay, timeout) def _read_terminator( self, timeout=1, delay=None, terminator=None, terminator_position=None ): if terminator is None: terminator = self.read_terminator if terminator_position is None: terminator_position = self.read_terminator_position if terminator is None: terminator = (b"\r\x00", b"\r\n", b"\r", b"\n") if not isinstance(terminator, (list, tuple)): terminator = (terminator,) def func(r): terminated = False try: if self.echos_command: inw = 1 else: inw = self.handle.inWaiting() r += self.handle.read(inw) if r and r.strip(): for ti in terminator: if terminator_position: terminated = r[terminator_position] == ti else: if isinstance(ti, str): ti = ti.encode() terminated = r.endswith(ti) if terminated: break except BaseException as e: self.warning(e) return r, terminated if self.echos_command: self._read_loop(func, delay, timeout) return self._read_loop(func, delay, timeout) def _get_nbytes(self, *args, **kw): """ 1 byte == 2 chars """ return self._get_nchars(*args, **kw) def _get_nchars(self, nchars, r): handle = self.handle inw = handle.inWaiting() c = min(inw, nchars - len(r)) r += handle.read(c) return r[:nchars], len(r) >= nchars def _check_handshake(self, handshake_chrs): ack, nak = handshake_chrs inw = self.handle.inWaiting() r = self.handle.read(inw) if r: return ack == r[0], r[1:] return False, None def _read_loop(self, func, delay, timeout=1): if delay is not None: time.sleep(delay / 1000.0) elif self.read_delay: time.sleep(self.read_delay / 1000.0) r = b"" st = time.time() handle = self.handle ct = time.time() while ct - st < timeout: if not handle.isOpen(): break try: r, isterminated = func(r) if isterminated: break except (ValueError, TypeError): pass time.sleep(0.01) ct = time.time() if ct - st > timeout: l = len(r) if r else 0 self.info("timed out. {}s r={}, len={}".format(timeout, r, l)) return r
if __name__ == "__main__": s = SerialCommunicator() s.read_delay = 0 s.port = "usbmodemfd1221" s.open() time.sleep(2) s.tell("A", verbose=False) for i in range(10): print("dddd", s.ask("1", verbose=False)) time.sleep(1) # s.tell('ddd', verbose=False) # print s.ask('ddd', verbose=False) # ===================== EOF ==========================================