Skip to content
163 changes: 77 additions & 86 deletions canopen/objectdictionary/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def import_eds(source, node_id):
eds = RawConfigParser(inline_comment_prefixes=(';',))
eds.optionxform = str
eds.optionxform = lambda optionstr: str(optionstr) # type: ignore[method-assign]
opened_here = False
try:
if hasattr(source, "read"):
Expand Down Expand Up @@ -80,16 +80,13 @@ def import_eds(source, node_id):
(bool, "LSS_Supported", "LSS_supported"),
]:
try:
if t in (int, bool):
setattr(od.device_information, odprop,
t(int(eds.get("DeviceInfo", eprop), 0))
)
elif t is str:
setattr(od.device_information, odprop,
eds.get("DeviceInfo", eprop)
)
raw_string = eds.get("DeviceInfo", eprop)
except NoOptionError:
pass
continue
if t in (int, bool):
setattr(od.device_information, odprop, t(int(raw_string, 0)))
elif t is str:
setattr(od.device_information, odprop, raw_string)

if eds.has_section("DeviceComissioning"):
if val := eds.getint("DeviceComissioning", "Baudrate", fallback=None):
Expand All @@ -100,10 +97,12 @@ def import_eds(source, node_id):
node_id = int(val, base=0)
od.node_id = node_id

DUMMY_SECTION_REGEX = re.compile(r"^[Dd]ummy[Uu]sage$")
INDEX_SECTION_REGEX = re.compile(r"^[0-9A-Fa-f]{4}$")
SUB_SECTION_REGEX = re.compile(r"^([0-9A-Fa-f]{4})[S|s]ub([0-9A-Fa-f]+)$")
NAME_SECTION_REGEX = re.compile(r"^([0-9A-Fa-f]{4})Name")
for section in eds.sections():
# Match dummy definitions
match = re.match(r"^[Dd]ummy[Uu]sage$", section)
if match is not None:
if DUMMY_SECTION_REGEX.match(section) is not None:
for i in range(1, 8):
key = f"Dummy{i:04d}"
if eds.getint(section, key) == 1:
Expand All @@ -112,9 +111,7 @@ def import_eds(source, node_id):
var.access_type = "const"
od.add_object(var)

# Match indexes
match = re.match(r"^[0-9A-Fa-f]{4}$", section)
if match is not None:
if INDEX_SECTION_REGEX.match(section) is not None:
index = int(section, 16)
name = eds.get(section, "ParameterName")
try:
Expand Down Expand Up @@ -154,11 +151,9 @@ def import_eds(source, node_id):

continue

# Match subindexes
match = re.match(r"^([0-9A-Fa-f]{4})[S|s]ub([0-9A-Fa-f]+)$", section)
if match is not None:
index = int(match.group(1), 16)
subindex = int(match.group(2), 16)
if (m := SUB_SECTION_REGEX.match(section)) is not None:
index = int(m.group(1), 16)
subindex = int(m.group(2), 16)
entry = od[index]
if isinstance(entry, (ODRecord, ODArray)):
try:
Expand All @@ -169,13 +164,19 @@ def import_eds(source, node_id):
entry.add_member(var)

# Match [index]Name
match = re.match(r"^([0-9A-Fa-f]{4})Name", section)
if match is not None:
index = int(match.group(1), 16)
if (m := NAME_SECTION_REGEX.match(section)) is not None:
index = int(m.group(1), 16)
num_of_entries = int(eds.get(section, "NrOfEntries"))
entry = od[index]
if not isinstance(entry, ODArray):
logger.warning(
"Compact subobject section %r refers to non-array object 0x%04X",
section,
index,
)
continue
# For CompactSubObj index 1 is were we find the variable
src_var = od[index][1]
src_var = entry[1]
for subindex in range(1, num_of_entries + 1):
var = copy_variable(eds, section, subindex, src_var)
if var is not None:
Expand Down Expand Up @@ -246,30 +247,27 @@ def _signed_int_from_hex(hex_str, bit_length):
def _decode_from_eds(node_id: int, var_type: int, value: Any) -> Any:
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
return bytes.fromhex(value)
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
if var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
return value
elif var_type in datatypes.FLOAT_TYPES:
if var_type in datatypes.FLOAT_TYPES:
return float(value)
else:
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
value = value.replace(" ", "").upper()
if '$NODEID' in value and node_id is not None:
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
else:
return int(value, 0)
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
value = value.replace(" ", "").upper()
if '$NODEID' in value and node_id is not None:
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
return int(value, 0)


