Skip to content

Commit 9119368

Browse files
authored
Use selector when reading from socket (#203)
1 parent aabfe18 commit 9119368

File tree

4 files changed

+79
-184
lines changed

4 files changed

+79
-184
lines changed

test/stub.py

Lines changed: 0 additions & 74 deletions
This file was deleted.

test/test_connection.py

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from test.UdsTest import UdsTest
22
from udsoncan.connections import *
3-
from test.stub import StubbedIsoTPSocket
43
import socket
54
import threading
65
import time
@@ -32,13 +31,13 @@
3231
_AISOTP_POSSIBLE = False
3332

3433

34+
@unittest.skipIf(_isotp_module_available == False, "isotp module not available")
3535
class TestIsoTPSocketConnection(UdsTest):
3636

3737
def setUp(self):
38-
self.tpsock1 = StubbedIsoTPSocket(timeout=0.1)
39-
self.tpsock2 = StubbedIsoTPSocket(timeout=0.1)
38+
self.tpsock1 = isotp.socket()
39+
self.tpsock2 = isotp.socket()
4040

41-
@unittest.skipIf(_isotp_module_available == False, "Missing isotp module")
4241
def test_open(self):
4342
addr = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x001, txid=0x002)
4443
conn = IsoTPSocketConnection(interface='vcan0', address=addr, tpsock=self.tpsock1, name='unittest')
@@ -48,7 +47,6 @@ def test_open(self):
4847
conn.close()
4948
self.assertFalse(conn.is_open())
5049

51-
@unittest.skipIf(_isotp_module_available == False, "Missing isotp module")
5250
def test_transmit(self):
5351
addr1 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x100, txid=0x101)
5452
addr2 = isotp.Address(isotp.AddressingMode.Normal_11bits, rxid=0x101, txid=0x100)
@@ -62,14 +60,18 @@ def test_transmit(self):
6260
payload2 = conn2.wait_frame(timeout=0.3, exception=True)
6361
self.assertEqual(payload1, payload2)
6462

63+
def tearDown(self) -> None:
64+
self.tpsock1.close()
65+
self.tpsock2.close()
66+
6567

6668
class TestSocketConnection(UdsTest):
6769
def server_sock_thread_task(self):
68-
self.thread_started = True
70+
self.started_event.set()
6971
self.sock1, addr = self.server_sock.accept()
7072

7173
def setUp(self):
72-
self.thread_started = False
74+
self.started_event = threading.Event()
7375
self.server_sock_thread = threading.Thread(target=self.server_sock_thread_task)
7476

7577
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -83,17 +85,15 @@ def setUp(self):
8385
self.server_sock.listen(1)
8486
self.server_sock_thread.start()
8587

86-
t1 = time.time()
87-
while not self.thread_started:
88-
if (time.time() - t1) > 0.5:
89-
raise RuntimeError('Timeout while connecting sockets together.')
90-
time.sleep(0.01)
91-
time.sleep(0.01)
88+
self.started_event.wait(0.5)
89+
if not self.started_event.is_set():
90+
raise RuntimeError('Timeout while connecting sockets together.')
91+
time.sleep(0.01) # Handle race condition like an amateur
9292

9393
self.sock2.connect(self.server_sock.getsockname())
94-
t1 = time.time()
94+
t1 = time.monotonic()
9595
while self.sock1 is None:
96-
if (time.time() - t1) > 0.5:
96+
if (time.monotonic() - t1) > 0.5:
9797
raise RuntimeError('Timeout while connecting sockets together.')
9898

9999
def tearDown(self):
@@ -126,6 +126,55 @@ def test_transmit(self):
126126
self.assertEqual(payload1, payload2)
127127

128128

129+
class TestSocketConnectionBlocking(UdsTest):
130+
def server_sock_thread_task(self):
131+
self.started_event.set()
132+
# Race condition here.
133+
self.sock1, addr = self.server_sock.accept()
134+
135+
def setUp(self):
136+
self.started_event = threading.Event()
137+
self.server_sock_thread = threading.Thread(target=self.server_sock_thread_task)
138+
139+
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
140+
self.server_sock.setblocking(True)
141+
self.sock1 = None
142+
self.sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
143+
144+
self.server_sock.bind(('127.0.0.1', 0))
145+
self.server_sock.listen(1)
146+
self.server_sock_thread.start()
147+
148+
self.started_event.wait(0.5)
149+
if not self.started_event.is_set():
150+
raise RuntimeError('Timeout while connecting sockets together.')
151+
time.sleep(0.01) # Handle race condition like an amateur
152+
153+
self.sock2.connect(self.server_sock.getsockname())
154+
t1 = time.monotonic()
155+
while self.sock1 is None:
156+
if (time.monotonic() - t1) > 0.5:
157+
raise RuntimeError('Timeout while connecting sockets together.')
158+
159+
def test_open_close_no_block(self):
160+
conn = SocketConnection(self.sock1, name='unittest')
161+
self.assertFalse(conn.is_open())
162+
conn.open()
163+
self.assertTrue(conn.is_open())
164+
conn.close()
165+
self.assertFalse(conn.is_open())
166+
167+
def tearDown(self):
168+
if isinstance(self.sock1, socket.socket):
169+
self.sock1.close()
170+
171+
if isinstance(self.sock2, socket.socket):
172+
self.sock2.close()
173+
174+
if isinstance(self.server_sock, socket.socket):
175+
self.server_sock.close()
176+
177+
129178
class TestQueueConnection(UdsTest):
130179
def setUp(self):
131180
self.conn = QueueConnection(name='unittest')

test/test_stubbed_isotpsock.py

Lines changed: 0 additions & 84 deletions
This file was deleted.

udsoncan/connections.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99
from typing import Union, Any, Dict
1010
import ctypes
11+
import selectors
1112

1213
try:
1314
import can # type:ignore
@@ -198,7 +199,6 @@ def __init__(self, sock: socket.socket, bufsize: int = 4095, name: Optional[str]
198199
self.opened = False
199200
self.rxthread = None
200201
self.sock = sock
201-
self.sock.settimeout(0.1) # for recv
202202
self.bufsize = bufsize
203203

204204
def open(self) -> "SocketConnection":
@@ -219,13 +219,15 @@ def is_open(self) -> bool:
219219
return self.opened
220220

221221
def rxthread_task(self) -> None:
222+
sel = selectors.DefaultSelector()
223+
sel.register(self.sock, selectors.EVENT_READ)
222224
while not self.exit_requested:
223225
try:
224-
data = self.sock.recv(self.bufsize)
225-
if data is not None:
226-
self.rxqueue.put(data)
227-
except socket.timeout:
228-
pass
226+
events = sel.select(timeout=0.2)
227+
if events:
228+
data = self.sock.recv(self.bufsize)
229+
if data is not None:
230+
self.rxqueue.put(data)
229231
except Exception:
230232
self.exit_requested = True
231233

@@ -333,13 +335,15 @@ def is_open(self) -> bool:
333335
return self.tpsock.bound
334336

335337
def rxthread_task(self) -> None:
338+
sel = selectors.DefaultSelector()
339+
sel.register(self.tpsock._socket, selectors.EVENT_READ)
336340
while not self.exit_requested:
337341
try:
338-
data = self.tpsock.recv()
339-
if data is not None:
340-
self.rxqueue.put(data)
341-
except socket.timeout:
342-
pass
342+
events = sel.select(timeout=0.2)
343+
if events:
344+
data = self.tpsock.recv()
345+
if data is not None:
346+
self.rxqueue.put(data)
343347
except Exception:
344348
self.exit_requested = True
345349

0 commit comments

Comments
 (0)