Source code for watlow

import struct
from binascii import hexlify
from binascii import unhexlify

import crcmod
import serial as ser


[docs]class Watlow(): ''' Object representing a Watlow PID temperature controller. This class facilitates the generation and parsing of BACnet TP/MS messages to and from Watlow temperature controllers. * **serial**: serial object (see pySerial's serial.Serial class) or `None` * **port** (str): string representing the serial port or `None` * **timeout** (float): Read timeout value in seconds * **address** (int): Watlow controller address (found in the setup menu). Acceptable values are 1 through 16. `timeout` and `port` are not necessary if a serial object was already passed with those arguments. The baudrate for Watlow temperature controllers is 38400 and hardcoded. ''' def __init__(self, serial=None, port=None, timeout=0.5, address=1): self.timeout = timeout self.baudrate = 38400 self.address = address if serial: self.port = serial.port self.serial = serial else: self.port = port self.open() def open(self): self.serial = ser.Serial(self.port, self.baudrate, timeout=self.timeout) def close(self): self.serial.flush() self.serial.close() def _headerCheckByte(self, headerBytes): ''' Takes the full header byte array bytes[0] through bytes[6] of the full command and returns a check byte (bytearray of length one) using Watlow's algorithm. Implementation relies on this post: https://reverseengineering.stackexchange.com/questions/8303/rs-485-checksum-reverse-engineering-watlow-ez-zone-pm ''' crc_8_table = [ 0x00, 0xfe, 0xff, 0x01, 0xfd, 0x03, 0x02, 0xfc, 0xf9, 0x07, 0x06, 0xf8, 0x04, 0xfa, 0xfb, 0x05, 0xf1, 0x0f, 0x0e, 0xf0, 0x0c, 0xf2, 0xf3, 0x0d, 0x08, 0xf6, 0xf7, 0x09, 0xf5, 0x0b, 0x0a, 0xf4, 0xe1, 0x1f, 0x1e, 0xe0, 0x1c, 0xe2, 0xe3, 0x1d, 0x18, 0xe6, 0xe7, 0x19, 0xe5, 0x1b, 0x1a, 0xe4, 0x10, 0xee, 0xef, 0x11, 0xed, 0x13, 0x12, 0xec, 0xe9, 0x17, 0x16, 0xe8, 0x14, 0xea, 0xeb, 0x15, 0xc1, 0x3f, 0x3e, 0xc0, 0x3c, 0xc2, 0xc3, 0x3d, 0x38, 0xc6, 0xc7, 0x39, 0xc5, 0x3b, 0x3a, 0xc4, 0x30, 0xce, 0xcf, 0x31, 0xcd, 0x33, 0x32, 0xcc, 0xc9, 0x37, 0x36, 0xc8, 0x34, 0xca, 0xcb, 0x35, 0x20, 0xde, 0xdf, 0x21, 0xdd, 0x23, 0x22, 0xdc, 0xd9, 0x27, 0x26, 0xd8, 0x24, 0xda, 0xdb, 0x25, 0xd1, 0x2f, 0x2e, 0xd0, 0x2c, 0xd2, 0xd3, 0x2d, 0x28, 0xd6, 0xd7, 0x29, 0xd5, 0x2b, 0x2a, 0xd4, 0x81, 0x7f, 0x7e, 0x80, 0x7c, 0x82, 0x83, 0x7d, 0x78, 0x86, 0x87, 0x79, 0x85, 0x7b, 0x7a, 0x84, 0x70, 0x8e, 0x8f, 0x71, 0x8d, 0x73, 0x72, 0x8c, 0x89, 0x77, 0x76, 0x88, 0x74, 0x8a, 0x8b, 0x75, 0x60, 0x9e, 0x9f, 0x61, 0x9d, 0x63, 0x62, 0x9c, 0x99, 0x67, 0x66, 0x98, 0x64, 0x9a, 0x9b, 0x65, 0x91, 0x6f, 0x6e, 0x90, 0x6c, 0x92, 0x93, 0x6d, 0x68, 0x96, 0x97, 0x69, 0x95, 0x6b, 0x6a, 0x94, 0x40, 0xbe, 0xbf, 0x41, 0xbd, 0x43, 0x42, 0xbc, 0xb9, 0x47, 0x46, 0xb8, 0x44, 0xba, 0xbb, 0x45, 0xb1, 0x4f, 0x4e, 0xb0, 0x4c, 0xb2, 0xb3, 0x4d, 0x48, 0xb6, 0xb7, 0x49, 0xb5, 0x4b, 0x4a, 0xb4, 0xa1, 0x5f, 0x5e, 0xa0, 0x5c, 0xa2, 0xa3, 0x5d, 0x58, 0xa6, 0xa7, 0x59, 0xa5, 0x5b, 0x5a, 0xa4, 0x50, 0xae, 0xaf, 0x51, 0xad, 0x53, 0x52, 0xac, 0xa9, 0x57, 0x56, 0xa8, 0x54, 0xaa, 0xab, 0x55 ] # Watlow's header check byte algorithm: intCheck = ~crc_8_table[headerBytes[6] ^ crc_8_table[headerBytes[5] ^ crc_8_table[headerBytes[4] ^ crc_8_table[headerBytes[3] ^ crc_8_table[~headerBytes[2]]]]]] & (2**8-1) return bytes([intCheck]) def _dataCheckByte(self, dataBytes): ''' Takes the full data byte array, bytes[8] through bytes[13] of the full command and calculates the data check byte using BacNET CRC-16. ''' # CRC-16 with 0xFFFF as initial value, 0x1021 as polynomial, bit reversed crc_fun = crcmod.mkCrcFun(poly=0x11021, initCrc=0, rev=True, xorOut=0xFFFF) # bytes object packed using C-type unsigned short, little-endian: byte_str = struct.pack('<H', crc_fun(dataBytes)) return byte_str def _intDataParamToHex(self, dataParam): # Reformats data param from notation in the manual to hex string # (e.g. '4001' becomes '04' and '001', returned as '0401') dataParam = format(int(dataParam), '05d') dataParam = hexlify(int(dataParam[:2]).to_bytes(1, 'big') + int(dataParam[2:]).to_bytes(1, 'big')).decode('utf-8') return dataParam def _byteDataParamToInt(self, hexParam): # Reformats data parameter from bytes string to integer # (e.g. b'\x1a\x1d' to 26029) return int(str(hexParam[0]) + format(hexParam[1], '03d')) def _buildZone(self, address): ''' Generates the zone portion of a message from the watlow address. Returns the hexidecimal form as a string (without "0x") ''' return format(address + 15, 'x') def _buildReadRequest(self, dataParam, instance='01'): ''' Takes the watlow parameter ID, converts to bytes objects, calls internal functions to calc check bytes, and assembles/returns the request byte array. ''' # Request Header: BACnetPreamble = '55ff' requestParam = '05' # Zone corresponds to the address parameter in setup (e.g. '10' = 1, '11' = 2, etc.) zone = self._buildZone(self.address) additionalHeader = '000006' hexHeader = BACnetPreamble + requestParam + zone + additionalHeader # Request Data Parameters additionalData = '010301' dataParam = self._intDataParamToHex(dataParam) hexData = additionalData + dataParam + instance # Convert input strings to bytes: hexHeader = unhexlify(hexHeader) hexData = unhexlify(hexData) # Calculate check bytes: headerChk = self._headerCheckByte(hexHeader) dataChk = self._dataCheckByte(hexData) # Assemble request byte array: request = bytearray(hexHeader) request += bytearray(headerChk) request += bytearray(hexData) request += dataChk return request def _buildWriteRequest(self, dataParam, value, data_type, instance='01'): ''' Takes the set point temperature value, converts to bytes objects, calls internal functions to calc check bytes, and assembles/returns the request byte array. Much of this function is hard coded until I figure out how each part of the hex command is assembled. It is different than a normal read command. ''' BACnetPreamble = '55ff' requestParam = '05' zone = self._buildZone(self.address) dataParam = self._intDataParamToHex(dataParam) if data_type == float: additionalHeader = '00000a' hexData = '0104' + dataParam + instance + '08' value = struct.pack('>f', float(value)) elif data_type == int: additionalHeader = '030009' hexData = '0104' + dataParam + instance + '0f01' value = value.to_bytes(2, 'big') # Request Header String: hexHeader = BACnetPreamble + requestParam + zone + additionalHeader # Convert input strings to bytes: hexHeader = unhexlify(hexHeader) # Data portion of request (here the set point value is appended) hexData = unhexlify(hexData) + value # Calculate check bytes: headerChk = self._headerCheckByte(hexHeader) dataChk = self._dataCheckByte(hexData) # Assemble request byte array: request = bytearray(hexHeader) request += bytearray(headerChk) request += bytearray(hexData) request += dataChk return request def _validateResponse(self, bytesResponse): ''' Compares check bytes received in response to those calculated. ''' isValid = False # Evaluate headerChk as bytearray instead of as an int (which is how # python will interpret a single hex character) headerChkReceived = bytearray([bytesResponse[7]]) dataCheckRecieved = bytesResponse[-2:] addressReceived = bytesResponse[4] - 15 if (headerChkReceived == self._headerCheckByte(bytesResponse[0:7]) and dataCheckRecieved == self._dataCheckByte(bytesResponse[8:-2]) and addressReceived == self.address): isValid = True return isValid def _parseResponse(self, bytesResponse): ''' Takes the full response byte array and extracts the relevant data (e.g. current temperature), constructs response dict, and returns it. ''' output = { 'address': self.address, 'param': None, 'data': None, 'error': None } try: if bytesResponse == b'' or bytesResponse == bytearray(len(bytesResponse)): raise Exception('Exception: No response from address {0}'.format(self.address)) if not self._validateResponse(bytesResponse): print('Invalid Response at address {0}: '.format(self.address), hexlify(bytesResponse)) raise Exception('Exception: Invalid response received from address {0}'.format(self.address)) except Exception as e: output['error'] = e return output else: # Case where response data value is an int used to represent a state defined # in the manual (e.g. param 8003, heat algorithm, where 62 means 'PID') # from a read request # Hex byte 7: '0a', Hex bytes 15, 16: 0F, 01 if bytesResponse[6] == 10 and bytesResponse[-6] == 15 and bytesResponse[-5] == 1: data = bytesResponse[-4:-2] output['param'] = self._byteDataParamToInt(bytesResponse[11:13]) output['data'] = int.from_bytes(data, byteorder='big') # Case where response data value is a float from a set param request # (e.g. 7001, process value setpoint) # Hex byte 7: '0a', Hex byte 14: '08' elif bytesResponse[6] == 10 and bytesResponse[-7] == 8: ieee_754 = hexlify(bytesResponse[-6:-2]) output['data'] = struct.unpack('>f', unhexlify(ieee_754))[0] output['param'] = self._byteDataParamToInt(bytesResponse[10:12]) # Case where response data value is an integer from a set param # request (e.g. param 8003, heat algorithm, where 62 means 'PID') # Hex byte 7: '09' elif bytesResponse[6] == 9: data = bytesResponse[-4:-2] output['param'] = self._byteDataParamToInt(bytesResponse[10:12]) output['data'] = int.from_bytes(data, byteorder='big') # Case where data value is a float representing a process value # (e.g. 4001, where current temp of 50.0 is returned) # Hex byte 7: '0b' elif bytesResponse[6] == 11: ieee_754 = bytesResponse[-6:-2] output['param'] = self._byteDataParamToInt(bytesResponse[11:13]) output['data'] = struct.unpack('>f', ieee_754)[0] # Other cases, such as response from trying to write a read-only parameter: else: output['error'] = Exception('Received a message that could not be parsed from address {0}'.format(self.address)) return output
[docs] def read(self, instance='01'): ''' Reads the current temperature. This is a wrapper around `readParam()` and is equivalent to `readParam(4001, float, instance)`. Returns a dict containing the response data, parameter ID, and address. * **instance**: a two digit string corresponding to the channel to read (e.g. '01', '05') ''' return self.readParam(4001, float, instance)
[docs] def readSetpoint(self, instance='01'): ''' Reads the current setpoint. This is a wrapper around `readParam()` and is equivalent to `readParam(7001, float, instance)`. Returns a dict containing the response data, parameter ID, and address. * **instance**: a two digit string corresponding to the channel to read (e.g. '01', '05') ''' return self.readParam(7001, float, instance='01')
[docs] def readParam(self, param, data_type, instance='01'): ''' Takes a parameter and writes data to the watlow controller at object's internal address. Using this function requires knowing the data type for the parameter (int or float). See the Watlow `user manual <https://www.watlow.com/-/media/documents/user-manuals/pm-pid-1.ashx>`_ for individual parameters and the Usage section of these docs. * **param**: a four digit integer corresponding to a Watlow parameter (e.g. 4001, 7001) * **data_type**: the Python type representing the data value type (i.e. `int` or `float`) * **instance**: a two digit string corresponding to the channel to read (e.g. '01', '05') `data_type` is used to determine how many characters to read following the controller's response. If `int` is passed when the data type should be `float`, it will not read the entire message and will throw an error. If `float` is passed when it should be `int`, it will timeout, possibly reading correctly. If multiple instances of `Watlow()` are using the same serial port for different controllers it will read too many characters. It is best to be completely sure which data type is being used by each parameter (`int` or `float`). Returns a dict containing the response data, parameter ID, and address. ''' request = self._buildReadRequest(param, instance) try: self.serial.write(request) except Exception as e: print('Exception: ', e) else: if data_type == float: response = self.serial.read(21) elif data_type == int: response = self.serial.read(20) output = self._parseResponse(response) return output
[docs] def write(self, value, instance='01'): ''' Changes the watlow temperature setpoint. Takes a value (in degrees F by default), builds request, writes to watlow, receives and returns response object. * **value**: an int or float representing the new target setpoint in degrees F by default * **instance**: a two digit string corresponding to the channel to set (e.g. '01', '05') This is a wrapper around `writeParam()` and is equivalent to `writeParam(7001, value, float, instance)`. Returns a dict containing the response data, parameter ID, and address. ''' return self.writeParam(7001, value, float, instance)
[docs] def writeParam(self, param, value, data_type, instance='01'): ''' Changes the value of the passed watlow parameter ID. Using this function requires knowing the data type for the parameter (int or float). See the Watlow `user manual <https://www.watlow.com/-/media/documents/user-manuals/pm-pid-1.ashx>`_ for individual parameters and the Usage section of these docs. * **value**: an int or float representing the new target setpoint in degrees F by default * **data_type**: the Python type representing the data value type (i.e. `int` or `float`) * **instance**: a two digit string corresponding to the channel to read (e.g. '01', '05') `data_type` is used to determine how the BACnet TP/MS message will be constructed and how many serial characters to read following the controller's response. Returns a dict containing the response data, parameter ID, and address. ''' request = self._buildWriteRequest(param, value, data_type, instance) try: self.serial.write(request) except Exception as e: print('Exception: ', e) else: if data_type == float: bytesResponse = self.serial.read(20) elif data_type == int: bytesResponse = self.serial.read(19) output = self._parseResponse(bytesResponse) return output