Skip to content

Commit 598bb01

Browse files
Christoph BauerChristoph Bauer
authored andcommitted
Warnings fixed
1 parent ea195bf commit 598bb01

File tree

8 files changed

+118
-96
lines changed

8 files changed

+118
-96
lines changed

src/NetMQ.Tests/MessageTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void Issue52_ReqToRouterBug()
128128

129129
var msg = router.ReceiveMultipartMessage();
130130
Assert.Equal(3, msg.FrameCount);
131-
Assert.Equal(msg[2].ConvertToString(), testmessage);
131+
Assert.Equal(testmessage, msg[2].ConvertToString());
132132
}
133133
}
134134

src/NetMQ.Tests/NetMQMonitorTests.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ public void Monitoring()
5757
}
5858
}
5959

60-
#if !NET35
60+
6161
[Fact]
62-
public void StartAsync()
62+
public async Task StartAsync()
6363
{
6464
using (var rep = new ResponseSocket())
6565
using (var monitor = new NetMQMonitor(rep, "inproc://foo", SocketEvents.Closed))
@@ -68,10 +68,11 @@ public void StartAsync()
6868
Thread.Sleep(200);
6969
Assert.Equal(TaskStatus.Running, task.Status);
7070
monitor.Stop();
71-
Assert.True(task.Wait(TimeSpan.FromMilliseconds(1000)));
71+
var completedTask = await Task.WhenAny(task, Task.Delay(1000));
72+
Assert.Equal(task, completedTask);
7273
}
7374
}
74-
#endif
75+
7576

7677
[Fact]
7778
public void NoHangWhenMonitoringUnboundInprocAddress()
@@ -126,7 +127,7 @@ public void ErrorCodeTest()
126127
}
127128

128129
[Fact]
129-
public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
130+
public async Task MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
130131
{
131132
// The bug:
132133
// Given we monitor a netmq tcp socket
@@ -141,7 +142,7 @@ public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
141142
using (var req = new RequestSocket())
142143
{
143144
monitor = new NetMQMonitor(req, "inproc://#monitor", SocketEvents.All);
144-
Task.Factory.StartNew(monitor.Start);
145+
_ = Task.Factory.StartNew(monitor.Start);
145146

146147
// Bug only occurs when monitoring a tcp socket
147148
var port = res.BindRandomPort("tcp://127.0.0.1");
@@ -152,10 +153,12 @@ public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
152153
res.SendFrame("response");
153154
Assert.Equal("response", req.ReceiveFrameString());
154155
}
155-
Thread.Sleep(100);
156+
157+
await Task.Delay(100);
156158
// Monitor.Dispose should complete
157-
var completed = Task.Factory.StartNew(() => monitor.Dispose()).Wait(1000);
158-
Assert.True(completed);
159+
var task = Task.Factory.StartNew(() => monitor.Dispose());
160+
var completedTask = await Task.WhenAny(task, Task.Delay(1000));
161+
Assert.Equal(task, completedTask);
159162
}
160163
// NOTE If this test fails, it will hang because context.Dispose will block
161164
}

src/NetMQ.Tests/NetMQPollerTest.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#if !NET35
1212
using System.Collections.Concurrent;
13+
using System.Collections.Generic;
1314
#endif
1415

1516
// ReSharper disable AccessToDisposedClosure
@@ -865,7 +866,7 @@ public void NativeSocket()
865866

866867
#if !NET35
867868
[Fact]
868-
public void OneTask()
869+
public async Task OneTask()
869870
{
870871
bool triggered = false;
871872

@@ -879,27 +880,27 @@ public void OneTask()
879880
Assert.True(poller.CanExecuteTaskInline, "Should be on NetMQPoller thread");
880881
});
881882
task.Start(poller);
882-
task.Wait();
883+
await task;
883884

884885
Assert.True(triggered);
885886
}
886887
}
887888

888889
[Fact]
889-
public void SetsCurrentTaskScheduler()
890+
public async Task SetsCurrentTaskScheduler()
890891
{
891892
using (var poller = new NetMQPoller())
892893
{
893894
poller.RunAsync();
894895

895896
var task = new Task(() => Assert.Same(TaskScheduler.Current, poller));
896897
task.Start(poller);
897-
task.Wait();
898+
await task;
898899
}
899900
}
900901

901902
[Fact]
902-
public void CanExecuteTaskInline()
903+
public async Task CanExecuteTaskInline()
903904
{
904905
using (var poller = new NetMQPoller())
905906
{
@@ -911,12 +912,12 @@ public void CanExecuteTaskInline()
911912

912913
var task = new Task(() => Assert.True(poller.CanExecuteTaskInline));
913914
task.Start(poller);
914-
task.Wait();
915+
await task;
915916
}
916917
}
917918

