#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# htheatpump - Serial communication module for Heliotherm heat pumps
# Copyright (C) 2022 Daniel Strigl
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" Protocol constants and functions for the Heliotherm heat pump communication. """
# ------------------------------------------------------------------------------------------------------------------- #
# Protocol constants
# ------------------------------------------------------------------------------------------------------------------- #
MAX_CMD_LENGTH = 253 # 253 = 255 - 1 byte for header - 1 byte for trailer
REQUEST_HEADER = b"\x02\xfd\xd0\xe0\x00\x00"
RESPONSE_HEADER_LEN = 6 # response header length
RESPONSE_HEADER = {
#
# NOTE:
# =====
# It seems that there is some inconsistency in the way how the heat pump replies to requests.
# Depending on the received header (the first 6 bytes) we have to correct the payload length for
# the checksum computation, so that the received checksum fits with the computed one.
# Additionally, for some replies the checksum seems to be totally ignored, because the received
# checksum is always zero (0x0), regardless of the content.
#
# This behavior will be handled in the following lines. See also function HtHeatpump.read_response().
# normal response header with answer
b"\x02\xfd\xe0\xd0\x00\x00": {
"payload_len": lambda payload_len: payload_len, # no payload length correction necessary
# method to calculate the checksum of the response:
"checksum": lambda header, payload_len, payload: calc_checksum(
header + bytes([payload_len]) + payload
),
},
# response header for some of the "MR" command (HtHeatpump.fast_query) answers
# for this kind of answers the payload length must be corrected (for the checksum computation)
# so that the received checksum fits with the computed one
# observed on: HP08S10W-WEB, SW 3.0.20
b"\x02\xfd\xe0\xd0\x01\x00": {
"payload_len": lambda payload_len: payload_len - 1, # payload length correction
# method to calculate the checksum of the response:
"checksum": lambda header, payload_len, payload: calc_checksum(
header + bytes([payload_len]) + payload
),
},
# response header with answer
# for error messages (e.g. "ERR,INVALID IDX") and some "MR" command (HtHeatpump.fast_query) answers
# for this kind of answers the payload length must be corrected (for the checksum computation)
# so that the received checksum fits with the computed one
# observed on: HP08S10W-WEB, SW 3.0.20
b"\x02\xfd\xe0\xd0\x02\x00": {
"payload_len": lambda payload_len: payload_len - 2, # payload length correction
# method to calculate the checksum of the response:
"checksum": lambda header, payload_len, payload: calc_checksum(
header + bytes([payload_len]) + payload
),
},
# response header with answer
# when receiving an answer from the heat pump with this header the checksum is always 0x0 (don't ask me why!)
# observed on: HP08S10W-WEB, SW 3.0.20 for parameter requests ("SP"/"MP" commands)
b"\x02\xfd\xe0\xd0\x04\x00": {
"payload_len": lambda payload_len: payload_len, # no payload length correction necessary
# we don't know why, but for this kind of responses the checksum is always 0x0:
"checksum": lambda header, payload_len, payload: 0x00,
},
# response header with answer
# when receiving an answer from the heat pump with this header the checksum is always 0x0 (don't ask me why!)
# observed on: HP10S12W-WEB, SW 3.0.8 for parameter requests ("SP"/"MP" commands)
b"\x02\xfd\xe0\xd0\x08\x00": {
"payload_len": lambda payload_len: payload_len, # no payload length correction necessary
# we don't know why, but for this kind of responses the checksum is always 0x0:
"checksum": lambda header, payload_len, payload: 0x00,
},
}
# special commands of the heat pump:
# ----------------------------------
#
LOGIN_CMD = r"LIN" # login command
LOGIN_RESP = r"^OK"
LOGOUT_CMD = r"LOUT" # logout command
LOGOUT_RESP = r"^OK"
RID_CMD = r"RID" # query for the manufacturer's serial number
RID_RESP = r"^RID,(\d+)$" # e.g. '~RID,123456;\r\n'
VERSION_CMD = r"SP,NR=9" # query for the software version of the heat pump
VERSION_RESP = r"^SP,NR=9,.*NAME=([^,]+).*VAL=([^,]+).*$" # e.g. 'SP,NR=9,ID=9,NAME=3.0.20,...,VAL=2321,...'
CLK_CMD = (
r"CLK", # get/set the current date and time of the heat pump
r"CLK,DA={:02d}.{:02d}.{:02d},TI={:02d}:{:02d}:{:02d},WD={:d}",
)
CLK_RESP = (
r"^CLK" # answer for the current date and time of the heat pump
r",DA=(3[0-1]|[1-2]\d|0[1-9])\.(1[0-2]|0[1-9])\.(\d\d)" # date, e.g. '26.11.15'
r",TI=([0-1]\d|2[0-3]):([0-5]\d):([0-5]\d)" # time, e.g. '21:28:57'
r",WD=([1-7])$"
) # weekday 1-7 (Monday through Sunday)
ALC_CMD = r"ALC" # query for the last fault message of the heat pump
ALC_RESP = (
r"^AA,(\d+),(\d+)" # fault list index and error code (?)
r",(3[0-1]|[1-2]\d|0[1-9])\.(1[0-2]|0[1-9])\.(\d\d)" # date, e.g. '14.09.14'
r"-([0-1]\d|2[0-3]):([0-5]\d):([0-5]\d)" # time, e.g. '11:52:08'
r",(.*)$"
) # error message, e.g. 'EQ_Spreizung'
ALS_CMD = r"ALS" # query for the fault list size of the heat pump
ALS_RESP = r"^SUM=(\d+)$" # e.g. 'SUM=2757'
AR_CMD = r"AR" # query for specific entries of the fault list
AR_RESP = (
r"^AA,(\d+),(\d+)" # fault list index and error code (?)
r",(3[0-1]|[1-2]\d|0[1-9])\.(1[0-2]|0[1-9])\.(\d\d)" # date, e.g. '14.09.14'
r"-([0-1]\d|2[0-3]):([0-5]\d):([0-5]\d)" # time, e.g. '11:52:08'
r",(.*)$"
) # error message, e.g. 'EQ_Spreizung'
MR_CMD = r"MR" # fast query for several MP data point values
MR_RESP = r"^MA,(\d+),([^,]+),(\d+)$" # MP data point number, value and ?; e.g. 'MA,0,-3.4,17'
PRL_CMD = r"PRL" # query for the time programs of the heat pump
PRL_RESP = (
r"^SUM=(\d+)$", # e.g. 'SUM=5'
r"^PRI{:d},.*NAME=([^,]+).*EAD=([^,]+).*NOS=([^,]+).*STE=([^,]+).*NOD=([^,]+).*$",
) # e.g. 'PRI0,...'
PRI_CMD = r"PRI{:d}" # query for a specific time program of the heat pump
PRI_RESP = r"^PRI{:d},.*NAME=([^,]+).*EAD=([^,]+).*NOS=([^,]+).*STE=([^,]+).*NOD=([^,]+).*$" # e.g. 'PRI2,NAME=..'
PRD_CMD = (
r"PRD{:d}" # query for the entries of a specific time program of the heat pump
)
PRD_RESP = (
r"^PRI{:d},*NAME=([^,]+).*EAD=([^,]+).*NOS=([^,]+).*STE=([^,]+).*NOD=([^,]+).*$", # e.g. 'PRI0,...'
r"^PRE,.*PR={:d},.*DAY={:d},.*EV={:d},.*ST=(\d+)," # e.g. 'PRE,PR=0,DAY=3,EV=1,ST=1,...'
r".*BEG=(\d?\d:\d?\d),.*END=(\d?\d:\d?\d).*$",
) # '...BEG=03:30,END=22:00'
PRE_CMD = (
r"PRE,PR={:d},DAY={:d},EV={:d}", # get/set a specific time program entry of the heat pump
r"PRE,PR={:d},DAY={:d},EV={:d},ST={:d},BEG={},END={}",
)
PRE_RESP = (
r"^PRE,.*PR={:d},.*DAY={:d},.*EV={:d},.*ST=(\d+)," # e.g. 'PRE,PR=2,DAY=5,EV=4,ST=1,...'
r".*BEG=(\d?\d:\d?\d),.*END=(\d?\d:\d?\d).*$"
) # '...BEG=13:30,END=14:45'
# ------------------------------------------------------------------------------------------------------------------- #
# Protocol functions
# ------------------------------------------------------------------------------------------------------------------- #
[docs]def calc_checksum(s: bytes) -> int:
"""Function that calculates the checksum of a provided bytes array.
:param s: Byte array from which the checksum should be computed.
:type s: bytes
:returns: The computed checksum as ``int``.
:rtype: ``int``
"""
assert isinstance(s, bytes)
checksum = 0x0
for i in range(len(s)):
databyte = s[i]
checksum ^= databyte
databyte = (databyte << 1) & 0xFF
checksum ^= databyte
return checksum
[docs]def verify_checksum(s: bytes) -> bool:
"""Verify if the provided bytes array is terminated with a valid checksum.
:param s: The byte array including the checksum.
:type s: bytes
:returns: :const:`True` if valid, :const:`False` otherwise.
:rtype: ``bool``
:raises ValueError:
Will be raised for an invalid byte array with length less than 2 bytes.
"""
assert isinstance(s, bytes)
if len(s) < 2:
raise ValueError(
"the provided array of bytes needs to be at least 2 bytes long"
)
return (
calc_checksum(s[:-1]) == s[-1]
) # is the last byte of the array the correct checksum?
[docs]def add_checksum(s: bytes) -> bytes:
"""Add a checksum at the end of the provided bytes array.
:param s: The provided byte array.
:type s: bytes
:returns: Byte array with the added checksum.
:rtype: ``bytes``
:raises ValueError:
Will be raised for an invalid byte array with length less than 1 byte.
"""
assert isinstance(s, bytes)
if len(s) < 1:
raise ValueError("the provided array of bytes needs to be at least 1 byte long")
return s + bytes(
[calc_checksum(s)]
) # append the checksum at the end of the bytes array
[docs]def create_request(cmd: str) -> bytes:
"""Create a specified request command for the heat pump.
:param cmd: The command string.
:type cmd: str
:returns: The request string for the specified command as byte array.
:rtype: ``bytes``
:raises ValueError:
Will be raised for an invalid byte array with length greater than 253 byte.
"""
assert isinstance(cmd, str)
if len(cmd) > MAX_CMD_LENGTH:
raise ValueError("command must be lesser than 254 characters")
cmd = "~" + cmd + ";" # add header '~' and trailer ';'
return add_checksum(REQUEST_HEADER + bytes([len(cmd)]) + cmd.encode("ascii"))