|
| 1 | +# TinyTuya Contrib RFRemoteControlDevice Module |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +""" |
| 4 | + A community-contributed Python module to add support for Tuya WiFi smart universal RF remote controller |
| 5 | +
|
| 6 | + This module attempts to provide everything needed so there is no need to import the base tinytuya module |
| 7 | +
|
| 8 | + Module Author: uzlonewolf (https://github.com/uzlonewolf) |
| 9 | + Based on IRRemoteControlDevice by Alexey 'Cluster' Avdyukhin (https://github.com/clusterm) |
| 10 | +
|
| 11 | + Local Control Classes |
| 12 | + RFRemoteControlDevice(..., version=3.3) |
| 13 | + This class uses a default version of 3.3 |
| 14 | + See OutletDevice() for the other constructor arguments |
| 15 | +
|
| 16 | + Functions: |
| 17 | + rf = RFRemoteControlDevice(..., control_type=None) |
| 18 | + -> will immediately connect to the device to try and detect the control type if control_type is not provided |
| 19 | + control_type=1 for older devices using DPS 201/202 |
| 20 | + control_type=2 for newer devices using DPS 1-13 |
| 21 | +
|
| 22 | + rf.send_command( mode, data={} ) |
| 23 | + -> sends a command to the device |
| 24 | + IRRemoteControlDevice.send_command() is used when mode is not 'rf_study', 'rfstudy_exit', 'rfstudy_send', |
| 25 | + 'rf_shortstudy', 'rfshortstudy_exit', or 'send_cmd' |
| 26 | +
|
| 27 | + rf.rf_study_start( freq=0, short=False ) |
| 28 | + rf.rf_study_end( freq=0, short=False ) |
| 29 | + -> start or end a study session |
| 30 | + freq=0 auto-detects the frequency, or it can be specified as i.e. freq="433.92" or freq="315" |
| 31 | + when sort=True, 'rf_shortstudy' is used instead of 'rf_study' |
| 32 | +
|
| 33 | + rf.rf_receive_button( freq=0, timeout ) |
| 34 | + -> call this method and press button on real remote control to read its code in Base64 format |
| 35 | + freq - 0 to auto-detect |
| 36 | + timeout - maximum time to wait for button press |
| 37 | +
|
| 38 | + rf.rf_send_button( base64_code, times=6, delay=0, intervals=0 ) |
| 39 | + -> send a learned (raw base64-encoded) button press |
| 40 | +
|
| 41 | + rf.rf_send_key( keys, cmt_bank, system_bank, frequency_bank, datarate_bank, baseband_bank, tx_bank, mode=8, freq=0, rate=0 ) |
| 42 | + -> send pre-defined key(s) |
| 43 | + The *_bank values are directly copied from CMOSTEK's RFPDK software (select chip "CMT2300A") |
| 44 | + 'keys' can be: |
| 45 | + a dict containing 'code', 'delay', 'intervals', and 'times' |
| 46 | + a single hex string |
| 47 | + a list or tuple containing dicts or hex strings |
| 48 | +
|
| 49 | + RFRemoteControlDevice.rf_print_button( base64_code ) |
| 50 | + -> prints and returns the JSON dict as a string from a base64-encoded learned button |
| 51 | + the base64 string is base64 decoded but not JSON parsed |
| 52 | +
|
| 53 | + RFRemoteControlDevice.rf_decode_button( base64_code ) |
| 54 | + -> returns the JSON dict as a dict from a base64-encoded learned button |
| 55 | + the base64 string is base64 decoded and then JSON parsed |
| 56 | +
|
| 57 | +""" |
| 58 | + |
| 59 | +import base64 |
| 60 | +import json |
| 61 | +import logging |
| 62 | +import struct |
| 63 | +import time |
| 64 | + |
| 65 | +from ..core import log, CONTROL |
| 66 | +from .IRRemoteControlDevice import IRRemoteControlDevice |
| 67 | + |
| 68 | +# extends IRRemoteControlDevice |
| 69 | +class RFRemoteControlDevice(IRRemoteControlDevice): |
| 70 | + def send_command( self, mode, data={} ): |
| 71 | + if( mode in ('rf_study', 'rfstudy_exit', 'rfstudy_send', 'rf_shortstudy', 'rfshortstudy_exit') ): |
| 72 | + if 'rf_type' not in data or not data['rf_type']: |
| 73 | + data['rf_type'] = 'sub_2g' |
| 74 | + if 'freq' not in data or not data['freq']: |
| 75 | + data['freq'] = '0' |
| 76 | + if 'ver' not in data or not data['ver']: |
| 77 | + data['ver'] = '2' |
| 78 | + command = { RFRemoteControlDevice.NSDP_CONTROL: mode, 'rf_type': data['rf_type'], 'study_feq': data['freq'], 'ver': data['ver'] } |
| 79 | + if mode == 'rfstudy_send': |
| 80 | + for i in range( 1, 10 ): |
| 81 | + k = 'key%d' % i |
| 82 | + if k in data: |
| 83 | + command[k] = data[k] |
| 84 | + self.set_value( RFRemoteControlDevice.DP_SEND_IR, json.dumps(command), nowait=True ) |
| 85 | + elif mode == 'send_cmd': |
| 86 | + data[RFRemoteControlDevice.NSDP_CONTROL] = mode |
| 87 | + self.set_value( RFRemoteControlDevice.DP_SEND_IR, json.dumps(data), nowait=True ) |
| 88 | + else: |
| 89 | + super(RFRemoteControlDevice, self).send_command( mode, data ) |
| 90 | + |
| 91 | + def rf_study_start( self, freq=0, short=False ): |
| 92 | + # {"dps":{"201":"{\"rf_type\":\"sub_2g\",\"control\":\"rf_study\",\"study_feq\":\"433\",\"ver\":\"2\"}"} |
| 93 | + data = { 'freq': str(freq) } |
| 94 | + cmd = 'rf_shortstudy' if short else 'rf_study' |
| 95 | + self.send_command( cmd, data ) |
| 96 | + |
| 97 | + def rf_study_end( self, freq=0, short=False ): |
| 98 | + # {"dps":{"201":"{\"rf_type\":\"sub_2g\",\"control\":\"rfstudy_exit\",\"study_feq\":\"433\",\"ver\":\"2\"}"} |
| 99 | + data = { 'freq': str(freq) } |
| 100 | + cmd = 'rfshortstudy_exit' if short else 'rfstudy_exit' |
| 101 | + self.send_command( cmd, data ) |
| 102 | + |
| 103 | + def rf_receive_button( self, freq=0, timeout=30 ): |
| 104 | + log.debug("Receiving button") |
| 105 | + # Exit study mode in case it's enabled |
| 106 | + self.rf_study_end() |
| 107 | + # Enable study mode |
| 108 | + self.rf_study_start( freq=freq ) |
| 109 | + |
| 110 | + # Receiving button code |
| 111 | + response = None |
| 112 | + response_code = None |
| 113 | + found = False |
| 114 | + # Remember old timeout and set new timeout |
| 115 | + old_timeout = self.connection_timeout |
| 116 | + end_at_time = time.time() + timeout |
| 117 | + old_persist = self.socketPersistent |
| 118 | + self.set_socketPersistent( True ) |
| 119 | + try: |
| 120 | + while end_at_time > time.time(): |
| 121 | + timeo = round(time.time() - end_at_time) |
| 122 | + if timeo < 1: timeo = 1 |
| 123 | + self.set_socketTimeout(timeo) |
| 124 | + |
| 125 | + log.debug("Waiting for button...") |
| 126 | + response = self._send_receive(None) |
| 127 | + if response == None: |
| 128 | + # Nothing received |
| 129 | + log.debug("Timeout") |
| 130 | + elif type(response) != dict or "dps" not in response: |
| 131 | + # Some unexpected result |
| 132 | + log.debug("Unexpected response: %r", response) |
| 133 | + response_code = response # Some error message? Pass it. |
| 134 | + break |
| 135 | + elif self.DP_LEARNED_ID in response["dps"]: |
| 136 | + # Button code received, extracting it as Base64 string |
| 137 | + log.info( 'Response (type 1): %r', response ) |
| 138 | + response_code = response["dps"][self.DP_LEARNED_ID] |
| 139 | + found = True |
| 140 | + break |
| 141 | + elif self.DP_LEARNED_REPORT in response["dps"]: |
| 142 | + log.info( 'Response (type 2): %r', response ) |
| 143 | + response_code = response["dps"][self.DP_LEARNED_REPORT] |
| 144 | + found = True |
| 145 | + break |
| 146 | + else: |
| 147 | + # Unknown DPS |
| 148 | + log.debug("Unknown DPS in response: %r", response) |
| 149 | + response_code = response # Pass it if we do not get a response we like |
| 150 | + # try again |
| 151 | + finally: |
| 152 | + # Revert timeout |
| 153 | + self.set_socketTimeout(old_timeout) |
| 154 | + |
| 155 | + if found: |
| 156 | + self.rf_print_button( response_code ) |
| 157 | + |
| 158 | + # Exit study mode |
| 159 | + self.rf_study_end( freq=freq ) |
| 160 | + |
| 161 | + if not old_persist: |
| 162 | + self.set_socketPersistent( False ) |
| 163 | + |
| 164 | + return response_code |
| 165 | + |
| 166 | + def rf_send_button( self, base64_code, times=6, delay=0, intervals=0 ): |
| 167 | + # key1\":{\"code\":\"eyJud..iI==\",\"times\":6,\"delay\":0,\"intervals\":0}}"}}' |
| 168 | + log.debug( 'Sending Learned RF Button: ' + base64_code) |
| 169 | + self.rf_print_button( base64_code ) |
| 170 | + |
| 171 | + bdata = self.rf_decode_button( base64_code ) |
| 172 | + key1 = { 'code': base64_code, 'times': times, 'delay': delay, 'intervals': intervals } |
| 173 | + data = { 'key1': key1 } |
| 174 | + if bdata: |
| 175 | + if 'study_feq' in bdata: data['freq'] = bdata['study_feq'] |
| 176 | + if 'ver' in bdata: data['ver'] = bdata['ver'] |
| 177 | + return self.send_command( 'rfstudy_send', data ) |
| 178 | + |
| 179 | + def rf_send_key( self, keys, cmt_bank, system_bank, frequency_bank, datarate_bank, baseband_bank, tx_bank, mode=8, freq=0, rate=0 ): |
| 180 | + """ |
| 181 | + 'keys' can be: |
| 182 | + a dict containing 'code', 'delay', 'intervals', and 'times' |
| 183 | + a single hex string |
| 184 | + a list or tuple containing dicts or hex strings |
| 185 | +
|
| 186 | + The *_bank values are directly copied from CMOSTEK's RFPDK software (select chip "CMT2300A") |
| 187 | + Example: |
| 188 | + Baseband "b": [2,0,0,0,0,0,0,0,0,0,0,0,0,0,19,0,0,0,0,0,0,0,0,96,255,0,0,31,16] |
| 189 | + CMT "c": [0,102,236,28,240,128,20,8,145,2,2,208] |
| 190 | + System "s": [174,224,53,0,0,244,16,226,66,32,0,129] |
| 191 | + Data Rate "d": [63,30,128,204,0,0,0,0,0,0,0,41,192,218,33,75,5,0,80,45,0,1,5,5] |
| 192 | + TX "t": [81,154,12,0,12,176,0,31,4,63,127] |
| 193 | + Frequency "f": [66,113,206,28,66,91,28,28] |
| 194 | + """ |
| 195 | + if len(cmt_bank) != 12 or type(cmt_bank) not in (list, tuple): |
| 196 | + raise ValueError( 'CMT Bank list/tuple size must be 12' ) |
| 197 | + if len(system_bank) != 12 or type(system_bank) not in (list, tuple): |
| 198 | + raise ValueError( 'System Bank list/tuple size must be 12' ) |
| 199 | + if len(frequency_bank) != 8 or type(frequency_bank) not in (list, tuple): |
| 200 | + raise ValueError( 'Frequency Bank list/tuple size must be 8' ) |
| 201 | + if len(datarate_bank) != 24 or type(datarate_bank) not in (list, tuple): |
| 202 | + raise ValueError( 'Data Rate Bank list/tuple size must be 24' ) |
| 203 | + if len(baseband_bank) != 29 or type(baseband_bank) not in (list, tuple): |
| 204 | + raise ValueError( 'Baseband Bank list/tuple size must be 29' ) |
| 205 | + if len(tx_bank) != 11 or type(tx_bank) not in (list, tuple): |
| 206 | + raise ValueError( 'TX Bank list/tuple size must be 11' ) |
| 207 | + |
| 208 | + # {"dps":{"201":"{\"rf_type\":\"sub_2g\",\"mode\":8,\"key1\":{\"code\":\"ffffc01fa4934924924924934d34924da4926db0\",\"delay\":0,\"intervals\":0,\"times\":5},\"feq\":0,\"rate\":0,\"cfg\":{\"b\":[2,0,0,0,0,0,0,0,0,0,0,0,0,0,19,0,0,0,0,0,0,0,0,96,255,0,0,31,16],\"c\":[0,102,236,28,240,128,20,8,145,2,2,208],\"s\":[174,224,53,0,0,244,16,226,66,32,0,129],\"d\":[63,30,128,204,0,0,0,0,0,0,0,41,192,218,33,75,5,0,80,45,0,1,5,5],\"t\":[81,154,12,0,12,176,0,31,4,63,127],\"f\":[66,113,206,28,66,91,28,28]},\"control\":\"send_cmd\"}"} |
| 209 | + |
| 210 | + if type(keys) == dict: |
| 211 | + data = { 'key1': keys } |
| 212 | + elif type(keys) == str: |
| 213 | + data = { 'key1': { 'code': keys, 'delay': 0, 'intervals': 0, 'times': 5 } } |
| 214 | + elif type(keys) in (list, tuple): |
| 215 | + i = 1 |
| 216 | + data = {} |
| 217 | + for k in keys: |
| 218 | + kkey = 'key%d' % i |
| 219 | + if type(k) == dict: |
| 220 | + data[kkey] = k |
| 221 | + elif type(k) == str: |
| 222 | + data[kkey] = { 'code': k, 'delay': 0, 'intervals': 0, 'times': 5 } |
| 223 | + else: |
| 224 | + raise ValueError( 'rf_send_key(): Unknown data type for key: %r' % k ) |
| 225 | + else: |
| 226 | + raise ValueError( 'rf_send_key(): Unknown data type for keys: %r' % keys ) |
| 227 | + |
| 228 | + default = { 'delay': 0, 'intervals': 0, 'times': 5 } |
| 229 | + for k in data: |
| 230 | + for d in default: |
| 231 | + if d not in data[k] or type(data[k][d]) != int: |
| 232 | + data[k][d] = default[d] |
| 233 | + |
| 234 | + data['rf_type'] = 'sub_2g' |
| 235 | + data['mode'] = mode |
| 236 | + data['feq'] = freq |
| 237 | + data['rate'] = rate |
| 238 | + data['cfg'] = { 'c': cmt_bank, 's': system_bank, 'f': frequency_bank, 'd': datarate_bank, 'b': baseband_bank, 't': tx_bank } |
| 239 | + |
| 240 | + log.info( 'Sending Keys: %r', data ) |
| 241 | + return self.send_command( 'send_cmd', data ) |
| 242 | + |
| 243 | + @staticmethod |
| 244 | + def rf_print_button( base64_code, use_log=None ): |
| 245 | + if not use_log: use_log = log |
| 246 | + try: |
| 247 | + jstr = base64.b64decode( base64_code ) |
| 248 | + #jdata = json.loads( jstr ) |
| 249 | + use_log.debug( 'Learned button: %s', jstr ) |
| 250 | + return jstr |
| 251 | + except: |
| 252 | + use_log.debug( 'Failed to decode learned button: %r', base64_code ) |
| 253 | + return None |
| 254 | + |
| 255 | + @staticmethod |
| 256 | + def rf_decode_button( base64_code ): |
| 257 | + try: |
| 258 | + jstr = base64.b64decode |
| 259 | + jdata = json.loads( jstr ) |
| 260 | + return jdata |
| 261 | + except: |
| 262 | + return None |
0 commit comments