918919
[Fact]
919-
public void ContinueWith()
920+
public async Task ContinueWith()
920921
{
921922
int threadId1 = 0;
922923
int threadId2 = 1;
@@ -941,8 +942,7 @@ public void ContinueWith()
941942
}, poller);
942943

943944
task.Start(poller);
944-
task.Wait();
945-
task2.Wait();
945+
await Task.WhenAll(new List<Task>{task, task2});
946946

947947
Assert.Equal(threadId1, threadId2);
948948
Assert.Equal(1, runCount1);
@@ -951,7 +951,7 @@ public void ContinueWith()
951951
}
952952

953953
[Fact]
954-
public void TwoThreads()
954+
public async Task TwoThreads()
955955
{
956956
int count1 = 0;
957957
int count2 = 0;
@@ -982,9 +982,8 @@ public void TwoThreads()
982982
}
983983
});
984984

985-
t1.Wait(1000);
986-
t2.Wait(1000);
987-
Task.WaitAll(allTasks.ToArray(), 1000);
985+
await Task.WhenAny(Task.WhenAll(new List<Task>{ t1, t2 }), Task.Delay(1000));
986+
await Task.WhenAny(Task.WhenAll(allTasks.ToArray()), Task.Delay(1000));
988987

989988
Assert.Equal(100, count1);
990989
Assert.Equal(100, count2);

src/NetMQ.Tests/XPubSubTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public void Manual()
332332
sub.SendFrame(new byte[] { 1, (byte)'A' });
333333
var subscription = pub.ReceiveFrameBytes();
334334

335-
Assert.Equal(subscription[1], (byte)'A');
335+
Assert.Equal((byte)'A', subscription[1]);
336336

337337
pub.Subscribe("B");
338338
pub.SendFrame("A");
@@ -356,7 +356,7 @@ public void WelcomeMessage()
356356

357357
var subscription = pub.ReceiveFrameBytes();
358358

359-
Assert.Equal(subscription[1], (byte)'W');
359+
Assert.Equal((byte)'W', subscription[1]);
360360

361361
Assert.Equal("W", sub.ReceiveFrameString());
362362
}
@@ -377,7 +377,7 @@ public void ClearWelcomeMessage()
377377

378378
var subscription = pub.ReceiveFrameBytes();
379379

380-
Assert.Equal(subscription[1], (byte)'W');
380+
Assert.Equal( (byte)'W', subscription[1]);
381381

382382
Assert.False(sub.TrySkipFrame());
383383
}

src/NetMQ/Core/Transports/Tcp/TcpConnector.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ private void StartConnecting()
206206
// TerminatingException can occur in above call to EventConnectDelayed via
207207
// MonitorEvent.Write if corresponding PairSocket has been sent Term command
208208
catch (TerminatingException)
209-
{}
209+
{ }
210210
}
211211

212212
/// <summary>
@@ -236,9 +236,12 @@ public void OutCompleted(SocketError socketError, int bytesTransferred)
236236
m_ioObject.RemoveSocket(m_s);
237237
m_handleValid = false;
238238

239-
try {
239+
try
240+
{
240241
m_s.NoDelay = true;
241-
} catch (ArgumentException) {
242+
}
243+
catch (ArgumentException)
244+
{
242245
// OSX sometime fail while the socket is still connecting
243246
}
244247

@@ -259,7 +262,16 @@ public void OutCompleted(SocketError socketError, int bytesTransferred)
259262
bytes.PutInteger(endian, m_options.TcpKeepaliveIdle, 4);
260263
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);
261264

265+
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);
266+
#if NET
267+
if (!OperatingSystem.IsWindows())
268+
{
269+
throw new InvalidOperationException("Not supported on you platform"); // There is a pull request for .net8.0
270+
271+
}
272+
#endif
262273
m_s.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
274+
263275
}
264276
}
265277

src/NetMQ/Core/Transports/Tcp/TcpListener.cs

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ internal class TcpListener : Own, IProactorEvents
4545
/// </summary>
4646
private AsyncSocket? m_handle;
4747

48-
/*
49-
/// <summary>
50-
/// socket being accepted
51-
/// </summary>
52-
private AsyncSocket m_acceptedSocket;
53-
*/
48+
/*
49+
/// <summary>
50+
/// socket being accepted
51+
/// </summary>
52+
private AsyncSocket m_acceptedSocket;
53+
*/
5454

