Source code for pychron.hardware.core.communicators.modbus_communicator

# ===============================================================================
# Copyright 2011 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================


# =============enthought library imports=======================
# =============standard library imports =======================
import binascii
import struct

from pychron.hardware.core.checksum_helper import computeCRC

# =============local library imports  =========================
from pychron.hardware.core.communicators.serial_communicator import SerialCommunicator
from pychron.hardware.core.exceptions import CRCError

WRITE_FUNCTION_CODE = "06"
WRITE_MULTIPLE_FUNCTION_CODE = "10"
READ_FUNCTION_CODE = "03"


[docs]class ModbusCommunicator(SerialCommunicator): """ modbus message syntax [Device address][function code][data][error check] """ slave_address = "01" device_word_order = "low_high" def load(self, config, path): """ """ super(ModbusCommunicator, self).load(config, path) self.set_attribute(config, "slave_address", "Communications", "slave_address") def write( self, register, value, nregisters=1, response_type="register_write", **kw ): """ """ if nregisters == 1: return self.set_single_register(register, value, response_type, **kw) else: return self.set_multiple_registers( register, nregisters, value, response_type, **kw ) def tell(self, *args, **kw): return self.write(*args, **kw) def read(self, register, response_type="float", nregisters=1, **kw): """ """ if "nbytes" not in kw: if response_type == "int": kw["nbytes"] = 7 elif response_type == "float": kw["nbytes"] = 8 return self.read_holding_register(register, nregisters, response_type, **kw) def _execute_request(self, args, response_type, **kw): """ """ cmd = "".join([self.slave_address] + args) # convert hex string into list of ints cmdargs = self._parse_hexstr(cmd, return_type="int") # calculate the CRC and append to message crc = computeCRC(cmdargs) cmd += crc kw["is_hex"] = True if self.scheduler is not None: resp = self.scheduler.schedule(self.ask, args=(cmd,), kwargs=kw) else: resp = self.ask(cmd, **kw) return self._parse_response(cmd, resp, response_type) def _parse_hexstr(self, hexstr, return_type="hex"): """ """ if isinstance(hexstr, bytes): if return_type == "int": def func(b): return int.from_bytes([b], "big") else: def func(b): return b return [func(b) for b in hexstr] else: gen = range(0, len(hexstr), 2) if return_type == "int": return [int(hexstr[i : i + 2], 16) for i in gen] else: return [hexstr[i : i + 2] for i in gen] def _parse_response(self, cmd, resp, response_type): """ """ # self.debug('asdf {} {}'.format(cmd, resp)) if resp is not None and resp is not "simulation": args = self._parse_hexstr(resp) if args: # self.debug('args={}'.format(args)) if bytes(args[:2]) != bytes.fromhex(cmd[:4]): self.warning( "{} != {} {} >> {}".format(cmd[:4], args[:2], cmd, resp) ) return else: return # check the crc crc = args[-2:] calc_crc = computeCRC(args[:-2]) calc_crc = [int(calc_crc[:2], 16), int(calc_crc[2:], 16)] if not crc == calc_crc: msg = "Returned CRC ({}) does not match calculated ({})".format( crc, calc_crc ) self.warning(msg) raise CRCError("{} {}".format(cmd, msg)) else: if response_type == "register_write": return True # ndata = int(args[2], 16) ndata = int.from_bytes([args[2]], "big") dataargs = args[3 : 3 + ndata] if len(dataargs) < ndata: ndata = 4 dataargs = args[3 : 3 + ndata] if ndata > 2: data = [] for i in range(ndata // 4): s = 4 * i low_word = "".join( ["{:02X}".format(d) for d in dataargs[s : s + 2]] ) high_word = "".join( ["{:02X}".format(d) for d in dataargs[s + 2 : s + 4]] ) # low_word = ''.join(dataargs[s:s + 2]) # high_word = ''.join(dataargs[s + 2:s + 4]) if self.device_word_order == "low_high": """ dataargs in low word - high word order 1234 5678 want high word -low word order 5678 1234 """ di = "".join([high_word, low_word]) else: di = "".join([low_word, high_word]) data.append(di) data = "".join(data) if response_type == "float": fmt_str = "!" + "f" * (ndata // 4) resp = struct.unpack(fmt_str, bytes.fromhex(data)) # return a single value if ndata == 4: return resp[0] else: # return a list of values return resp else: data = args[3 : 3 + ndata] return int.from_bytes(data, "big") # return int(data, 16) def set_multiple_registers(self, startid, nregisters, value, response_type, **kw): """ """ data_address = "{:04X}".format(int(startid)) n = "{:04X}".format(int(nregisters)) nbytes = "{:02X}".format(int(nregisters * 2)) if isinstance(value, tuple): value = "".join(["{:04X}".format(vi) for vi in value]) else: # convert decimal value to 32-bit float binstr = struct.pack("!f", value) # convert binary string to a ascii hex string hexstr = binascii.hexlify(binstr) if self.device_word_order == "low_high": high = hexstr[:4] low = hexstr[4:] # flip order of words # value = ''.join([low, high]) value = low + high else: value = hexstr value = value.decode("utf8") return self._execute_request( [WRITE_MULTIPLE_FUNCTION_CODE, data_address, n, nbytes, value], response_type, **kw ) def set_single_register(self, rid, value, response_type, **kw): """ """ register_addr = "{:04X}".format(int(rid)) value = "{:04X}".format(int(value)) return self._execute_request( [WRITE_FUNCTION_CODE, register_addr, value], response_type, **kw ) def read_holding_register(self, holdid, nregisters, response_type, **kw): """ """ data_address = "{:04X}".format(holdid) n = "{:04X}".format(nregisters) return self._execute_request( [READ_FUNCTION_CODE, data_address, n], response_type, **kw )
# def read_input_status(self, inputid, ninputs): # ''' # ''' # func_code = '02' # data_address = '{:04X}'.format(inputid - 10001) # n = '{04x}'.format(ninputs) # return self._execute_request([func_code, data_address, n]) # ============= EOF =====================================