|
2 | 2 | from __future__ import annotations |
3 | 3 |
|
4 | 4 | from dataclasses import dataclass |
| 5 | +from typing import cast |
5 | 6 |
|
6 | | -from .simdata import SimData |
| 7 | +from pymodbus.constants import DATATYPE_STRUCT, DataType |
7 | 8 |
|
| 9 | +from .simdata import SimData |
8 | 10 |
|
9 | | -OFFSET_NONE = (-1, -1, -1, -1) |
10 | 11 |
|
11 | 12 | @dataclass(order=True, frozen=True) |
12 | 13 | class SimDevice: |
@@ -62,70 +63,140 @@ class SimDevice: |
62 | 63 | #: |
63 | 64 | registers: list[SimData] |
64 | 65 |
|
65 | | - #: Use this for old devices with 4 blocks. |
| 66 | + #: Default SimData to be used for registers not defined. |
| 67 | + default: SimData | None = None |
| 68 | + |
| 69 | + #: Define starting address for each of the 4 blocks. |
66 | 70 | #: |
67 | | - #: .. tip:: content is (coil, direct, holding, input) |
68 | | - offset_address: tuple[int, int, int, int] = OFFSET_NONE |
| 71 | + #: .. tip:: Content (coil, direct, holding, input) in growing order. |
| 72 | + offset_address: tuple[int, int, int, int] | None = None |
69 | 73 |
|
70 | 74 | #: Enforce type checking, if True access are controlled to be conform with datatypes. |
71 | 75 | #: |
72 | 76 | #: Type violations like e.g. reading INT32 as INT16 are returned as ExceptionResponses, |
73 | 77 | #: as well as being logged. |
74 | 78 | type_check: bool = False |
75 | 79 |
|
| 80 | + #: Change endianness. |
| 81 | + #: |
| 82 | + #: Word order is not defined in the modbus standard and thus a device that |
| 83 | + #: uses little-endian is still within the modbus standard. |
| 84 | + #: |
| 85 | + #: Byte order is defined in the modbus standard to be big-endian, |
| 86 | + #: however it is definable to test non-standard modbus devices |
| 87 | + #: |
| 88 | + #: ..tip:: Content (word_order, byte_order) |
| 89 | + endian: tuple[bool, bool] = (True, True) |
| 90 | + |
| 91 | + #: Set device identity |
| 92 | + #: |
| 93 | + identity: str = "pymodbus simulator/server" |
| 94 | + |
76 | 95 |
|
77 | 96 | def __check_block(self, block: list[SimData]) -> list[SimData]: |
78 | 97 | """Check block content.""" |
| 98 | + if not block: |
| 99 | + return block |
79 | 100 | for inx, entry in enumerate(block): |
80 | 101 | if not isinstance(entry, SimData): |
81 | 102 | raise TypeError(f"registers[{inx}]= is a SimData entry") |
82 | 103 | block.sort(key=lambda x: x.address) |
83 | | - return self.__check_block_entries(block) |
84 | | - |
85 | | - def __check_block_entries(self, block: list[SimData]) -> list[SimData]: |
86 | | - """Check block entries.""" |
87 | 104 | last_address = -1 |
88 | | - if len(block) > 1 and block[1].default: |
89 | | - temp = block[0] |
90 | | - block[0] = block[1] |
91 | | - block[1] = temp |
92 | | - first = True |
93 | 105 | for entry in block: |
94 | | - if entry.default: |
95 | | - if first: |
96 | | - first = False |
97 | | - continue |
98 | | - raise TypeError("Multiple default SimData, not allowed") |
99 | | - first = False |
100 | | - if entry.address <= last_address: |
101 | | - raise TypeError("SimData address {entry.address} is overlapping!") |
102 | | - last_address = entry.address + entry.register_count -1 |
103 | | - if not block[0].default: |
104 | | - default = SimData(address=block[0].address, count=last_address - block[0].address +1, default=True) |
105 | | - block.insert(0, default) |
106 | | - max_address = block[0].address + block[0].register_count -1 |
107 | | - if last_address > max_address: |
108 | | - raise TypeError("Default set max address {max_address} but {last_address} is defined?") |
109 | | - if len(block) > 1 and block[0].address > block[1].address: |
110 | | - raise TypeError("Default set lowest address to {block[0].address} but {block[1].address} is defined?") |
| 106 | + last_address = self.__check_block_entries(last_address, entry) |
| 107 | + if self.default and block: |
| 108 | + first_address = block[0].address |
| 109 | + if self.default.address > first_address: |
| 110 | + raise TypeError("Default address is {self.default.address} but {first_address} is defined?") |
| 111 | + def_last_address = self.default.address + self.default.count -1 |
| 112 | + if last_address > def_last_address: |
| 113 | + raise TypeError("Default address+count is {def_last_address} but {last_address} is defined?") |
111 | 114 | return block |
112 | 115 |
|
113 | | - def __post_init__(self): |
114 | | - """Define a device.""" |
| 116 | + def __check_block_entries(self, last_address: int, entry: SimData) -> int: |
| 117 | + """Check block entries.""" |
| 118 | + values = entry.values if isinstance(entry.values, list) else [entry.values] |
| 119 | + if entry.address <= last_address: |
| 120 | + raise TypeError("SimData address {entry.address} is overlapping!") |
| 121 | + if entry.datatype == DataType.BITS: |
| 122 | + if isinstance(values[0], bool): |
| 123 | + reg_count = int((len(values) + 15) / 16) |
| 124 | + else: |
| 125 | + reg_count = len(values) |
| 126 | + return entry.address + reg_count * entry.count -1 |
| 127 | + if entry.datatype == DataType.STRING: |
| 128 | + return entry.address + len(cast(str, entry.values)) * entry.count -1 |
| 129 | + register_count = DATATYPE_STRUCT[entry.datatype][1] |
| 130 | + return entry.address + register_count * entry.count -1 |
| 131 | + |
| 132 | + def __check_simple(self): |
| 133 | + """Check simple parameters.""" |
115 | 134 | if not isinstance(self.id, int) or not 0 <= self.id <= 255: |
116 | 135 | raise TypeError("0 <= id < 255") |
117 | | - if not isinstance(self.registers, list) or not self.registers: |
| 136 | + if not isinstance(self.registers, list): |
118 | 137 | raise TypeError("registers= not a list") |
| 138 | + if not self.default and not self.registers: |
| 139 | + raise TypeError("Either registers= or default= must contain SimData") |
119 | 140 | if not isinstance(self.type_check, bool): |
120 | 141 | raise TypeError("type_check= not a bool") |
| 142 | + if (not self.endian |
| 143 | + or not isinstance(self.endian, tuple) |
| 144 | + or len(self.endian) != 2 |
| 145 | + or not isinstance(self.endian[0], bool) |
| 146 | + or not isinstance(self.endian[1], bool) |
| 147 | + ): |
| 148 | + raise TypeError("endian= must be a tuple with 2 bool") |
| 149 | + if not isinstance(self.identity, str): |
| 150 | + raise TypeError("identity= must be a string") |
| 151 | + if not self.default: |
| 152 | + return |
| 153 | + if not isinstance(self.default, SimData): |
| 154 | + raise TypeError("default= must be a SimData object") |
| 155 | + if not self.default.datatype == DataType.REGISTERS: |
| 156 | + raise TypeError("default= only allow datatype=DataType.REGISTERS") |
| 157 | + |
| 158 | + def __post_init__(self): |
| 159 | + """Define a device.""" |
| 160 | + self.__check_simple() |
121 | 161 | super().__setattr__("registers", self.__check_block(self.registers)) |
122 | | - if self.offset_address != OFFSET_NONE: |
| 162 | + if self.offset_address is not None: |
| 163 | + if not isinstance(self.offset_address, tuple): |
| 164 | + raise TypeError("offset_address= must be a tuple") |
123 | 165 | if len(self.offset_address) != 4: |
124 | | - raise TypeError("offset_address= must have 4 addresses") |
125 | | - reg_start = self.registers[0].address |
126 | | - reg_end = self.registers[0].address + self.registers[0].register_count |
| 166 | + raise TypeError("offset_address= must be a tuple with 4 addresses") |
| 167 | + if self.default: |
| 168 | + reg_start = self.default.address |
| 169 | + reg_end = self.default.address + self.default.count -1 |
| 170 | + else: |
| 171 | + reg_start = self.registers[0].address |
| 172 | + reg_end = self.registers[-1].address |
127 | 173 | for i in range(4): |
128 | 174 | if not (reg_start < self.offset_address[i] < reg_end): |
129 | 175 | raise TypeError(f"offset_address[{i}] outside defined range") |
130 | 176 | if i and self.offset_address[i-1] >= self.offset_address[i]: |
131 | 177 | raise TypeError("offset_address= must be ascending addresses") |
| 178 | + |
| 179 | +@dataclass(order=True, frozen=True) |
| 180 | +class SimDevices: |
| 181 | + """Define a group of devices. |
| 182 | +
|
| 183 | + If wanting to use multiple devices in a single server, |
| 184 | + each SimDevice must be grouped with SimDevices. |
| 185 | + """ |
| 186 | + |
| 187 | + #: Add a list of SimDevice |
| 188 | + devices: list[SimDevice] |
| 189 | + |
| 190 | + def __post_init__(self): |
| 191 | + """Define a group of devices.""" |
| 192 | + if not isinstance(self.devices, list): |
| 193 | + raise TypeError("devices= must be a list of SimDevice") |
| 194 | + if not self.devices: |
| 195 | + raise TypeError("devices= must contain at least 1 SimDevice") |
| 196 | + list_id = [] |
| 197 | + for device in self.devices: |
| 198 | + if not isinstance(device, SimDevice): |
| 199 | + raise TypeError("devices= contains non SimDevice entries") |
| 200 | + if device.id in list_id: |
| 201 | + raise TypeError(f"device_id={device.id} is duplicated") |
| 202 | + list_id.append(device.id) |
0 commit comments