Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/main/java/org/lwes/listener/DatagramDequeuer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.io.IOException;
import java.net.DatagramPacket;
import java.util.concurrent.TimeUnit;

public class DatagramDequeuer extends ThreadedDequeuer {

private static transient Log log = LogFactory.getLog(DatagramDequeuer.class);

private boolean running = false;
private volatile boolean running = false;

/* an event factory */
private EventFactory factory = new EventFactory();
Expand All @@ -40,6 +41,7 @@ public void initialize() throws IOException {

@Override
public synchronized void shutdown() {
super.shutdown();
running = false;
}

Expand All @@ -50,11 +52,12 @@ public void run() {
while (running) {
try {
QueueElement element = null;
element = queue.take();
if (log.isTraceEnabled()) {
log.trace("Removed from queue: " + element);
}
handleElement((DatagramQueueElement) element);
if ((element = queue.poll(1,TimeUnit.SECONDS)) != null) {
if (log.isTraceEnabled()) {
log.trace("Removed from queue: " + element);
}
handleElement((DatagramQueueElement) element);
}
}
catch (UnsupportedOperationException uoe) {
// not a problem, someone grabbed the event before we did
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/lwes/listener/DatagramEnqueuer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -50,7 +51,7 @@ public class DatagramEnqueuer extends ThreadedEnqueuer {
protected byte[] buffer = null;

/* thread control */
protected boolean running = false;
protected volatile boolean running = false;

public DatagramEnqueuer() {
super();
Expand Down Expand Up @@ -163,6 +164,7 @@ public void initialize() throws IOException {
bufSize = Integer.parseInt(bufSizeStr);
}
socket.setReceiveBufferSize(bufSize);
socket.setSoTimeout(1000);
}

@Override
Expand All @@ -181,7 +183,11 @@ public void run() {
while (running) {
try {
DatagramPacket datagram = new DatagramPacket(buffer, buffer.length);
socket.receive(datagram);
try {
socket.receive(datagram);
} catch (SocketTimeoutException ste) {
continue;
}
if (log.isTraceEnabled()) {
log.trace("Received datagram: " + datagram);
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/lwes/listener/ThreadedDequeuer.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public void initialize() throws IOException {
* ThreadedDequeuer.
*/
public void shutdown() {
synchronized (idleProcessors) {
for (ThreadedEventDispatcher d : idleProcessors) {
d.shutdown();
}
}
}

/**
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/lwes/listener/ThreadedEventDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ThreadedEventDispatcher extends Thread {
private ThreadedDequeuer dequeuer;
private EventHandler eventHandler;
private Event event;
private volatile boolean stop = false;

protected ThreadedEventDispatcher(ThreadedDequeuer aDequeuer) {
this.dequeuer = aDequeuer;
Expand Down Expand Up @@ -61,7 +62,7 @@ public final boolean isIdle() {

@Override
public void run() {
while(true) {
while(!stop) {
synchronized(this) {
if(isActive()) {
try {
Expand All @@ -72,7 +73,7 @@ public void run() {
clearTask();
} else {
try {
wait();
wait(1000);
} catch(InterruptedException e) {}
}
}
Expand All @@ -88,4 +89,8 @@ private void clearTask() {
dequeuer.makeAvailable(this);
}

public void shutdown() {
stop = true;
}

}
33 changes: 33 additions & 0 deletions src/test/java/org/lwes/listener/TestListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import static java.lang.System.out;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.junit.Ignore;
import org.lwes.listener.DatagramEventListener;
import org.lwes.listener.EventHandler;

/*
* Not a unit test. Demonstrates thread leaks in threaded listeners.
* Before reaching 100 will be unable to spawn new threads with default JVM settings.
* Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
*/

@Ignore
public class TestListener {

public static void main(String[] args) throws InterruptedException, UnknownHostException {
int count = 0;
while (true) {
DatagramEventListener listener;
listener = new DatagramEventListener();
listener.setAddress(InetAddress.getByName("224.0.0.69"));
listener.setPort(9191);
listener.setQueueSize(50000);
listener.initialize();
out.println(count);
Thread.currentThread().sleep(1000);
listener.shutdown();
count++;
}
}

}