Source code for xcomcan.node

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from struct import pack, unpack
from threading import Condition
from stucancommon.node import Service, CanNode, Timeout
from .addresses import RCC_GROUP_DEVICE_ID

logger = logging.getLogger(__name__)


[docs]class Request(Service): """ Base class for a request service that have a condition to wait until notification from another thread """ cv = Condition()
[docs]class Response(Service): """ Base class for a response service """
[docs] def handle(self, source_address, destination_address, exception=None): """ Method override, handle response from a previously sended Service. It will handle response object and notify request service on success otherwise forward exception """ with self.request_class.cv: if exception is None: self.request_class.response = self else: self.request_class.response = exception self.request_class.response_count -= 1 if self.request_class.response_count == 0: self.request_class.cv.notify()
[docs]class ReadUserInfoRequest(Request): """ Read User Info Service request inherits from Request """ SERVICE_ID = 0x0 """ const int : Service identifier """ PACK_FORMAT = '>H' """ const string : Format to generate a byte-string representation of the object """ def __init__(self, *args): self.info_id = args[0] def __bytes__(self): return pack(self.PACK_FORMAT, self.info_id)
[docs]class ReadUserInfoResponse(Response): """ Read User Info Service response inherits from Response """ SERVICE_ID = 0x0 """ const int : Service identifier """ PACK_FORMAT = '>Hf' """ const string : Format to generate a byte-string representation of the object """ request_class = ReadUserInfoRequest """ Request service object """ def __init__(self, *args): self.info_id, self.value = args[:2] def __bytes__(self): return pack(self.PACK_FORMAT, self.info_id, self.value)
[docs]class WriteParameterRequest(Request): """ Write Parameter Service request inherits from Request """ SERVICE_ID = 0x2 """ const int : Service identifier """ PACK_FORMAT = '>HBf' """ const string : Format to generate a byte-string representation of the object """ def __init__(self, *args): self.parameter_id, self.part, self.value = args[:3] def __bytes__(self): return pack(self.PACK_FORMAT, self.parameter_id, self.part, self.value)
[docs]class WriteParameterResponse(Response): """ Write Parameter Service response inherits from Response """ SERVICE_ID = 0x2 """ const int : Service identifier """ PACK_FORMAT = '>HBf' """ const string : Format to generate a byte-string representation of the object """ request_class = WriteParameterRequest """ Request service object """ def __init__(self, *args): self.parameter_id, self.part, self.value = args[:3] def __bytes__(self): return pack(self.PACK_FORMAT, self.parameter_id, self.part, self.value)
[docs]class ReadParameterRequest(Request): """ Read Parameter Service request inherits from Request """ SERVICE_ID = 0x1 """ const int : Service identifier """ PACK_FORMAT = '>HB' """ const string : Format to generate a byte-string representation of the object """ def __init__(self, *args): self.parameter_id, self.part = args[:2] def __bytes__(self): return pack(self.PACK_FORMAT, self.parameter_id, self.part)
[docs]class ReadParameterResponse(Response): """ Read Parameter Service response inherits from Response """ SERVICE_ID = 0x1 """ const int : Service identifier """ PACK_FORMAT = '>HBf' """ const string : Format to generate a byte-string representation of the object """ request_class = ReadParameterRequest """ Request service object """ def __init__(self, *args): self.parameter_id, self.part, self.value = args[:3] def __bytes__(self): return pack(self.PACK_FORMAT, self.parameter_id, self.part, self.value)
[docs]class MessageNotification(Response): """ Message Notification Service inherits from Response """ SERVICE_ID = 0x3 """ const int : Service identifier """ PACK_FORMAT = '>HI' """ const string : Format to generate a byte-string representation of the object """ messages = [] """ list : List of messages """ def __init__(self, *args): self.message_id, self.value = args[:2] def __bytes__(self): return pack(self.PACK_FORMAT, self.message_id, self.value)
[docs] def handle(self, source_address, destination_address, exception=None): self.messages.append((source_address, self))
error_identifier_dictionary = { 0x01: 'INVALID_FRAME', 0x02: 'DEVICE_NOT_FOUND', 0x03: 'RESPONSE_TIMEOUT', 0x12: 'INVALID_SERVICE_ARGUMENT', 0x13: 'GATEWAY_BUSY', 0x22: 'OBJECT_ID_NOT_FOUND', 0x24: 'INVALID_DATA_LENGTH', 0x25: 'PROPERTY_IS_READ_ONLY', 0x26: 'INVALID_DATA', 0x27: 'DATA_TOO_SMALL', 0x28: 'DATA_TOO_BIG', 0x29: 'WRITE_PROPERTY_FAILED', 0x2A: 'READ_PROPERTY_FAILED', 0x2B: 'ACCESS_DENIED', 0x2D: 'MULTICAST_READ_NOT_SUPPORTED', } """ Those error codes applies for "User Info read service" and "Parameter read/write service". They are coded in the response in case of a request failure. """
[docs]class StuCanPublicError(Exception): """ Class representing a StuCan2 error, also can generate a string representation of it """
[docs] def __init__(self, id, error_code): """ id : int Service identifier error_code : int Error code number """ self.id = id self.error_code = error_code self.identifier = error_identifier_dictionary.get(error_code, 'UNKNOWN')
def __str__(self): return 'StuCanPublicError(id={}, error_code={}, identifier={})'.format(self.id, self.error_code, self.identifier)
[docs]class StuCanPublicNode(CanNode): """ Class representing a StuCan public node, inherits from `CanNode` """
[docs] def __init__(self, driver, address, debug=False): """ Initialize CanNode driver : PythonCanDriver Relying can interface address : int Node CAN address """ CanNode.__init__(self, driver, address) if debug is True: logging.basicConfig(level=logging.DEBUG)
[docs] def handle_rx_frame(self, identifier, data, dlc, flag, time): """ Handle a frame received on the CAN bus Manage behavior as described into StuCan2 public protocol Parameters ---------- identifier : bytes CAN frame id data : bytes CAN frame data """ destination_address = (identifier >> 19) & 0x3FF if not (destination_address in (self.address, RCC_GROUP_DEVICE_ID)): return source_address = (identifier >> 9) & 0x3FF service_id = (identifier >> 6) & 0x7 flags = identifier & 0x3F error = flags & 0x1 for service_class in self.services: if service_id == service_class.SERVICE_ID: if error == 1: id, error_code = unpack('>HI', data) exception = StuCanPublicError(id, error_code) logger.debug('<- rx: %s from address %d to %d', repr(exception), source_address, destination_address) service = service_class(None, None, None) response = service.handle(source_address, destination_address, exception) else: service = service_class.from_bytes(data) logger.debug('<- rx: %s from address %d to %d', str(service), source_address, destination_address) response = service.handle(source_address, destination_address) if response is not None: self.send_service(source_address, response)
[docs] def send_from(self, service_id, destination_address, source_address, data): """ Create CAN identifier and access underlying driver to send it and relevant data Parameters ---------- service_id : int StuCan2 service identifier destination_address : int Targeted device address source_address : int Source address data : bytes The data parameter of a CAN message, length from 0 to 8 bytes """ assert 0 <= service_id <= 0x7 assert 0 <= destination_address <= 0x3FF assert 0 <= source_address <= 0x3FF identifier = (destination_address << 19) + (source_address << 9) + (service_id << 6) self.driver.send(identifier, data, is_extended_id=True)
[docs] def send(self, service_id, destination_address, data): """ Forward data to send by adding source address Parameters ---------- service_id : int StuCan2 service identifier destination_address : int Targeted device address data : bytes The data parameter of a CAN message """ self.send_from(service_id, destination_address, self.address, data)
[docs] def send_service(self, address, service): """ Send a service Parameters ---------- address : int Targeted device address service : Service Service object """ logger.debug('-> tx: %s to address %d', str(service), address) data = bytes(service) assert len(data) <= 8 self.send(service.SERVICE_ID, address, data)
[docs] def wait_response(self, address, request, timeout=None): """ Entry point to send a service and then wait for the service response, can raise a timeout exception a StuCanPublicError or the response when successfull Parameters ---------- address : int Targeted device address request : Request Request service object Returns ------- Response Response object of the service """ request_class = type(request) request_class.response_count = 1 with request_class.cv: self.send_service(address, request) return_value = request_class.cv.wait(timeout) if not return_value: raise Timeout() if isinstance(request_class.response, StuCanPublicError): raise request_class.response return request_class.response
[docs] def messages(self): """ Retreive the list of messages previously happened on the CAN bus Returns ------- list Notifications messages """ return MessageNotification.messages