44import asyncio
55from collections .abc import Awaitable , Callable
66from dataclasses import dataclass
7- from typing import TypeAlias
7+ from typing import TypeAlias , cast
88
99from pymodbus .constants import DATATYPE_STRUCT , DataType
1010from pymodbus .pdu import ExceptionResponse
1111
1212
13- SimValueTypeSimple : TypeAlias = int | float | str | bool | bytes
14- SimValueType : TypeAlias = SimValueTypeSimple | list [SimValueTypeSimple ]
13+ SimValueTypeSimple : TypeAlias = int | float | str | bytes
14+ SimValueType : TypeAlias = SimValueTypeSimple | list [SimValueTypeSimple | bool ]
1515SimAction : TypeAlias = Callable [[int , int , list [int ]], Awaitable [list [int ] | ExceptionResponse ]]
1616
17- @dataclass (frozen = True )
17+ @dataclass (order = True , frozen = True )
1818class SimData :
1919 """Configure a group of continuous identical values/registers.
2020
@@ -83,7 +83,7 @@ class SimData:
8383
8484 #: Value/Values of datatype,
8585 #: will automatically be converted to registers, according to datatype.
86- value : SimValueType = 0
86+ values : SimValueType = 0
8787
8888 #: Used to check access and convert value to/from registers.
8989 datatype : DataType = DataType .REGISTERS
@@ -104,22 +104,68 @@ class SimData:
104104 #: .. tip:: use functools.partial to add extra parameters if needed.
105105 action : SimAction | None = None
106106
107+ #: Mark register(s) as readonly.
108+ readonly : bool = False
107109
108- def __post_init__ (self ):
109- """Define a group of registers."""
110- if not isinstance (self .address , int ) or not 0 <= self .address < 65535 :
110+ #: Mark register(s) as invalid.
111+ #: **remark** only to be used with address= and count=
112+ invalid : bool = False
113+
114+ #: Use as default for undefined registers
115+ #: Define legal register range as:
116+ #:
117+ #: address= <= legal addresses <= address= + count=
118+ #:
119+ #: **remark** only to be used with address= and count=
120+ default : bool = False
121+
122+ #: The following are internal variables
123+ register_count : int = - 1
124+ type_size : int = - 1
125+
126+ def __check_default (self ):
127+ """Check use of default=."""
128+ if self .datatype != DataType .REGISTERS :
129+ raise TypeError ("default=True only works with datatype=DataType.REGISTERS" )
130+ if isinstance (self .values , list ):
131+ raise TypeError ("default=True only works with values=<integer>" )
132+
133+ def __check_simple (self ):
134+ """Check simple parameters."""
135+ if not isinstance (self .address , int ) or not 0 <= self .address <= 65535 :
111136 raise TypeError ("0 <= address < 65535" )
112- if not isinstance (self .count , int ) or not 0 <= self .count < 65535 :
113- raise TypeError ("0 <= count < 65535" )
137+ if not isinstance (self .count , int ) or not 1 <= self .count <= 65536 :
138+ raise TypeError ("1 <= count < 65536" )
139+ if not 1 <= self .address + self .count <= 65536 :
140+ raise TypeError ("1 <= address + count < 65536" )
114141 if not isinstance (self .datatype , DataType ):
115- raise TypeError ("datatype must by an DataType" )
116- if isinstance (self .value , list ):
117- if self .count > 1 or self .datatype == DataType .STRING :
118- raise TypeError ("count > 1 cannot be combined with given values=" )
119- for entry in self .value :
120- if not isinstance (entry , DATATYPE_STRUCT [self .datatype ][0 ]) or isinstance (entry , str ):
121- raise TypeError (f"elements in values must be { self .datatype !s} and not string" )
122- elif not isinstance (self .value , DATATYPE_STRUCT [self .datatype ][0 ]):
123- raise TypeError (f"value must be { self .datatype !s} " )
142+ raise TypeError ("datatype= must by an DataType" )
124143 if self .action and not (callable (self .action ) and asyncio .iscoroutinefunction (self .action )):
125- raise TypeError ("action not a async function" )
144+ raise TypeError ("action= not a async function" )
145+ if self .register_count != - 1 :
146+ raise TypeError ("register_count= is illegal" )
147+ if self .type_size != - 1 :
148+ raise TypeError ("type_size= is illegal" )
149+
150+ def __post_init__ (self ):
151+ """Define a group of registers."""
152+ self .__check_simple ()
153+ if self .default :
154+ self .__check_default ()
155+ x_datatype : type | tuple [type , type ]
156+ if self .datatype == DataType .STRING :
157+ if not isinstance (self .values , str ):
158+ raise TypeError ("datatype=DataType.STRING only allows values=\" string\" " )
159+ x_datatype , x_len = str , int ((len (self .values ) + 1 ) / 2 )
160+ else :
161+ x_datatype , x_len = DATATYPE_STRUCT [self .datatype ]
162+ if not isinstance (self .values , list ):
163+ super ().__setattr__ ("values" , [self .values ])
164+ for x_value in cast (list , self .values ):
165+ if not isinstance (x_value , x_datatype ):
166+ raise TypeError (f"value= can only contain { x_datatype !s} " )
167+ super ().__setattr__ ("register_count" , self .count * x_len )
168+ super ().__setattr__ ("type_size" , x_len )
169+
170+
171+
0 commit comments