diff --git a/README.md b/README.md index b4df69a..e60d566 100644 --- a/README.md +++ b/README.md @@ -74,3 +74,48 @@ Example of agent named agent agent.sinks.es_sink.es.serializer.csv.delimiter=, agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc ```` + +#### Available serializers +##### com.cognitree.flume.sink.elasticsearch.SimpleSerializer +This serializer assumes the event body to be in JSON format. +##### com.cognitree.flume.sink.elasticsearch.CsvSerializer +This serializer assumes the event body to be in CSV format, using the configured delimiter. + +| Property Name | Default | Description | +|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------| +| es.serializer.csv.fields | - | Must be specified; an exception is thrown otherwise. | +| es.serializer.csv.delimiter | , | Delimiter for *es.serializer.csv.fields* property | + +##### com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer +This serializer writes the event body as the main field and the configured event headers as additional fields of the JSON document. + +| Property Name | Default | Description | +|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------| +| es.serializer.headerBased.bodyField.name | message | Specifies the name of the event body field in the resulting JSON document | +| es.serializer.headerBased.fields | - | Must be specified; an exception is thrown otherwise. Specifies data types of event headers.
Example: _timestamp:long,host:string,index:int_.
Supported data types: *string*, *int*, *long*, *float*, *boolean*| + +So, if you have an event such as: + + + --------------------------- + | Headers | + --------------------------- + | host: localhost | + | timestamp: 100000000000 | + | index: 1 | + --------------------------- + | Body | + --------------------------- + | Lorem ipsum dolor sit | + | amet, consectetur | + | adipiscing elit | + --------------------------- +then Serializer will produce json doc like this: +```json +{ + "host": "localhost", + "timestamp": 100000000000, + "index": 1, + "message": "Lorem ipsum dolor sit amet, consectetur adipiscing elit" +} +``` diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java index ad92081..55865e0 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java @@ -85,6 +85,11 @@ public class Constants { public static final String ES_AVRO_SCHEMA_FILE = "es.serializer.avro.schema.file"; + public static final String ES_HEADERBASED_BODY_FIELD_NAME = "es.serializer.headerBased.bodyField.name"; + public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME = "message"; + public static final String ES_HEADERBASED_FIELDS = "es.serializer.headerBased.fields"; + public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE = "string"; + /** * This enum is used for the time unit * @@ -146,6 +151,7 @@ public String toString() { public enum FieldTypeEnum { STRING("string"), INT("int"), + LONG("long"), FLOAT("float"), BOOLEAN("boolean"); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java new file mode 100644 index 0000000..3c8f784 --- /dev/null +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/HeaderBasedSerializer.java @@ -0,0 +1,84 @@ +package 
com.cognitree.flume.sink.elasticsearch;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+
+/** Serializes the configured event headers, typed via Util.addField, plus the UTF-8 body into one JSON document. */
+public class HeaderBasedSerializer implements Serializer {
+    private static final Logger logger = LoggerFactory.getLogger(HeaderBasedSerializer.class);
+
+    /** Header names to serialize; names.get(i) is written with the type named by types.get(i). */
+    private final List<String> names = new ArrayList<String>();
+    private final List<String> types = new ArrayList<String>();
+    /** Name of the JSON field that receives the event body. */
+    private String bodyFieldName;
+
+    /**
+     * Builds the JSON document for an event: one field per configured header, then the body field.
+     * Returns null when no fields are configured; an IOException is logged and the builder is returned as it stood.
+     */
+    public XContentBuilder serialize(Event event) {
+        Map<String, String> headers = event.getHeaders();
+        String body = new String(event.getBody(), Charsets.UTF_8);
+        XContentBuilder xContentBuilder = null;
+        try {
+            if (!names.isEmpty() && !types.isEmpty()) {
+                xContentBuilder = jsonBuilder().startObject();
+                for (int i = 0; i < names.size(); i++) {
+                    String name = names.get(i);
+                    Util.addField(xContentBuilder, name, headers.get(name), types.get(i));
+                }
+                Util.addField(xContentBuilder, bodyFieldName, body, DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE);
+                xContentBuilder.endObject();
+            } else {
+                logger.error("Fields for headers based serializer are not configured, " +
+                        "please configured the property " + ES_HEADERBASED_FIELDS);
+            }
+        } catch (IOException e) {
+            logger.error("Error in converting the event to the json format " + e.getMessage(), e);
+        }
+        return xContentBuilder;
+    }
+
+    /**
+     * Reads the body field name and the comma-separated name:type header list (e.g. id:int,host:string).
+     * Throws RuntimeException if the list is not set, IllegalArgumentException if an entry lacks a name or type.
+     */
+    public void configure(Context context) {
+        bodyFieldName = context.getString(ES_HEADERBASED_BODY_FIELD_NAME, DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME);
+        String fields = context.getString(ES_HEADERBASED_FIELDS);
+        if (fields == null) {
+            throw Throwables.propagate(new Exception("Fields for headers based serializer are not configured," +
+                    " please configured the property " + ES_HEADERBASED_FIELDS));
+        }
+        for (String fieldType : fields.split(COMMA)) {
+            String name = getValue(fieldType, 0);
+            String type = getValue(fieldType, 1);
+            if (name.isEmpty() || type.isEmpty()) {
+                throw new IllegalArgumentException("Invalid entry '" + fieldType + "' in " + ES_HEADERBASED_FIELDS + ", expected name:type");
+            }
+            names.add(name);
+            types.add(type);
+        }
+    }
+
+    /** Returns the trimmed part at position index of fieldType split on COLONS, or "" when there is no such part. */
+    private String getValue(String fieldType, Integer index) {
+        String[] parts = fieldType.split(COLONS);
+        // Compare against the number of parts, not the string length, so the array access stays in bounds.
+        return parts.length > index ? parts[index].trim() : "";
+    }
+}
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java index dfbcdbe..0978cd5 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Util.java @@ -114,6 +114,9 @@ public static void addField(XContentBuilder xContentBuilder,String key, String v case INT: xContentBuilder.field(key, Integer.parseInt(value)); break; + case LONG: + xContentBuilder.field(key, Long.parseLong(value)); + break; case BOOLEAN: xContentBuilder.field(key, Boolean.valueOf(value)); break; diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java new file mode 100644 index 0000000..f0a3388 --- /dev/null +++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestHeaderBasedSerializer.java @@ -0,0 +1,66 @@ +package com.cognitree.flume.sink.elasticsearch; + +import com.google.gson.JsonParser; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_FIELDS;
+import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_BODY_FIELD_NAME;
+import static org.junit.Assert.*;
+
+/** Checks that HeaderBasedSerializer emits the typed header fields plus the body as one JSON document. */
+public class TestHeaderBasedSerializer {
+    private HeaderBasedSerializer headerBasedSerializer;
+    // The serializer decodes the body as UTF-8, so encode with UTF-8 instead of the platform default.
+    private static final Charset charset = Charset.forName("UTF-8");
+    private String message = "Lorem ipsum dolor sit amet";
+
+    @Before
+    public void init() throws Exception {
+        headerBasedSerializer = new HeaderBasedSerializer();
+    }
+
+    /** Serializes one event carrying int, string and long headers and compares it with the expected document. */
+    @Test
+    public void testSerializer() throws IOException {
+        Context context = new Context();
+        context.put(ES_HEADERBASED_BODY_FIELD_NAME, "message");
+        context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:long");
+
+        Map<String, String> headers = new HashMap<String, String>();
+        headers.put("id", "1");
+        headers.put("name", "test");
+        headers.put("datetime", "2018-12-12 12:42:42.424");
+        headers.put("@timestamp", "1544607762424");
+
+        Event event = EventBuilder.withBody(message, charset, headers);
+        headerBasedSerializer.configure(context);
+        XContentBuilder expected = generateContentBuilder();
+        XContentBuilder actual = headerBasedSerializer.serialize(event);
+
+        // Compare the parsed JSON elements rather than the raw strings.
+        JsonParser parser = new JsonParser();
+        assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
+    }
+
+    /** Builds the document the serializer is expected to produce for the event used in testSerializer. */
+    private XContentBuilder generateContentBuilder() throws IOException {
+        XContentBuilder expected = jsonBuilder().startObject();
+        expected.field("id", 1);
+        expected.field("name", "test");
+        expected.field("datetime", "2018-12-12 12:42:42.424");
+        expected.field("@timestamp", 1544607762424L);
+        expected.field("message", message);
+        expected.endObject();
+        return expected;
+    }
+}
\ No newline at end of file