Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,48 @@ Example of agent named agent
agent.sinks.es_sink.es.serializer.csv.delimiter=,
agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
````

#### Available serializers
##### com.cognitree.flume.sink.elasticsearch.SimpleSerializer
This Serializer assumes the event body to be in JSON format
##### com.cognitree.flume.sink.elasticsearch.CsvSerializer
This Serializer assumes the event body to be in CSV format with custom delimiter specified.

| Property Name | Default | Description |
|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
| es.serializer.csv.fields | - | Required; an exception is thrown if this property is not set. |
| es.serializer.csv.delimiter | , | Delimiter for *es.serializer.csv.fields* property |

##### com.cognitree.flume.sink.elasticsearch.HeaderBasedSerializer
This serializer treats the event body as the main field and the event headers as additional fields of the resulting JSON document.

| Property Name | Default | Description |
|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
| es.serializer.headerBased.bodyField.name | message | Specifies the name of event body field in resulting json document |
| es.serializer.headerBased.fields | - | Required; an exception is thrown if this property is not set. Specifies the data types of the event headers as a comma-separated list of *name:type* pairs. <br> Example: _timestamp:long,host:string,index:int_. <br> Supported data types: *string*, *int*, *long*, *float*, *boolean*|

So, if you have an event such as:

<!-- language: lang-none -->
---------------------------
| Headers |
---------------------------
| host: localhost |
| timestamp: 100000000000 |
| index: 1 |
---------------------------
| Body |
---------------------------
| Lorem ipsum dolor sit |
| amet, consectetur |
| adipiscing elit |
---------------------------
then the serializer will produce a JSON document like this:
```json
{
"host": "localhost",
"timestamp": 100000000000,
"index": 1,
"message": "Lorem ipsum dolor sit amet, consectetur adipiscing elit"
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public class Constants {

public static final String ES_AVRO_SCHEMA_FILE = "es.serializer.avro.schema.file";

public static final String ES_HEADERBASED_BODY_FIELD_NAME = "es.serializer.headerBased.bodyField.name";
public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME = "message";
public static final String ES_HEADERBASED_FIELDS = "es.serializer.headerBased.fields";
public static final String DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE = "string";

/**
* This enum is used for the time unit
*
Expand Down Expand Up @@ -146,6 +151,7 @@ public String toString() {
public enum FieldTypeEnum {
STRING("string"),
INT("int"),
LONG("long"),
FLOAT("float"),
BOOLEAN("boolean");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.cognitree.flume.sink.elasticsearch;

import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static com.cognitree.flume.sink.elasticsearch.Constants.*;

/**
 * Serializer that builds a JSON document out of selected event headers plus the event body.
 *
 * <p>The headers to emit and their data types are read from the
 * {@code es.serializer.headerBased.fields} property as a list of {@code name:type} entries
 * (for example {@code timestamp:long,host:string}). The event body is decoded as UTF-8 and
 * written under the field name given by {@code es.serializer.headerBased.bodyField.name}.
 */
public class HeaderBasedSerializer implements Serializer {
    private static final Logger logger = LoggerFactory.getLogger(HeaderBasedSerializer.class);

    /** Header names to emit; kept index-aligned with {@link #types}. */
    private final List<String> names = new ArrayList<String>();

    /** Configured data type for each entry of {@link #names}. */
    private final List<String> types = new ArrayList<String>();

    /** Name of the JSON field that receives the event body. */
    private String bodyFieldName;

    /**
     * Converts the event into a JSON document containing the configured headers and the body.
     *
     * @param event the event to serialize
     * @return the populated builder, or {@code null} when no header fields are configured or
     *         an {@link IOException} occurs while building the document (both cases are logged)
     */
    public XContentBuilder serialize(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), Charsets.UTF_8);

        XContentBuilder xContentBuilder = null;

        try {
            if (!names.isEmpty() && !types.isEmpty()) {
                xContentBuilder = jsonBuilder().startObject();

                for (int i = 0; i < names.size(); i++) {
                    String name = names.get(i);
                    // NOTE(review): a header missing from the event is passed on as null;
                    // how Util.addField treats null for each type is not visible here - confirm.
                    Util.addField(xContentBuilder, name, headers.get(name), types.get(i));
                }

                Util.addField(xContentBuilder, bodyFieldName, body, DEFAULT_ES_HEADERBASED_BODY_FIELD_TYPE);

                xContentBuilder.endObject();
            } else {
                logger.error("Fields for headers based serializer are not configured, " +
                        "please configured the property " + ES_HEADERBASED_FIELDS);
            }
        } catch (IOException e) {
            logger.error("Error in converting the event to the json format " + e.getMessage(), e);
        }

        return xContentBuilder;
    }

    /**
     * Reads the body field name and the header {@code name:type} list from the context.
     *
     * <p>Any previously configured fields are replaced, so calling this method more than once
     * does not accumulate duplicates. The new configuration is applied only after every entry
     * has been parsed successfully. Blank entries (for example from a doubled comma) are
     * ignored, and whitespace around entries, names and types is trimmed.
     *
     * @param context the sink configuration
     * @throws RuntimeException if the fields property is not set
     * @throws IllegalArgumentException if an entry is not of the form {@code name:type} or if
     *         the property contains no entries at all
     */
    public void configure(Context context) {
        bodyFieldName = context.getString(ES_HEADERBASED_BODY_FIELD_NAME, DEFAULT_ES_HEADERBASED_BODY_FIELD_NAME);

        String fields = context.getString(ES_HEADERBASED_FIELDS);
        if (fields == null) {
            throw Throwables.propagate(new Exception("Fields for headers based serializer are not configured," +
                    " please configured the property " + ES_HEADERBASED_FIELDS));
        }

        // Parse into local lists first so a malformed entry cannot leave this
        // instance with a half-applied configuration.
        List<String> parsedNames = new ArrayList<String>();
        List<String> parsedTypes = new ArrayList<String>();
        for (String fieldType : fields.split(COMMA)) {
            String entry = fieldType.trim();
            if (entry.isEmpty()) {
                continue;
            }
            parsedNames.add(getValue(entry, 0));
            parsedTypes.add(getValue(entry, 1));
        }
        if (parsedNames.isEmpty()) {
            throw new IllegalArgumentException("Property " + ES_HEADERBASED_FIELDS
                    + " does not contain any name:type entry: '" + fields + "'");
        }

        names.clear();
        types.clear();
        names.addAll(parsedNames);
        types.addAll(parsedTypes);
    }

    /**
     * Extracts one component of a {@code name:type} entry.
     *
     * @param fieldType a single entry of the fields property
     * @param index 0 for the header name, 1 for the data type
     * @return the trimmed component
     * @throws IllegalArgumentException if the entry has no such component or it is blank
     */
    private String getValue(String fieldType, Integer index) {
        String[] parts = fieldType.split(COLONS);
        // Check the number of split parts, not the length of the entry string:
        // an entry such as "host" has no type component at all.
        if (parts.length < 2 || index < 0 || index >= parts.length) {
            throw new IllegalArgumentException("Invalid entry '" + fieldType + "' in property "
                    + ES_HEADERBASED_FIELDS + ", expected the form name:type");
        }
        String value = parts[index].trim();
        if (value.isEmpty()) {
            throw new IllegalArgumentException("Invalid entry '" + fieldType + "' in property "
                    + ES_HEADERBASED_FIELDS + ", expected the form name:type");
        }
        return value;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public static void addField(XContentBuilder xContentBuilder,String key, String v
case INT:
xContentBuilder.field(key, Integer.parseInt(value));
break;
case LONG:
xContentBuilder.field(key, Long.parseLong(value));
break;
case BOOLEAN:
xContentBuilder.field(key, Boolean.valueOf(value));
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.cognitree.flume.sink.elasticsearch;

import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_FIELDS;
import static com.cognitree.flume.sink.elasticsearch.Constants.ES_HEADERBASED_BODY_FIELD_NAME;
import static org.junit.Assert.*;

/**
 * Unit tests for {@link HeaderBasedSerializer}.
 */
public class TestHeaderBasedSerializer {
    private HeaderBasedSerializer headerBasedSerializer;

    // The serializer decodes the event body as UTF-8, so the test encodes it the same way
    // instead of relying on the platform default charset.
    private static final Charset charset = Charset.forName("UTF-8");

    private String message = "Lorem ipsum dolor sit amet";

    @Before
    public void init() throws Exception {
        headerBasedSerializer = new HeaderBasedSerializer();
    }

    /**
     * Serializes an event with typed headers and compares the result, as parsed JSON,
     * with a document built by hand.
     */
    @Test
    public void testSerializer() throws IOException {
        Context context = new Context();
        context.put(ES_HEADERBASED_BODY_FIELD_NAME, "message");
        context.put(ES_HEADERBASED_FIELDS, "id:int,name:string,datetime:string,@timestamp:long");

        Map<String, String> headers = new HashMap<String, String>();
        headers.put("id", "1");
        headers.put("name", "test");
        headers.put("datetime", "2018-12-12 12:42:42.424");
        headers.put("@timestamp", "1544607762424");

        Event event = EventBuilder.withBody(message, charset, headers);

        headerBasedSerializer.configure(context);

        XContentBuilder expected = generateContentBuilder();
        XContentBuilder actual = headerBasedSerializer.serialize(event);

        // Compare parsed JSON trees rather than raw strings.
        JsonParser parser = new JsonParser();
        assertEquals(parser.parse(expected.string()), parser.parse(actual.string()));
    }

    /** Configuring without the mandatory fields property must fail. */
    @Test(expected = RuntimeException.class)
    public void testConfigureWithoutFields() {
        headerBasedSerializer.configure(new Context());
    }

    /** An entry without a type component ("host") must be rejected at configuration time. */
    @Test(expected = RuntimeException.class)
    public void testConfigureWithMalformedEntry() {
        Context context = new Context();
        context.put(ES_HEADERBASED_FIELDS, "id:int,host");
        headerBasedSerializer.configure(context);
    }

    /** Builds the JSON document expected from {@link #testSerializer()}. */
    private XContentBuilder generateContentBuilder() throws IOException {
        XContentBuilder expected = jsonBuilder().startObject();
        expected.field("id", 1);
        expected.field("name", "test");
        expected.field("datetime", "2018-12-12 12:42:42.424");
        expected.field("@timestamp", 1544607762424L);
        expected.field("message", message);
        expected.endObject();
        return expected;
    }
}