From 3080e965ae284c479b83b9b0f940efc846879286 Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Fri, 11 Jan 2019 17:23:20 +0400 Subject: [PATCH 1/6] Add headers beased serializer --- .../elasticsearch/HeaderBasedSerializer.java | 49 +++++++++++++++ .../TestHeaderBasedSerializer.java | 59 +++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java create mode 100644 src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java new file mode 100644 index 0000000..e6cd9e5 --- /dev/null +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java @@ -0,0 +1,49 @@ +package com.cognitree.flume.sink.elasticsearch; + +import com.google.common.base.Charsets; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +public class HeaderBasedSerializer implements Serializer { + private static final Logger logger = LoggerFactory.getLogger(HeaderBasedSerializer.class); + public static final String BODY_FIELD_NAME = "es.serializer.json.bodyFieldName"; + public static final String BODY_FIELD_NAME_VALUE = "message"; + + private String bodyFieldName; + + public XContentBuilder serialize(Event event) { + Map headers = event.getHeaders(); + String body = new String(event.getBody(), Charsets.UTF_8); + + XContentBuilder xContentBuilder = null; + + try { + xContentBuilder = jsonBuilder().startObject(); + + for (Map.Entry entry: headers.entrySet()) { + xContentBuilder.field(entry.getKey(), entry.getValue()); + } + + xContentBuilder.field(bodyFieldName, body); + + xContentBuilder.endObject(); + } catch (IOException e) { + logger.error("Error in converting the event to the json format " + e.getMessage(), e); + } + + return xContentBuilder; + } + + public void configure(Context context) { + bodyFieldName = context.getString(BODY_FIELD_NAME, BODY_FIELD_NAME_VALUE); + } + +} diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java new file mode 100644 index 0000000..f8afc55 --- /dev/null +++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java @@ -0,0 +1,59 @@ +package com.cognitree.flume.sink.elasticsearch; + +import com.google.gson.JsonParser; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.*; + +public class TestHeaderBasedSerializer { + private HeaderBasedSerializer headerBasedSerializer; + + private static final Charset charset = Charset.defaultCharset(); + + private String message = "Lorem ipsum dolor sit amet"; + + @Before + public void init() throws Exception { + headerBasedSerializer = new HeaderBasedSerializer(); + } + + @Test + public void testSerializer() throws IOException { + Context context = new Context(); + context.put("es.serializer.json.bodyFieldName", "message"); + + Map headers = new HashMap(); + headers.put("id", "1"); + headers.put("name", "test"); + + Event event = EventBuilder.withBody(message, charset, headers); + + headerBasedSerializer.configure(context); + + XContentBuilder expected = generateContentBuilder(); + XContentBuilder actual = headerBasedSerializer.serialize(event); + + JsonParser parser = new JsonParser(); + assertEquals(parser.parse(expected.string()), parser.parse(actual.string())); + } + + private XContentBuilder generateContentBuilder() throws IOException { + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("id", "1"); + expected.field("name", "test"); + expected.field("message", message); + expected.endObject(); + return expected; + } +} \ No newline at end of file From a756d216153474ba67fc501151b2ceef0f3d5bcb Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Tue, 15 Jan 2019 12:09:26 +0400 Subject: [PATCH 2/6] Fix HeaderBasedSerializer: Add field:type mapping --- .../flume/sink/elasticsearch/Constants.java | 5 ++ .../elasticsearch/HeaderBasedSerializer.java | 54 +++++++++++++++---- .../TestHeaderBasedSerializer.java | 11 +++- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java index ad92081..40db332 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java @@ -85,6 +85,11 @@ public class Constants { public static final String ES_AVRO_SCHEMA_FILE = "es.serializer.avro.schema.file"; + public static final String ES_HEADERBASED_BODY_FIELD_NAME = "es.serializer.headerBased.bodyField.name"; + public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME = "message"; + public static final String ES_HEADERBASED_FIELDS = "es.serializer.headerBased.fields"; + public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE = "string"; + /** * This enum is used for the time unit * diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java index e6cd9e5..1211d79 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java @@ -1,6 +1,7 @@ package com.cognitree.flume.sink.elasticsearch; import com.google.common.base.Charsets; +import com.google.common.base.Throwables; import org.apache.flume.Context; import org.apache.flume.Event; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -8,14 +9,19 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static com.cognitree.flume.sink.elasticsearch.Constants.*; public class HeaderBasedSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(HeaderBasedSerializer.class); - public static final String BODY_FIELD_NAME = "es.serializer.json.bodyFieldName"; - public static final String BODY_FIELD_NAME_VALUE = "message"; + + private final List names = new ArrayList(); + + private final List types = new ArrayList(); private String bodyFieldName; @@ -26,15 +32,22 @@ public XContentBuilder serialize(Event event) { XContentBuilder xContentBuilder = null; try { - xContentBuilder = jsonBuilder().startObject(); + if (!names.isEmpty() && !types.isEmpty()) { + xContentBuilder = jsonBuilder().startObject(); - for (Map.Entry entry: headers.entrySet()) { - xContentBuilder.field(entry.getKey(), entry.getValue()); - } + for (int i = 0; i < names.size(); i++) { + String name = names.get(i); + Util.addField(xContentBuilder, name, headers.get(name), types.get(i)); + } - xContentBuilder.field(bodyFieldName, body); + xContentBuilder.field(bodyFieldName, body); + Util.addField(xContentBuilder, bodyFieldName, body, DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE); - xContentBuilder.endObject(); + xContentBuilder.endObject(); + } else { + logger.error("Fields for headers based serializer are not configured, " + + "please configured the property " + ES_HEADERBASED_FIELDS); + } } catch (IOException e) { logger.error("Error in converting the event to the json format " + e.getMessage(), e); } @@ -43,7 +56,30 @@ public XContentBuilder serialize(Event event) { } public void configure(Context context) { - bodyFieldName = context.getString(BODY_FIELD_NAME, BODY_FIELD_NAME_VALUE); + bodyFieldName = context.getString(ES_HEADERBASED_BODY_FIELD_NAME, DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME); + + String fields = context.getString(ES_HEADERBASED_FIELDS); + if(fields == null) { + Throwables.propagate(new Exception("Fields for headers based serializer are not configured," + + " please configured the property " + ES_HEADERBASED_FIELDS)); + } + try { + String[] fieldTypes = fields.split(COMMA); + for (String fieldType : fieldTypes) { + names.add(getValue(fieldType, 0)); + types.add(getValue(fieldType, 1)); + } + } catch(Exception e) { + Throwables.propagate(e); + } + } + + private String getValue(String fieldType, Integer index) { + String value = ""; + if (fieldType.length() > index) { + value = fieldType.split(COLONS)[index]; + } + return value; } } diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java index f8afc55..af23445 100644 --- a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java +++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java @@ -14,6 +14,8 @@ import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_FIELDS; +import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_BODY_FIELD_NAME; import static org.junit.Assert.*; public class TestHeaderBasedSerializer { @@ -31,11 +33,14 @@ public void init() throws Exception { @Test public void testSerializer() throws IOException { Context context = new Context(); - context.put("es.serializer.json.bodyFieldName", "message"); + context.put(ES_HEADERBASED_BODY_FIELD_NAME, "message"); + context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:int"); Map headers = new HashMap(); headers.put("id", "1"); headers.put("name", "test"); + headers.put("datetime", "2018-12-12 12:42:42.424"); + headers.put("@timestamp", "1544607762"); Event event = EventBuilder.withBody(message, charset, headers); @@ -50,8 +55,10 @@ public void testSerializer() throws IOException { private XContentBuilder generateContentBuilder() throws IOException { XContentBuilder expected = jsonBuilder().startObject(); - expected.field("id", "1"); + expected.field("id", 1); expected.field("name", "test"); + expected.field("datetime", "2018-12-12 12:42:42.424"); + expected.field("@timestamp", 1544607762); expected.field("message", message); expected.endObject(); return expected; From fb6d822c68adb2018699478a84d4073eba509f3e Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Tue, 15 Jan 2019 13:43:15 +0400 Subject: [PATCH 3/6] Add LONG type to type mapping --- .../com/cognitree/flume/sink/elasticsearch/Constants.java | 1 + .../java/com/cognitree/flume/sink/elasticsearch/Util.java | 3 +++ .../flume/sink/elasticsearch/TestHeaderBasedSerializer.java | 6 +++--- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java index 40db332..55865e0 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java @@ -151,6 +151,7 @@ public String toString() { public enum FieldTypeEnum { STRING("string"), INT("int"), + LONG("long"), FLOAT("float"), BOOLEAN("boolean"); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java index dfbcdbe..0978cd5 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java @@ -114,6 +114,9 @@ public static void addField(XContentBuilder xContentBuilder,String key, String v case INT: xContentBuilder.field(key, Integer.parseInt(value)); break; + case LONG: + xContentBuilder.field(key, Long.parseLong(value)); + break; case BOOLEAN: xContentBuilder.field(key, Boolean.valueOf(value)); break; diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java index af23445..f0a3388 100644 --- a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java +++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java @@ -34,13 +34,13 @@ public void init() throws Exception { public void testSerializer() throws IOException { Context context = new Context(); context.put(ES_HEADERBASED_BODY_FIELD_NAME, "message"); - context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:int"); + context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:long"); Map headers = new HashMap(); headers.put("id", "1"); headers.put("name", "test"); headers.put("datetime", "2018-12-12 12:42:42.424"); - headers.put("@timestamp", "1544607762"); + headers.put("@timestamp", "1544607762424"); Event event = EventBuilder.withBody(message, charset, headers); @@ -58,7 +58,7 @@ private XContentBuilder generateContentBuilder() throws IOException { expected.field("id", 1); expected.field("name", "test"); expected.field("datetime", "2018-12-12 12:42:42.424"); - expected.field("@timestamp", 1544607762); + expected.field("@timestamp", 1544607762424L); expected.field("message", message); expected.endObject(); return expected; From 1622638ce2aa182d12fa28bc16b80d180e94b29c Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Tue, 15 Jan 2019 14:05:47 +0400 Subject: [PATCH 4/6] Fix HeaderBasedSerializer --- .../flume/sink/elasticsearch/HeaderBasedSerializer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java index 1211d79..3c8f784 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java @@ -40,7 +40,6 @@ public XContentBuilder serialize(Event event) { Util.addField(xContentBuilder, name, headers.get(name), types.get(i)); } - xContentBuilder.field(bodyFieldName, body); Util.addField(xContentBuilder, bodyFieldName, body, DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE); xContentBuilder.endObject(); From 3f340bd79957962b58d2a96a8a705028209a9a37 Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Thu, 24 Jan 2019 19:25:09 +0400 Subject: [PATCH 5/6] Update Readme --- README.md | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/README.md b/README.md index b4df69a..49f2dad 100644 --- a/README.md +++ b/README.md @@ -74,3 +74,48 @@ Example of agent named agent agent.sinks.es_sink.es.serializer.csv.delimiter=, agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc ```` + +####Available serializers +#####com.cognitree.flume.sink.elasticsearch.SimpleSerializer +This Serializer assumes the event body to be in JSON format +#####com.cognitree.flume.sink.elasticsearch.CsvSerializer +This Serializer assumes the event body to be in CSV format with custom delimiter specified. + +| Property Name | Default | Description | +|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------| +| es.serializer.csv.fields | - | Must be specified, fires exception otherwise. | +| es.serializer.csv.delimiter | , | Delimiter for *es.serializer.csv.fields* property | + +#####com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer +This Serializer assumes the event body as the main field and event headers as additional fields for json document. + +| Property Name | Default | Description | +|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------| +| es.serializer.headerBased.bodyField.name | message | Specifies the name of event body field in resulting json document | +| es.serializer.headerBased.fields | - | Must be specified, fires exception otherwise. Specifies data types of event headers.
Example: _timestamp:long,host:string,index:int_.
Supported data types: *string*, *int*, *long*, *float*, *boolean*| + +So, if you have an event such as: + + + --------------------------- + | Headers | + --------------------------- + | host: localhost | + | timestamp: 100000000000 | + | index: 1 | + --------------------------- + | Body | + --------------------------- + | Lorem ipsum dolor sit | + | amet, consectetur | + | adipiscing elit | + --------------------------- +then Serializer will produce json doc like this: +```json +{ + "host": "localhost", + "timestamp": 100000000000, + "index": 1, + "message": "Lorem ipsum dolor sit amet, consectetur adipiscing elit" +} +``` From fe3050278db4b724fb4cca30fdf32b82b1af279f Mon Sep 17 00:00:00 2001 From: "a.mikka" Date: Thu, 24 Jan 2019 19:40:00 +0400 Subject: [PATCH 6/6] Fix Readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 49f2dad..e60d566 100644 --- a/README.md +++ b/README.md @@ -75,10 +75,10 @@ Example of agent named agent agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc ```` -####Available serializers -#####com.cognitree.flume.sink.elasticsearch.SimpleSerializer +#### Available serializers +##### com.cognitree.flume.sink.elasticsearch.SimpleSerializer This Serializer assumes the event body to be in JSON format -#####com.cognitree.flume.sink.elasticsearch.CsvSerializer +##### com.cognitree.flume.sink.elasticsearch.CsvSerializer This Serializer assumes the event body to be in CSV format with custom delimiter specified. | Property Name | Default | Description | @@ -86,7 +86,7 @@ This Serializer assumes the event body to be in CSV format with custom delimiter | es.serializer.csv.fields | - | Must be specified, fires exception otherwise. | | es.serializer.csv.delimiter | , | Delimiter for *es.serializer.csv.fields* property | -#####com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer +##### com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer This Serializer assumes the event body as the main field and event headers as additional fields for json document. | Property Name | Default | Description |