diff --git a/README.md b/README.md
index b4df69a..e60d566 100644
--- a/README.md
+++ b/README.md
@@ -74,3 +74,48 @@ Example of agent named agent
agent.sinks.es_sink.es.serializer.csv.delimiter=,
agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
````
+
+#### Available serializers
+##### com.cognitree.flume.sink.elasticsearch.SimpleSerializer
+This Serializer assumes the event body to be in JSON format
+##### com.cognitree.flume.sink.elasticsearch.CsvSerializer
+This Serializer assumes the event body to be in CSV format with custom delimiter specified.
+
+| Property Name | Default | Description |
+|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
+| es.serializer.csv.fields | - | Must be specified, fires exception otherwise. |
+| es.serializer.csv.delimiter | , | Delimiter for *es.serializer.csv.fields* property |
+
+##### com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer
+This Serializer assumes the event body as the main field and event headers as additional fields for json document.
+
+| Property Name | Default | Description |
+|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
+| es.serializer.headerBased.bodyField.name | message | Specifies the name of event body field in resulting json document |
+| es.serializer.headerBased.fields | - | Must be specified, fires exception otherwise. Specifies data types of event headers.
Example: _timestamp:long,host:string,index:int_.
Supported data types: *string*, *int*, *long*, *float*, *boolean*|
+
+So, if you have an event such as:
+
+
+ ---------------------------
+ | Headers |
+ ---------------------------
+ | host: localhost |
+ | timestamp: 100000000000 |
+ | index: 1 |
+ ---------------------------
+ | Body |
+ ---------------------------
+ | Lorem ipsum dolor sit |
+ | amet, consectetur |
+ | adipiscing elit |
+ ---------------------------
+then Serializer will produce json doc like this:
+```json
+{
+ "host": "localhost",
+ "timestamp": 100000000000,
+ "index": 1,
+ "message": "Lorem ipsum dolor sit amet, consectetur adipiscing elit"
+}
+```
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
index ad92081..55865e0 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
@@ -85,6 +85,11 @@ public class Constants {
public static final String ES_AVRO_SCHEMA_FILE = "es.serializer.avro.schema.file";
+ public static final String ES_HEADERBASED_BODY_FIELD_NAME = "es.serializer.headerBased.bodyField.name";
+ public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME = "message";
+ public static final String ES_HEADERBASED_FIELDS = "es.serializer.headerBased.fields";
+ public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE = "string";
+
/**
* This enum is used for the time unit
*
@@ -146,6 +151,7 @@ public String toString() {
public enum FieldTypeEnum {
STRING("string"),
INT("int"),
+ LONG("long"),
FLOAT("float"),
BOOLEAN("boolean");
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java
new file mode 100644
index 0000000..3c8f784
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java
@@ -0,0 +1,84 @@
+package com.cognitree.flume.sink.elasticsearch;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+
+public class HeaderBasedSerializer implements Serializer {
+ private static final Logger logger = LoggerFactory.getLogger(HeaderBasedSerializer.class);
+
+ private final List names = new ArrayList();
+
+ private final List types = new ArrayList();
+
+ private String bodyFieldName;
+
+ public XContentBuilder serialize(Event event) {
+ Map headers = event.getHeaders();
+ String body = new String(event.getBody(), Charsets.UTF_8);
+
+ XContentBuilder xContentBuilder = null;
+
+ try {
+ if (!names.isEmpty() && !types.isEmpty()) {
+ xContentBuilder = jsonBuilder().startObject();
+
+ for (int i = 0; i < names.size(); i++) {
+ String name = names.get(i);
+ Util.addField(xContentBuilder, name, headers.get(name), types.get(i));
+ }
+
+ Util.addField(xContentBuilder, bodyFieldName, body, DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE);
+
+ xContentBuilder.endObject();
+ } else {
+ logger.error("Fields for headers based serializer are not configured, " +
+ "please configured the property " + ES_HEADERBASED_FIELDS);
+ }
+ } catch (IOException e) {
+ logger.error("Error in converting the event to the json format " + e.getMessage(), e);
+ }
+
+ return xContentBuilder;
+ }
+
+ public void configure(Context context) {
+ bodyFieldName = context.getString(ES_HEADERBASED_BODY_FIELD_NAME, DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME);
+
+ String fields = context.getString(ES_HEADERBASED_FIELDS);
+ if(fields == null) {
+ Throwables.propagate(new Exception("Fields for headers based serializer are not configured," +
+ " please configured the property " + ES_HEADERBASED_FIELDS));
+ }
+ try {
+ String[] fieldTypes = fields.split(COMMA);
+ for (String fieldType : fieldTypes) {
+ names.add(getValue(fieldType, 0));
+ types.add(getValue(fieldType, 1));
+ }
+ } catch(Exception e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ private String getValue(String fieldType, Integer index) {
+ String value = "";
+ if (fieldType.length() > index) {
+ value = fieldType.split(COLONS)[index];
+ }
+ return value;
+ }
+
+}
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java
index dfbcdbe..0978cd5 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java
@@ -114,6 +114,9 @@ public static void addField(XContentBuilder xContentBuilder,String key, String v
case INT:
xContentBuilder.field(key, Integer.parseInt(value));
break;
+ case LONG:
+ xContentBuilder.field(key, Long.parseLong(value));
+ break;
case BOOLEAN:
xContentBuilder.field(key, Boolean.valueOf(value));
break;
diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java
new file mode 100644
index 0000000..f0a3388
--- /dev/null
+++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java
@@ -0,0 +1,66 @@
+package com.cognitree.flume.sink.elasticsearch;
+
+import com.google.gson.JsonParser;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_FIELDS;
+import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_BODY_FIELD_NAME;
+import static org.junit.Assert.*;
+
+public class TestHeaderBasedSerializer {
+ private HeaderBasedSerializer headerBasedSerializer;
+
+ private static final Charset charset = Charset.defaultCharset();
+
+ private String message = "Lorem ipsum dolor sit amet";
+
+ @Before
+ public void init() throws Exception {
+ headerBasedSerializer = new HeaderBasedSerializer();
+ }
+
+ @Test
+ public void testSerializer() throws IOException {
+ Context context = new Context();
+ context.put(ES_HEADERBASED_BODY_FIELD_NAME, "message");
+ context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:long");
+
+ Map headers = new HashMap();
+ headers.put("id", "1");
+ headers.put("name", "test");
+ headers.put("datetime", "2018-12-12 12:42:42.424");
+ headers.put("@timestamp", "1544607762424");
+
+ Event event = EventBuilder.withBody(message, charset, headers);
+
+ headerBasedSerializer.configure(context);
+
+ XContentBuilder expected = generateContentBuilder();
+ XContentBuilder actual = headerBasedSerializer.serialize(event);
+
+ JsonParser parser = new JsonParser();
+ assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
+ }
+
+ private XContentBuilder generateContentBuilder() throws IOException {
+ XContentBuilder expected = jsonBuilder().startObject();
+ expected.field("id", 1);
+ expected.field("name", "test");
+ expected.field("datetime", "2018-12-12 12:42:42.424");
+ expected.field("@timestamp", 1544607762424L);
+ expected.field("message", message);
+ expected.endObject();
+ return expected;
+ }
+}
\ No newline at end of file