5555
/// <summary>
5656
/// Socket the listener belongs to.
@@ -107,7 +107,7 @@ protected override void ProcessPlug()
107107
protected override void ProcessTerm(int linger)
108108
{
109109
Assumes.NotNull(m_handle);
110-
110+
111111
m_ioObject.SetHandler(this);
112112
m_ioObject.RemoveSocket(m_handle);
113113
Close();
@@ -123,7 +123,7 @@ public virtual void SetAddress(string addr)
123123
m_address.Resolve(addr, m_options.IPv4Only);
124124

125125
Assumes.NotNull(m_address.Address);
126-
126+
127127

128128
try
129129
{
@@ -195,68 +195,75 @@ public void InCompleted(SocketError socketError, int bytesTransferred)
195195
switch (socketError)
196196
{
197197
case SocketError.Success:
198-
{
199-
// TODO: check TcpFilters
200-
var acceptedSocket = m_handle.GetAcceptedSocket();
198+
{
199+
// TODO: check TcpFilters
200+
var acceptedSocket = m_handle.GetAcceptedSocket();
201201

202202
acceptedSocket.NoDelay = true;
203203

204-
if (m_options.TcpKeepalive != -1)
205-
{
206-
acceptedSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, m_options.TcpKeepalive);
207-
208-
if (m_options.TcpKeepaliveIdle != -1 && m_options.TcpKeepaliveIntvl != -1)
204+
if (m_options.TcpKeepalive != -1)
209205
{
210-
var bytes = new ByteArraySegment(new byte[12]);
206+
acceptedSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, m_options.TcpKeepalive);
207+
208+
if (m_options.TcpKeepaliveIdle != -1 && m_options.TcpKeepaliveIntvl != -1)
209+
{
210+
var bytes = new ByteArraySegment(new byte[12]);
211+
212+
Endianness endian = BitConverter.IsLittleEndian ? Endianness.Little : Endianness.Big;
211213

212-
Endianness endian = BitConverter.IsLittleEndian ? Endianness.Little : Endianness.Big;
214+
bytes.PutInteger(endian, m_options.TcpKeepalive, 0);
215+
bytes.PutInteger(endian, m_options.TcpKeepaliveIdle, 4);
216+
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);
217+
#if NET
218+
if (!OperatingSystem.IsWindows())
219+
{
220+
throw new InvalidOperationException("Not supported on you platform"); // There is a pull request for .net8.0
213221

214-
bytes.PutInteger(endian, m_options.TcpKeepalive, 0);
215-
bytes.PutInteger(endian, m_options.TcpKeepaliveIdle, 4);
216-
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);
222+
}
223+
#endif
224+
acceptedSocket.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
217225

218-
acceptedSocket.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
226+
}
219227
}
220-
}
221228

222-
// Create the engine object for this connection.
223-
var engine = new StreamEngine(acceptedSocket, m_options, m_endpoint);
229+
// Create the engine object for this connection.
230+
var engine = new StreamEngine(acceptedSocket, m_options, m_endpoint);
224231

225-
// Choose I/O thread to run connector in. Given that we are already
226-
// running in an I/O thread, there must be at least one available.
227-
IOThread? ioThread = ChooseIOThread(m_options.Affinity);
232+
// Choose I/O thread to run connector in. Given that we are already
233+
// running in an I/O thread, there must be at least one available.
234+
IOThread? ioThread = ChooseIOThread(m_options.Affinity);
228235

229-
Assumes.NotNull(ioThread);
236+
Assumes.NotNull(ioThread);
230237

231-
// Create and launch a session object.
232-
// TODO: send null in address parameter, is unneeded in this case
233-
SessionBase session = SessionBase.Create(ioThread, false, m_socket, m_options, new Address(m_handle.LocalEndPoint));
234-
session.IncSeqnum();
235-
LaunchChild(session);
238+
// Create and launch a session object.
239+
// TODO: send null in address parameter, is unneeded in this case
240+
SessionBase session = SessionBase.Create(ioThread, false, m_socket, m_options, new Address(m_handle.LocalEndPoint));
241+
session.IncSeqnum();
242+
LaunchChild(session);
236243

237-
SendAttach(session, engine, false);
244+
SendAttach(session, engine, false);
238245

239-
m_socket.EventAccepted(m_endpoint, acceptedSocket);
246+
m_socket.EventAccepted(m_endpoint, acceptedSocket);
240247

241-
Accept();
242-
break;
243-
}
248+
Accept();
249+
break;
250+
}
244251
case SocketError.ConnectionReset:
245252
case SocketError.NoBufferSpaceAvailable:
246253
case SocketError.TooManyOpenSockets:
247-
{
248-
m_socket.EventAcceptFailed(m_endpoint, socketError.ToErrorCode());
254+
{
255+
m_socket.EventAcceptFailed(m_endpoint, socketError.ToErrorCode());
249256

250-
Accept();
251-
break;
252-
}
257+
Accept();
258+
break;
259+
}
253260
default:
254-
{
255-
NetMQException exception = NetMQException.Create(socketError);
261+
{
262+
NetMQException exception = NetMQException.Create(socketError);
256263

257-
m_socket.EventAcceptFailed(m_endpoint, exception.ErrorCode);
258-
throw exception;
259-
}
264+
m_socket.EventAcceptFailed(m_endpoint, exception.ErrorCode);
265+
throw exception;
266+
}
260267
}
261268
}
262269

0 commit comments

Comments
 (0)