diff --git a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java
new file mode 100644
index 0000000..a3afbd7
--- /dev/null
+++ b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java
@@ -0,0 +1,139 @@
+
+package org.fluentd.logger.sender;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.fluentd.logger.errorhandler.ErrorHandler;
+import org.fluentd.logger.sender.ExponentialDelayReconnector;
+import org.fluentd.logger.sender.RawSocketSender;
+import org.fluentd.logger.sender.Reconnector;
+import org.fluentd.logger.sender.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An asynchronous wrapper around RawSocketSender
+ *
+ * This feature is highly experimental.
+ *
+ * @author mxk
+ *
+ */
+public class AsyncRawSocketSender implements Sender {
+
+ private final class EmitRunnable implements Runnable {
+ private final String tag;
+ private final Map data;
+ private final RawSocketSender sender;
+ private final long timestamp;
+
+ private EmitRunnable(String tag, Map data,
+ RawSocketSender sender, long timestamp) {
+ this.tag = tag;
+ this.data = data;
+ this.sender = sender;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public void run() {
+ sender.emit(tag, timestamp, data);
+ }
+ }
+
+ private final class FlushRunnable implements Runnable {
+ private final RawSocketSender sender;
+
+ private FlushRunnable(RawSocketSender sender) {
+ this.sender = sender;
+ }
+
+ @Override
+ public void run() {
+ sender.flush();
+ }
+ }
+
+ private RawSocketSender sender;
+ private Reconnector reconnector;
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class);
+
+ private final ExecutorService senderTask = Executors.newSingleThreadExecutor();
+
+ private static final ErrorHandler DEFAULT_ERROR_HANDLER = new ErrorHandler() {};
+
+ private ErrorHandler errorHandler = DEFAULT_ERROR_HANDLER;
+
+ public AsyncRawSocketSender() {
+ this("localhost", 24224);
+ }
+
+ public AsyncRawSocketSender(String host, int port) {
+ this(host, port, 3 * 1000, 8 * 1024 * 1024);
+ }
+
+ public AsyncRawSocketSender(String host, int port, int timeout,
+ int bufferCapacity) {
+ this(host, port, timeout, bufferCapacity,
+ new ExponentialDelayReconnector());
+ }
+
+ public AsyncRawSocketSender(String host, int port, int timeout,
+ int bufferCapacity, Reconnector reconnector) {
+ this.reconnector = reconnector;
+ this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
+ reconnector);
+ }
+
+ @Override
+ public synchronized void flush() {
+ final RawSocketSender sender = this.sender;
+ senderTask.execute(new FlushRunnable(sender));
+ }
+
+ @Override
+ public void close() {
+ sender.close();
+ }
+
+ @Override
+ public boolean emit(String tag, Map data) {
+ return emit(tag, System.currentTimeMillis() / 1000, data);
+ }
+
+ @Override
+ public boolean emit(final String tag, final long timestamp, final Map data) {
+ final RawSocketSender sender = this.sender;
+ senderTask.execute(new EmitRunnable(tag, data, sender, timestamp));
+
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return sender.getName();
+ }
+
+ @Override
+ public synchronized boolean isConnected() {
+ return sender.isConnected();
+ }
+
+ @Override
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ if (errorHandler == null) {
+ throw new IllegalArgumentException("errorHandler is null");
+ }
+
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public void removeErrorHandler() {
+ this.errorHandler = DEFAULT_ERROR_HANDLER;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java
new file mode 100644
index 0000000..69736b0
--- /dev/null
+++ b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java
@@ -0,0 +1,426 @@
+package org.fluentd.logger.sender;
+import org.fluentd.logger.util.MockFluentd;
+import org.fluentd.logger.util.MockFluentd.MockProcess;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.msgpack.MessagePack;
+import org.msgpack.unpacker.Unpacker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestAsyncRawSocketSender {
+
+ @Test
+ public void testNormal01() throws Exception {
+ // start mock fluentd
+ int port = MockFluentd.randomPort();
+ final List elist = new ArrayList();
+ MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ try {
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ elist.add(e);
+ }
+ //socket.close();
+ } catch (EOFException e) {
+ // ignore
+ }
+ }
+ });
+ fluentd.start();
+ fluentd.waitUntilReady();
+
+ // start asyncSenders
+ Sender asyncSender = new AsyncRawSocketSender("localhost", port);
+ Map data = new HashMap();
+ data.put("t1k1", "t1v1");
+ data.put("t1k2", "t1v2");
+ asyncSender.emit("tag.label1", data);
+
+ Map data2 = new HashMap();
+ data2.put("t2k1", "t2v1");
+ data2.put("t2k2", "t2v2");
+ asyncSender.emit("tag.label2", data2);
+
+ // close asyncSender sockets
+ asyncSender.close();
+
+ // wait for unpacking event data on fluentd
+ Thread.sleep(2000);
+
+ // close mock server sockets
+ fluentd.close();
+
+
+ // check data
+ assertEquals(2, elist.size());
+ {
+ Event e = elist.get(0);
+ assertEquals("tag.label1", e.tag);
+ assertEquals("t1v1", e.data.get("t1k1"));
+ assertEquals("t1v2", e.data.get("t1k2"));
+ }
+ {
+ Event e = elist.get(1);
+ assertEquals("tag.label2", e.tag);
+ assertEquals("t2v1", e.data.get("t2k1"));
+ assertEquals("t2v2", e.data.get("t2k2"));
+ }
+ }
+
+
+
+ @Test
+ public void testNormal02() throws Exception {
+ // start mock fluentd
+ int port = MockFluentd.randomPort(); // Use a random port available
+ final List elist = new ArrayList();
+ MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ try {
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ elist.add(e);
+ }
+ //socket.close();
+ } catch (EOFException e) {
+ // ignore
+ }
+ }
+ });
+ fluentd.start();
+ fluentd.waitUntilReady();
+
+ // start asyncSenders
+ Sender asyncSender = new AsyncRawSocketSender("localhost", port);
+ int count = 10000;
+ for (int i = 0; i < count; i++) {
+ String tag = "tag:i";
+ Map record = new HashMap();
+ record.put("i", i);
+ record.put("n", "name:" + i);
+ asyncSender.emit(tag, record);
+ }
+
+ // close asyncSender sockets
+ asyncSender.close();
+
+ // wait for unpacking event data on fluentd
+ Thread.sleep(2000);
+
+ // close mock server sockets
+ fluentd.close();
+
+
+ // check elist size. But, it cannot detect correct elist size because async sender runs independently.
+ final int LOOSEN_CONSTRAINTS = 5;
+ assert(count - LOOSEN_CONSTRAINTS <= elist.size()|| elist.size() < count + LOOSEN_CONSTRAINTS);
+ }
+
+ @Test
+ public void testNormal03() throws Exception {
+ // start mock fluentds
+ final MockFluentd[] fluentds = new MockFluentd[2];
+ final List[] elists = new List[2];
+ final int[] ports = new int[2];
+ ports[0] = MockFluentd.randomPort();
+ AsyncRawSocketSender asyncRawSocketSender = new AsyncRawSocketSender("localhost", ports[0]); // it should be failed to connect to fluentd
+ elists[0] = new ArrayList();
+ fluentds[0] = new MockFluentd(ports[0], new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ try {
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ elists[0].add(e);
+ }
+ //socket.close();
+ } catch (EOFException e) {
+ // ignore
+ }
+ }
+ });
+ fluentds[0].start();
+ fluentds[0].waitUntilReady();
+ ports[1] = MockFluentd.randomPort();
+ elists[1] = new ArrayList();
+ fluentds[1] = new MockFluentd(ports[1], new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ try {
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ elists[1].add(e);
+ }
+ //socket.close();
+ } catch (EOFException e) {
+ // ignore
+ }
+ }
+ });
+ fluentds[1].start();
+ fluentds[1].waitUntilReady();
+
+ // start AsyncSenders
+ Sender[] asyncSenders = new Sender[2];
+ int[] counts = new int[2];
+ asyncSenders[0] = asyncRawSocketSender;
+ counts[0] = 10000;
+ for (int i = 0; i < counts[0]; i++) {
+ String tag = "tag:i";
+ Map record = new HashMap();
+ record.put("i", i);
+ record.put("n", "name:" + i);
+ asyncSenders[0].emit(tag, record);
+ }
+ asyncSenders[1] = new AsyncRawSocketSender("localhost", ports[1]);
+ counts[1] = 10000;
+ for (int i = 0; i < counts[1]; i++) {
+ String tag = "tag:i";
+ Map record = new HashMap();
+ record.put("i", i);
+ record.put("n", "name:" + i);
+ asyncSenders[1].emit(tag, record);
+ }
+
+ // close sender sockets
+ asyncSenders[0].close();
+ asyncSenders[1].close();
+
+ // wait for unpacking event data on fluentd
+ Thread.sleep(2000);
+
+ // close mock server sockets
+ fluentds[0].close();
+ fluentds[1].close();
+
+
+ // check data
+ assertEquals(counts[0], elists[0].size());
+ assertEquals(counts[1], elists[1].size());
+ }
+
+ @Test
+ public void testTimeout() throws InterruptedException {
+ final AtomicBoolean socketFinished = new AtomicBoolean(false);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ AsyncRawSocketSender asyncRawSocketSender = null;
+ try {
+ // try to connect to test network
+ asyncRawSocketSender = new AsyncRawSocketSender("192.0.2.1", 24224, 200, 8 * 1024);
+ }
+ finally {
+ if (asyncRawSocketSender != null) {
+ asyncRawSocketSender.close();
+ }
+ socketFinished.set(true);
+ }
+ }
+ });
+
+ while(!socketFinished.get())
+ Thread.yield();
+
+ assertTrue(socketFinished.get());
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testBufferingAndResending() throws InterruptedException, IOException {
+ final ConcurrentLinkedQueue readEvents = new ConcurrentLinkedQueue();
+ final CountDownLatch countDownLatch = new CountDownLatch(4);
+ int port = MockFluentd.randomPort();
+ MockProcess mockProcess = new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ try {
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ readEvents.add(e);
+ countDownLatch.countDown();
+ }
+ } catch (EOFException e) {
+ // e.printStackTrace();
+ }
+ }
+ };
+
+ MockFluentd fluentd = new MockFluentd(port, mockProcess);
+ fluentd.start();
+ fluentd.waitUntilReady();
+
+ Sender asyncSender = new AsyncRawSocketSender("localhost", port);
+ assertFalse(asyncSender.isConnected());
+ Map data = new HashMap();
+ data.put("key0", "v0");
+ boolean emitted1 = asyncSender.emit("tag0", data);
+ assertTrue(emitted1);
+
+ // close fluentd to make the next sending failed
+ TimeUnit.MILLISECONDS.sleep(500);
+
+ fluentd.closeClientSockets();
+
+ TimeUnit.MILLISECONDS.sleep(500);
+
+ data = new HashMap();
+ data.put("key0", "v1");
+ boolean emitted2 = asyncSender.emit("tag0", data);
+ assertTrue(emitted2);
+
+ // wait to avoid the suppression of reconnection
+ TimeUnit.MILLISECONDS.sleep(500);
+
+ data = new HashMap();
+ data.put("key0", "v2");
+ boolean emitted3 = asyncSender.emit("tag0", data);
+ assertTrue(emitted3);
+
+ data = new HashMap();
+ data.put("key0", "v3");
+ boolean emitted4 = asyncSender.emit("tag0", data);
+ assertTrue(emitted4);
+
+ countDownLatch.await(500, TimeUnit.MILLISECONDS);
+
+ asyncSender.close();
+
+ fluentd.close();
+
+ assertEquals(4, readEvents.size());
+
+ Event event = readEvents.poll();
+ assertEquals("tag0", event.tag);
+ assertEquals(1, event.data.size());
+ assertTrue(event.data.keySet().contains("key0"));
+ assertTrue(event.data.values().contains("v0"));
+
+ event = readEvents.poll();
+ assertEquals("tag0", event.tag);
+ assertEquals(1, event.data.size());
+ assertTrue(event.data.keySet().contains("key0"));
+ assertTrue(event.data.values().contains("v1"));
+
+ event = readEvents.poll();
+ assertEquals("tag0", event.tag);
+ assertEquals(1, event.data.size());
+ assertTrue(event.data.keySet().contains("key0"));
+ assertTrue(event.data.values().contains("v2"));
+
+ event = readEvents.poll();
+ assertEquals("tag0", event.tag);
+ assertEquals(1, event.data.size());
+ assertTrue(event.data.keySet().contains("key0"));
+ assertTrue(event.data.values().contains("v3"));
+ }
+
+ @Ignore @Test
+ public void testReconnectAfterBufferFull() throws Exception {
+ final CountDownLatch bufferFull = new CountDownLatch(1);
+
+ // start mock fluentd
+ int port = MockFluentd.randomPort(); // Use a random port available
+ final List elist = new ArrayList();
+ final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
+ public void process(MessagePack msgpack, Socket socket) throws IOException {
+ try {
+ BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
+ Unpacker unpacker = msgpack.createUnpacker(in);
+ while (true) {
+ Event e = unpacker.read(Event.class);
+ elist.add(e);
+ }
+ } catch (EOFException e) {
+ // ignore
+ } finally {
+ socket.close();
+ }
+ }
+ });
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ bufferFull.await(20, TimeUnit.SECONDS);
+ fluentd.start();
+ fluentd.waitUntilReady();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // start asyncSenders
+ Sender asyncSender = new AsyncRawSocketSender("localhost", port);
+ String tag = "tag";
+ int i;
+ for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer
+ Map record = new HashMap();
+ record.put("num", i);
+ record.put("str", "name" + i);
+
+ if (bufferFull.getCount() > 0) {
+ // Fill the sender's buffer
+ // But for now, asyncSender#emit always return true....
+ if (!asyncSender.emit(tag, record)) {
+ // Buffer full. Need to recover the fluentd
+ bufferFull.countDown();
+ Thread.sleep(2000);
+ }
+ }
+ else {
+ // Flush the sender's buffer after the fluentd starts
+ asyncSender.emit(tag, record);
+ break;
+ }
+ }
+
+ // close sender sockets
+ asyncSender.close();
+
+ // wait for unpacking event data on fluentd
+ Thread.sleep(2000);
+
+ // close mock server sockets
+ fluentd.close();
+
+ // check data
+ assertEquals(0, bufferFull.getCount());
+ // check elist size. But, it cannot detect correct elist size because async sender runs independently.
+ final int LOOSEN_CONSTRAINTS = 5;
+ assert(i - LOOSEN_CONSTRAINTS <= elist.size()|| elist.size() < i + LOOSEN_CONSTRAINTS);
+ }
+}