|
7 | 7 | from unittest.mock import MagicMock # Python 3 |
8 | 8 | except ImportError: |
9 | 9 | from mock import MagicMock # py2 use https://pypi.python.org/pypi/mock |
10 | | -from hashlib import md5 |
11 | 10 | import json |
12 | | -import logging |
13 | 11 | import struct |
14 | 12 |
|
15 | 13 | # Enable info logging to see version information |
|
24 | 22 |
|
25 | 23 | mock_byte_encoding = 'utf-8' |
26 | 24 |
|
27 | | -def compare_json_strings(json1, json2, ignoring_keys=None): |
28 | | - json1 = json.loads(json1) |
29 | | - json2 = json.loads(json2) |
30 | | - |
31 | | - if ignoring_keys is not None: |
32 | | - for key in ignoring_keys: |
33 | | - json1[key] = json2[key] |
34 | | - |
35 | | - return json.dumps(json1, sort_keys=True) == json.dumps(json2, sort_keys=True) |
36 | | - |
37 | | -def check_data_frame(data, expected_prefix, encrypted=True): |
38 | | - prefix = data[:15] |
39 | | - suffix = data[-8:] |
40 | | - |
41 | | - if encrypted: |
42 | | - payload_len = struct.unpack(">B",data[15:16])[0] # big-endian, unsigned char |
43 | | - version = data[16:19] |
44 | | - checksum = data[19:35] |
45 | | - encrypted_json = data[35:-8] |
46 | | - |
47 | | - json_data = tinytuya.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) |
48 | | - else: |
49 | | - json_data = data[16:-8].decode(mock_byte_encoding) |
50 | | - |
51 | | - frame_ok = True |
52 | | - if prefix != tinytuya.hex2bin(expected_prefix): |
53 | | - frame_ok = False |
54 | | - elif suffix != tinytuya.hex2bin("000000000000aa55"): |
55 | | - frame_ok = False |
56 | | - elif encrypted: |
57 | | - if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(suffix): |
58 | | - frame_ok = False |
59 | | - elif version != b"3.1": |
60 | | - frame_ok = False |
61 | | - |
62 | | - return json_data, frame_ok |
63 | | - |
64 | | -def mock_send_receive_set_timer(data): |
65 | | - if mock_send_receive_set_timer.call_counter == 0: |
66 | | - ret = 20*chr(0x0) + '{"devId":"DEVICE_ID","dps":{"1":false,"2":0}}' + 8*chr(0x0) |
67 | | - elif mock_send_receive_set_timer.call_counter == 1: |
68 | | - expected = '{"uid":"DEVICE_ID_HERE","devId":"DEVICE_ID_HERE","t":"","dps":{"2":6666}}' |
69 | | - json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") |
70 | | - |
71 | | - if frame_ok and compare_json_strings(json_data, expected, ['t']): |
72 | | - ret = '{"test_result":"SUCCESS"}' |
73 | | - else: |
74 | | - ret = '{"test_result":"FAIL"}' |
75 | | - |
76 | | - ret = ret.encode(mock_byte_encoding) |
77 | | - mock_send_receive_set_timer.call_counter += 1 |
78 | | - return ret |
79 | | - |
80 | | -def mock_send_receive_set_status(data): |
81 | | - expected = '{"dps":{"1":true},"uid":"DEVICE_ID_HERE","t":"1516117564","devId":"DEVICE_ID_HERE"}' |
82 | | - json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") |
83 | | - |
84 | | - if frame_ok and compare_json_strings(json_data, expected, ['t']): |
85 | | - ret = '{"test_result":"SUCCESS"}' |
86 | | - else: |
87 | | - logging.error("json data not the same: {} != {}".format(json_data, expected)) |
88 | | - ret = '{"test_result":"FAIL"}' |
89 | | - |
90 | | - ret = ret.encode(mock_byte_encoding) |
91 | | - return ret |
92 | | - |
93 | | -def mock_send_receive_status(data): |
94 | | - expected = '{"devId":"DEVICE_ID_HERE","gwId":"DEVICE_ID_HERE"}' |
95 | | - json_data, frame_ok = check_data_frame(data, "000055aa000000000000000a000000", False) |
96 | | - |
97 | | - # FIXME dead code block |
98 | | - if frame_ok and compare_json_strings(json_data, expected): |
99 | | - ret = '{"test_result":"SUCCESS"}' |
100 | | - else: |
101 | | - logging.error("json data not the same: {} != {}".format(json_data, expected)) |
102 | | - ret = '{"test_result":"FAIL"}' |
103 | | - |
104 | | - ret = 20*chr(0) + ret + 8*chr(0) |
105 | | - ret = ret.encode(mock_byte_encoding) |
106 | | - return ret |
107 | | - |
108 | | -def mock_send_receive_set_colour(data): |
109 | | - expected = '{"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' |
110 | | - |
111 | | - json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") |
112 | | - |
113 | | - if frame_ok and compare_json_strings(json_data, expected, ['t']): |
114 | | - ret = '{"test_result":"SUCCESS"}' |
115 | | - else: |
116 | | - logging.error("json data not the same: {} != {}".format(json_data, expected)) |
117 | | - ret = '{"test_result":"FAIL"}' |
118 | | - |
119 | | - ret = ret.encode(mock_byte_encoding) |
120 | | - return ret |
121 | | - |
122 | | -def mock_send_receive_set_white(data): |
123 | | - expected = '{"dps":{"2":"white", "3":255, "4":255}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' |
124 | | - json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") |
125 | | - |
126 | | - if frame_ok and compare_json_strings(json_data, expected, ['t']): |
127 | | - ret = '{"test_result":"SUCCESS"}' |
128 | | - else: |
129 | | - logging.error("json data not the same: {} != {}".format(json_data, expected)) |
130 | | - ret = '{"test_result":"FAIL"}' |
131 | | - |
132 | | - ret = ret.encode(mock_byte_encoding) |
133 | | - return ret |
| 25 | + |
| 26 | +def get_results_from_mock(d): |
| 27 | + result_message_payload = d._send_receive.call_args[0][0] |
| 28 | + result_cmd = result_message_payload.cmd |
| 29 | + result_payload = json.loads(result_message_payload.payload.decode(mock_byte_encoding)) |
| 30 | + result_payload["t"] = "" # clear "t" |
| 31 | + |
| 32 | + return result_cmd, result_payload |
| 33 | + |
134 | 34 |
|
135 | 35 | class TestXenonDevice(unittest.TestCase): |
136 | 36 | def test_set_timer(self): |
| 37 | + # arrange |
137 | 38 | d = tinytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) |
138 | 39 | d.set_version(3.1) |
139 | | - d._send_receive = MagicMock(side_effect=mock_send_receive_set_timer) |
| 40 | + mock_response = {"devId":"DEVICE_ID","dps":{"1":False,"2":0}} |
| 41 | + d._send_receive = MagicMock(return_value=mock_response) |
140 | 42 |
|
141 | | - # Reset call_counter and start test |
142 | | - mock_send_receive_set_timer.call_counter = 0 |
143 | | - result = d.set_timer(6666) |
144 | | - result = result[result.find(b'{'):result.rfind(b'}')+1] |
145 | | - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json |
146 | | - result = json.loads(result) |
| 43 | + # act |
| 44 | + d.set_timer(6666) |
147 | 45 |
|
148 | | - # Make sure mock_send_receive_set_timer() has been called twice with correct parameters |
149 | | - self.assertEqual(result['test_result'], "SUCCESS") |
| 46 | + # gather results |
| 47 | + result_cmd, result_payload = get_results_from_mock(d) |
| 48 | + |
| 49 | + # expectations |
| 50 | + expected_cmd = tinytuya.CONTROL |
| 51 | + expected_payload = {"uid":"DEVICE_ID_HERE","devId":"DEVICE_ID_HERE","t":"","dps":{"2":6666}} |
| 52 | + |
| 53 | + # assert |
| 54 | + self.assertEqual(result_cmd, expected_cmd) |
| 55 | + self.assertDictEqual(result_payload, expected_payload) |
150 | 56 |
|
151 | 57 | def test_set_status(self): |
| 58 | + # arrange |
152 | 59 | d = tinytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) |
153 | 60 | d.set_version(3.1) |
154 | | - d._send_receive = MagicMock(side_effect=mock_send_receive_set_status) |
| 61 | + d._send_receive = MagicMock(return_value={"devId":"DEVICE_ID","dps":{"1":False,"2":0}}) |
| 62 | + |
| 63 | + # act |
| 64 | + d.set_status(True, 1) |
| 65 | + |
| 66 | + # gather results |
| 67 | + result_cmd, result_payload = get_results_from_mock(d) |
155 | 68 |
|
156 | | - result = d.set_status(True, 1) |
157 | | - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json |
158 | | - result = json.loads(result) |
| 69 | + # expectations |
| 70 | + expected_cmd = tinytuya.CONTROL |
| 71 | + expected_payload = {"dps":{"1":True},"uid":"DEVICE_ID_HERE","t":"","devId":"DEVICE_ID_HERE"} |
159 | 72 |
|
160 | | - # Make sure mock_send_receive_set_timer() has been called twice with correct parameters |
161 | | - self.assertEqual(result['test_result'], "SUCCESS") |
| 73 | + # assert |
| 74 | + self.assertEqual(result_cmd, expected_cmd) |
| 75 | + self.assertDictEqual(result_payload, expected_payload) |
162 | 76 |
|
163 | 77 | def test_status(self): |
| 78 | + # arrange |
164 | 79 | d = tinytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) |
165 | 80 | d.set_version(3.1) |
166 | | - d._send_receive = MagicMock(side_effect=mock_send_receive_status) |
| 81 | + d._send_receive = MagicMock(return_value={"devId":"DEVICE_ID","dps":{"1":False,"2":0}}) |
167 | 82 |
|
168 | | - result = d.status() |
| 83 | + # act |
| 84 | + d.status() |
| 85 | + |
| 86 | + # gather results |
| 87 | + result_cmd, result_payload = get_results_from_mock(d) |
| 88 | + |
| 89 | + # expectations |
| 90 | + expected_cmd = tinytuya.DP_QUERY |
| 91 | + expected_payload = {"devId": "DEVICE_ID_HERE", "gwId": "DEVICE_ID_HERE", "uid": "DEVICE_ID_HERE", "t": ""} |
| 92 | + |
| 93 | + # assert |
| 94 | + self.assertEqual(result_cmd, expected_cmd) |
| 95 | + self.assertDictEqual(result_payload, expected_payload) |
169 | 96 |
|
170 | | - # Make sure mock_send_receive_set_timer() has been called twice with correct parameters |
171 | | - self.assertEqual(result['test_result'], "SUCCESS") |
172 | | - |
173 | 97 | def test_set_colour(self): |
| 98 | + # arrange |
174 | 99 | d = tinytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) |
| 100 | + d.status = MagicMock(return_value={}) # set_version calls this to figure out which commands it supports |
175 | 101 | d.set_version(3.1) |
176 | | - d._send_receive = MagicMock(side_effect=mock_send_receive_set_colour) |
| 102 | + d._send_receive = MagicMock(return_value={"devId":"DEVICE_ID","dps":{"2":"colour", "5":"ffffff000000ff"}}) |
| 103 | + |
| 104 | + # act |
| 105 | + d.set_colour(255,255,255) |
177 | 106 |
|
178 | | - result = d.set_colour(255,255,255) |
179 | | - result = result.decode(mock_byte_encoding) |
180 | | - result = json.loads(result) |
| 107 | + # gather results |
| 108 | + result_cmd, result_payload = get_results_from_mock(d) |
181 | 109 |
|
182 | | - self.assertEqual(result['test_result'], "SUCCESS") |
| 110 | + # expectations |
| 111 | + expected_cmd = tinytuya.CONTROL |
| 112 | + # expected_payload = {"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t": ""} |
| 113 | + expected_payload = {"dps":{"21":"colour", "24":"0000000003e8"}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t": ""} |
| 114 | + |
| 115 | + # assert |
| 116 | + self.assertEqual(result_cmd, expected_cmd) |
| 117 | + self.assertDictEqual(result_payload, expected_payload) |
183 | 118 |
|
184 | 119 | def test_set_white(self): |
| 120 | + # arrange |
185 | 121 | d = tinytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) |
| 122 | + d.status = MagicMock(return_value={}) # set_version calls this to figure out which commands it supports |
186 | 123 | d.set_version(3.1) |
187 | | - d._send_receive = MagicMock(side_effect=mock_send_receive_set_white) |
| 124 | + d._send_receive = MagicMock(return_value={"devId":"DEVICE_ID","dps":{"1":False,"2":0}}) |
| 125 | + |
| 126 | + # act |
| 127 | + d.set_white(255, 255) |
| 128 | + |
| 129 | + # gather results |
| 130 | + result_cmd, result_payload = get_results_from_mock(d) |
| 131 | + |
| 132 | + # expectations |
| 133 | + expected_cmd = tinytuya.CONTROL |
| 134 | + # expected_payload = {"dps":{"2":"white", "3": 255, "4": 255}, "devId": "DEVICE_ID_HERE","uid": "DEVICE_ID_HERE", "t": ""} |
| 135 | + expected_payload = {"dps":{"21":"white", "22": 255, "23": 255}, "devId": "DEVICE_ID_HERE","uid": "DEVICE_ID_HERE", "t": ""} |
188 | 136 |
|
189 | | - result = d.set_white(255, 255) |
190 | | - result = result.decode(mock_byte_encoding) |
191 | | - result = json.loads(result) |
| 137 | + # assert |
| 138 | + self.assertEqual(result_cmd, expected_cmd) |
| 139 | + self.assertDictEqual(result_payload, expected_payload) |
192 | 140 |
|
193 | | - self.assertEqual(result['test_result'], "SUCCESS") |
194 | 141 |
|
195 | 142 | if __name__ == '__main__': |
196 | 143 | unittest.main() |
0 commit comments