def _encode_to_eds(var_type: int, value: Any) -> Any:
if value is None:
return None
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
return bytes.hex(value)
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
if var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
return value
elif var_type in datatypes.FLOAT_TYPES:
if var_type in datatypes.FLOAT_TYPES:
return value
else:
return f"0x{value:02X}"
return f"0x{value:02X}"


_STANDARD_OPTIONS = {
Expand Down Expand Up @@ -299,12 +297,12 @@ def build_variable(
node_id: int,
object_type: int,
index: int,
subindex: int = 0
subindex: int = 0,
) -> ODVariable:
"""Create a object dictionary entry.

:param eds: String stream of the eds file
:param section:
:param section: EDS file section corresponding to the object
:param node_id: Node ID
:param index: Index of the CANOpen object
:param subindex: Subindex of the CANOpen object (if present, else 0)
Expand Down Expand Up @@ -336,64 +334,59 @@ def build_variable(

var.pdo_mappable = bool(int(eds.get(section, "PDOMapping", fallback="0"), 0))

if eds.has_option(section, "LowLimit"):
if (raw_string := eds.get(section, "LowLimit", fallback=None)) is not None:
try:
min_string = eds.get(section, "LowLimit")
if var.data_type in datatypes.SIGNED_TYPES:
var.min = _signed_int_from_hex(min_string, _calc_bit_length(var.data_type))
var.min = _signed_int_from_hex(raw_string, _calc_bit_length(var.data_type))
else:
var.min = int(min_string, 0)
var.min = int(raw_string, 0)
except ValueError:
logger.warning(
"Invalid LowLimit %r for %s (0x%X), ignoring",
min_string, var.name, var.index,
"Invalid LowLimit %r for %s (0x%X), ignoring", raw_string, var.name, var.index
)
if eds.has_option(section, "HighLimit"):
if (raw_string := eds.get(section, "HighLimit", fallback=None)) is not None:
try:
max_string = eds.get(section, "HighLimit")
if var.data_type in datatypes.SIGNED_TYPES:
var.max = _signed_int_from_hex(max_string, _calc_bit_length(var.data_type))
var.max = _signed_int_from_hex(raw_string, _calc_bit_length(var.data_type))
else:
var.max = int(max_string, 0)
var.max = int(raw_string, 0)
except ValueError:
logger.warning(
"Invalid HighLimit %r for %s (0x%X), ignoring",
max_string, var.name, var.index,
"Invalid HighLimit %r for %s (0x%X), ignoring", raw_string, var.name, var.index
)
if eds.has_option(section, "DefaultValue"):
if (raw_string := eds.get(section, "DefaultValue", fallback=None)) is not None:
var.default_raw = raw_string # type: ignore[attr-defined] # custom round-trip addition
try:
var.default_raw = eds.get(section, "DefaultValue")
if '$NODEID' in var.default_raw:
if '$NODEID' in raw_string:
var.relative = True
var.default = _decode_from_eds(node_id, var.data_type, var.default_raw)
var.default = _decode_from_eds(node_id, var.data_type, raw_string)
except ValueError:
logger.warning(
"Invalid DefaultValue %r for %s (0x%X), ignoring",
var.default_raw, var.name, var.index,
raw_string, var.name, var.index,
)
if eds.has_option(section, "ParameterValue"):
if (raw_string := eds.get(section, "ParameterValue", fallback=None)) is not None:
var.value_raw = raw_string # type: ignore[attr-defined] # custom round-trip addition
try:
var.value_raw = eds.get(section, "ParameterValue")
var.value = _decode_from_eds(node_id, var.data_type, var.value_raw)
var.value = _decode_from_eds(node_id, var.data_type, raw_string)
except ValueError:
logger.warning(
"Invalid ParameterValue %r for %s (0x%X), ignoring",
var.value_raw, var.name, var.index,
raw_string, var.name, var.index,
)
# Factor, Description and Unit are not standard according to the CANopen specifications, but
# they are implemented in the python canopen package, so we can at least try to use them
if eds.has_option(section, "Factor"):
if (raw_string := eds.get(section, "Factor", fallback=None)) is not None:
try:
var.factor = float(eds.get(section, "Factor"))
var.factor = float(raw_string)
except ValueError:
logger.warning(
"Invalid Factor %r for %s (0x%X), ignoring",
eds.get(section, "Factor"), var.name, var.index,
"Invalid Factor %r for %s (0x%X), ignoring", raw_string, var.name, var.index
)
if eds.has_option(section, "Description"):
var.description = eds.get(section, "Description")
if eds.has_option(section, "Unit"):
var.unit = eds.get(section, "Unit")
if (raw_string := eds.get(section, "Description", fallback=None)) is not None:
var.description = raw_string
if (raw_string := eds.get(section, "Unit", fallback=None)) is not None:
var.unit = raw_string

var.custom_options = _get_custom_options(eds, section)
return var
Expand Down Expand Up @@ -445,30 +438,28 @@ def export_variable(var, eds):

if getattr(var, 'default_raw', None) is not None:
eds.set(section, "DefaultValue", var.default_raw)
elif getattr(var, 'default', None) is not None:
eds.set(section, "DefaultValue", _encode_to_eds(
var.data_type, var.default))
elif var.default is not None:
eds.set(section, "DefaultValue", _encode_to_eds(var.data_type, var.default))

if device_commisioning:
if getattr(var, 'value_raw', None) is not None:
eds.set(section, "ParameterValue", var.value_raw)
elif getattr(var, 'value', None) is not None:
eds.set(section, "ParameterValue",
_encode_to_eds(var.data_type, var.value))
elif var.value is not None:
eds.set(section, "ParameterValue", _encode_to_eds(var.data_type, var.value))

eds.set(section, "DataType", f"0x{var.data_type:04X}")
eds.set(section, "PDOMapping", hex(var.pdo_mappable))

if getattr(var, 'min', None) is not None:
if var.min is not None:
eds.set(section, "LowLimit", var.min)
if getattr(var, 'max', None) is not None:
if var.max is not None:
eds.set(section, "HighLimit", var.max)

if getattr(var, 'description', '') != '':
if var.description != '':
eds.set(section, "Description", var.description)
if getattr(var, 'factor', 1) != 1:
if var.factor != 1:
eds.set(section, "Factor", var.factor)
if getattr(var, 'unit', '') != '':
if var.unit != '':
eds.set(section, "Unit", var.unit)

for option, value in var.custom_options.items():
Expand Down Expand Up @@ -498,7 +489,7 @@ def export_record(var, eds):

try:
# only if eds was loaded by us
origFileInfo = od.__edsFileInfo
origFileInfo = od.__edsFileInfo # type: ignore[attr-defined] # custom addition
except AttributeError:
origFileInfo = {
# just set some defaults
Expand Down Expand Up @@ -584,12 +575,12 @@ def optional_indices(x):
supported_optional_indices = list(filter(optional_indices, od))
supported_manufacturer_indices = list(filter(manufacturer_indices, od))

def add_list(section, list):
def add_list(section, lst):
eds.add_section(section)
eds.set(section, "SupportedObjects", len(list))
for i in range(0, len(list)):
eds.set(section, (i + 1), f"0x{list[i]:04X}")
for index in list:
eds.set(section, "SupportedObjects", len(lst))
for i in range(0, len(lst)):
eds.set(section, (i + 1), f"0x{lst[i]:04X}")
for index in lst:
export_object(od[index], eds)

add_list("MandatoryObjects", supported_mantatory_indices)
Expand Down
Loading