diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 608024f3..3db13805 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -25,7 +25,7 @@ def import_eds(source, node_id): eds = RawConfigParser(inline_comment_prefixes=(';',)) - eds.optionxform = str + eds.optionxform = lambda optionstr: str(optionstr) # type: ignore[method-assign] opened_here = False try: if hasattr(source, "read"): @@ -80,16 +80,13 @@ def import_eds(source, node_id): (bool, "LSS_Supported", "LSS_supported"), ]: try: - if t in (int, bool): - setattr(od.device_information, odprop, - t(int(eds.get("DeviceInfo", eprop), 0)) - ) - elif t is str: - setattr(od.device_information, odprop, - eds.get("DeviceInfo", eprop) - ) + raw_string = eds.get("DeviceInfo", eprop) except NoOptionError: - pass + continue + if t in (int, bool): + setattr(od.device_information, odprop, t(int(raw_string, 0))) + elif t is str: + setattr(od.device_information, odprop, raw_string) if eds.has_section("DeviceComissioning"): if val := eds.getint("DeviceComissioning", "Baudrate", fallback=None): @@ -100,10 +97,12 @@ def import_eds(source, node_id): node_id = int(val, base=0) od.node_id = node_id + DUMMY_SECTION_REGEX = re.compile(r"^[Dd]ummy[Uu]sage$") + INDEX_SECTION_REGEX = re.compile(r"^[0-9A-Fa-f]{4}$") + SUB_SECTION_REGEX = re.compile(r"^([0-9A-Fa-f]{4})[S|s]ub([0-9A-Fa-f]+)$") + NAME_SECTION_REGEX = re.compile(r"^([0-9A-Fa-f]{4})Name") for section in eds.sections(): - # Match dummy definitions - match = re.match(r"^[Dd]ummy[Uu]sage$", section) - if match is not None: + if DUMMY_SECTION_REGEX.match(section) is not None: for i in range(1, 8): key = f"Dummy{i:04d}" if eds.getint(section, key) == 1: @@ -112,9 +111,7 @@ def import_eds(source, node_id): var.access_type = "const" od.add_object(var) - # Match indexes - match = re.match(r"^[0-9A-Fa-f]{4}$", section) - if match is not None: + if INDEX_SECTION_REGEX.match(section) is not None: index = int(section, 16) name = eds.get(section, "ParameterName") try: @@ -154,11 +151,9 @@ def import_eds(source, node_id): continue - # Match subindexes - match = re.match(r"^([0-9A-Fa-f]{4})[S|s]ub([0-9A-Fa-f]+)$", section) - if match is not None: - index = int(match.group(1), 16) - subindex = int(match.group(2), 16) + if (m := SUB_SECTION_REGEX.match(section)) is not None: + index = int(m.group(1), 16) + subindex = int(m.group(2), 16) entry = od[index] if isinstance(entry, (ODRecord, ODArray)): try: @@ -169,13 +164,19 @@ def import_eds(source, node_id): entry.add_member(var) # Match [index]Name - match = re.match(r"^([0-9A-Fa-f]{4})Name", section) - if match is not None: - index = int(match.group(1), 16) + if (m := NAME_SECTION_REGEX.match(section)) is not None: + index = int(m.group(1), 16) num_of_entries = int(eds.get(section, "NrOfEntries")) entry = od[index] + if not isinstance(entry, ODArray): + logger.warning( + "Compact subobject section %r refers to non-array object 0x%04X", + section, + index, + ) + continue # For CompactSubObj index 1 is were we find the variable - src_var = od[index][1] + src_var = entry[1] for subindex in range(1, num_of_entries + 1): var = copy_variable(eds, section, subindex, src_var) if var is not None: @@ -246,17 +247,15 @@ def _signed_int_from_hex(hex_str, bit_length): def _decode_from_eds(node_id: int, var_type: int, value: Any) -> Any: if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN): return bytes.fromhex(value) - elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING): + if var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING): return value - elif var_type in datatypes.FLOAT_TYPES: + if var_type in datatypes.FLOAT_TYPES: return float(value) - else: - # COB-ID can contain '$NODEID+' so replace this with node_id before converting - value = value.replace(" ", "").upper() - if '$NODEID' in value and node_id is not None: - return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id - else: - return int(value, 0) + # COB-ID can contain '$NODEID+' so replace this with node_id before converting + value = value.replace(" ", "").upper() + if '$NODEID' in value and node_id is not None: + return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id + return int(value, 0) def _encode_to_eds(var_type: int, value: Any) -> Any: @@ -264,12 +263,11 @@ def _encode_to_eds(var_type: int, value: Any) -> Any: return None if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN): return bytes.hex(value) - elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING): + if var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING): return value - elif var_type in datatypes.FLOAT_TYPES: + if var_type in datatypes.FLOAT_TYPES: return value - else: - return f"0x{value:02X}" + return f"0x{value:02X}" _STANDARD_OPTIONS = { @@ -299,12 +297,12 @@ def build_variable( node_id: int, object_type: int, index: int, - subindex: int = 0 + subindex: int = 0, ) -> ODVariable: """Create a object dictionary entry. :param eds: String stream of the eds file - :param section: + :param section: EDS file section corresponding to the object :param node_id: Node ID :param index: Index of the CANOpen object :param subindex: Subindex of the CANOpen object (if present, else 0) @@ -336,64 +334,59 @@ def build_variable( var.pdo_mappable = bool(int(eds.get(section, "PDOMapping", fallback="0"), 0)) - if eds.has_option(section, "LowLimit"): + if (raw_string := eds.get(section, "LowLimit", fallback=None)) is not None: try: - min_string = eds.get(section, "LowLimit") if var.data_type in datatypes.SIGNED_TYPES: - var.min = _signed_int_from_hex(min_string, _calc_bit_length(var.data_type)) + var.min = _signed_int_from_hex(raw_string, _calc_bit_length(var.data_type)) else: - var.min = int(min_string, 0) + var.min = int(raw_string, 0) except ValueError: logger.warning( - "Invalid LowLimit %r for %s (0x%X), ignoring", - min_string, var.name, var.index, + "Invalid LowLimit %r for %s (0x%X), ignoring", raw_string, var.name, var.index ) - if eds.has_option(section, "HighLimit"): + if (raw_string := eds.get(section, "HighLimit", fallback=None)) is not None: try: - max_string = eds.get(section, "HighLimit") if var.data_type in datatypes.SIGNED_TYPES: - var.max = _signed_int_from_hex(max_string, _calc_bit_length(var.data_type)) + var.max = _signed_int_from_hex(raw_string, _calc_bit_length(var.data_type)) else: - var.max = int(max_string, 0) + var.max = int(raw_string, 0) except ValueError: logger.warning( - "Invalid HighLimit %r for %s (0x%X), ignoring", - max_string, var.name, var.index, + "Invalid HighLimit %r for %s (0x%X), ignoring", raw_string, var.name, var.index ) - if eds.has_option(section, "DefaultValue"): + if (raw_string := eds.get(section, "DefaultValue", fallback=None)) is not None: + var.default_raw = raw_string # type: ignore[attr-defined] # custom round-trip addition try: - var.default_raw = eds.get(section, "DefaultValue") - if '$NODEID' in var.default_raw: + if '$NODEID' in raw_string: var.relative = True - var.default = _decode_from_eds(node_id, var.data_type, var.default_raw) + var.default = _decode_from_eds(node_id, var.data_type, raw_string) except ValueError: logger.warning( "Invalid DefaultValue %r for %s (0x%X), ignoring", - var.default_raw, var.name, var.index, + raw_string, var.name, var.index, ) - if eds.has_option(section, "ParameterValue"): + if (raw_string := eds.get(section, "ParameterValue", fallback=None)) is not None: + var.value_raw = raw_string # type: ignore[attr-defined] # custom round-trip addition try: - var.value_raw = eds.get(section, "ParameterValue") - var.value = _decode_from_eds(node_id, var.data_type, var.value_raw) + var.value = _decode_from_eds(node_id, var.data_type, raw_string) except ValueError: logger.warning( "Invalid ParameterValue %r for %s (0x%X), ignoring", - var.value_raw, var.name, var.index, + raw_string, var.name, var.index, ) # Factor, Description and Unit are not standard according to the CANopen specifications, but # they are implemented in the python canopen package, so we can at least try to use them - if eds.has_option(section, "Factor"): + if (raw_string := eds.get(section, "Factor", fallback=None)) is not None: try: - var.factor = float(eds.get(section, "Factor")) + var.factor = float(raw_string) except ValueError: logger.warning( - "Invalid Factor %r for %s (0x%X), ignoring", - eds.get(section, "Factor"), var.name, var.index, + "Invalid Factor %r for %s (0x%X), ignoring", raw_string, var.name, var.index ) - if eds.has_option(section, "Description"): - var.description = eds.get(section, "Description") - if eds.has_option(section, "Unit"): - var.unit = eds.get(section, "Unit") + if (raw_string := eds.get(section, "Description", fallback=None)) is not None: + var.description = raw_string + if (raw_string := eds.get(section, "Unit", fallback=None)) is not None: + var.unit = raw_string var.custom_options = _get_custom_options(eds, section) return var @@ -445,30 +438,28 @@ def export_variable(var, eds): if getattr(var, 'default_raw', None) is not None: eds.set(section, "DefaultValue", var.default_raw) - elif getattr(var, 'default', None) is not None: - eds.set(section, "DefaultValue", _encode_to_eds( - var.data_type, var.default)) + elif var.default is not None: + eds.set(section, "DefaultValue", _encode_to_eds(var.data_type, var.default)) if device_commisioning: if getattr(var, 'value_raw', None) is not None: eds.set(section, "ParameterValue", var.value_raw) - elif getattr(var, 'value', None) is not None: - eds.set(section, "ParameterValue", - _encode_to_eds(var.data_type, var.value)) + elif var.value is not None: + eds.set(section, "ParameterValue", _encode_to_eds(var.data_type, var.value)) eds.set(section, "DataType", f"0x{var.data_type:04X}") eds.set(section, "PDOMapping", hex(var.pdo_mappable)) - if getattr(var, 'min', None) is not None: + if var.min is not None: eds.set(section, "LowLimit", var.min) - if getattr(var, 'max', None) is not None: + if var.max is not None: eds.set(section, "HighLimit", var.max) - if getattr(var, 'description', '') != '': + if var.description != '': eds.set(section, "Description", var.description) - if getattr(var, 'factor', 1) != 1: + if var.factor != 1: eds.set(section, "Factor", var.factor) - if getattr(var, 'unit', '') != '': + if var.unit != '': eds.set(section, "Unit", var.unit) for option, value in var.custom_options.items(): @@ -498,7 +489,7 @@ def export_record(var, eds): try: # only if eds was loaded by us - origFileInfo = od.__edsFileInfo + origFileInfo = od.__edsFileInfo # type: ignore[attr-defined] # custom addition except AttributeError: origFileInfo = { # just set some defaults @@ -584,12 +575,12 @@ def optional_indices(x): supported_optional_indices = list(filter(optional_indices, od)) supported_manufacturer_indices = list(filter(manufacturer_indices, od)) - def add_list(section, list): + def add_list(section, lst): eds.add_section(section) - eds.set(section, "SupportedObjects", len(list)) - for i in range(0, len(list)): - eds.set(section, (i + 1), f"0x{list[i]:04X}") - for index in list: + eds.set(section, "SupportedObjects", len(lst)) + for i in range(0, len(lst)): + eds.set(section, (i + 1), f"0x{lst[i]:04X}") + for index in lst: export_object(od[index], eds) add_list("MandatoryObjects", supported_mantatory_indices)