Source code for multiwii.messaging

from ._command import _MspCommand

from typing import Final, NamedTuple
from struct import pack, unpack

MESSAGE_ERROR_HEADER: Final[bytes] = b'$M!'
"""bytes: The serialized error message header. (0x24, 0x4d, 0x21)"""

MESSAGE_INCOMING_HEADER: Final[bytes] = b'$M<'
"""bytes: The serialized incoming message header. (0x24, 0x4d, 0x3c)"""

MESSAGE_OUTGOING_HEADER: Final[bytes] = b'$M>'
"""bytes: The serialized outgoing message header. (0x24, 0x4d, 0x3e)"""

class _MspResponseMessage(NamedTuple):
    """
    Represents a tuple with the data size and values for a received MSP message.

    Attributes
    ----------
    command : _MspCommand
        An instance of `_MspCommand` of the targeted MSP command.
    data : tuple[int]
        A tuple of parsed data values.
    data_size : int
        The size of the unserialized data values.
    """
    command: _MspCommand

    data: tuple[int]

    data_size: int

[docs] class MspMessageError(Exception): """Represents a specific errors related to MSP messages.""" pass
def _crc8_xor(payload: bytes) -> int: """ Calculates the checksum for the payload using an XOR CRC. Parameters ---------- payload : bytes Bytes of the full payload (including the command code and size). Returns ------- int The checksum for the provided payload. """ checksum = 0 for byte in payload: checksum ^= byte return checksum & 0xff def _create_request_message(command: _MspCommand, data: tuple[int]) -> bytes: """ Constructs a serialized message for a provided command and data values. Attributes ---------- command : _MspCommand An instance of `_MspCommand` representing the MSP command used to create the message. data : tuple[int] The data values to serialize and include in the payload. Returns ------- bytes The full message in bytes. """ data_size = 0 payload_content = bytes() if data: if command.has_variable_size: data_size = len(data) / command.data_field_count else: data_size = command.data_size payload_content = pack(command.data_struct_format, *data) payload_header = pack('<2B', data_size, command.code) payload = payload_header + payload_content checksum = pack('<B', _crc8_xor(payload)) return MESSAGE_OUTGOING_HEADER + payload + checksum def _decode_names(data: tuple) -> tuple[str]: """ Decodes the deserialized string value and splits it to a tuple. Parameters ---------- data : tuple The `struct` unpacked payload tuple to decode. Returns ------- tuple[str] A tuple of decoded names. """ return tuple(data[0].decode('ascii').split(';')) def _parse_response_message(command: _MspCommand, payload: bytes) -> _MspResponseMessage: """ Parses the payload of a response message for a given command. Attributes ---------- command : _MspCommand An instance of `_MspCommand` representing the MSP command for the response message. payload : bytes The received payload buffer from a response message. Raises ------ ValueError If the command code in the payload does not match the code of the provided command. Returns ------- _MspResponseMessage A named tuple with the command, parsed data and additional information. """ command_code = payload[1] if command_code != command.code: raise ValueError( 'Payload with an invalid command code detected. ({}, {})'.format( command.code, command_code ) ) data = unpack(command.data_struct_format, payload[2:]) data_size = payload[0] return _MspResponseMessage(command, data, data_size)