From 4663c34e96709c63145887cacd5cacf4677092bc Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Thu, 7 Dec 2017 14:50:30 +0530 Subject: [PATCH 01/15] refactored avro serializer code for deserializing data with different schemas and changed es.client.host parameter value --- .../sink/elasticsearch/AvroSerializer.java | 39 ++++++++++++------- .../flume/sink/elasticsearch/Constants.java | 2 +- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 0d42627..fad6900 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -30,6 +30,8 @@ import java.io.File; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import static com.cognitree.flume.sink.elasticsearch.Constants.ES_AVRO_SCHEMA_FILE; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -42,8 +44,9 @@ public class AvroSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class); + public static final String PATH_TO_FILE = "/Users/sumits/Documents/avscFolder/"; - private DatumReader datumReader; + private Map> fileToDatumReaderMap = new HashMap>(); /** * Converts the avro binary data to the json format @@ -51,23 +54,34 @@ public class AvroSerializer implements Serializer { @Override public XContentBuilder serialize(Event event) { XContentBuilder builder = null; + String basename = event.getHeaders().get("basename").split("\\.")[0]; try { - if (datumReader != null) { - Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); - GenericRecord data = datumReader.read(null, decoder); - logger.trace("Record in event " + data); - XContentParser parser = XContentFactory - .xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, data.toString()); - builder = jsonBuilder().copyCurrentStructure(parser); - parser.close(); - } else { - logger.error("Schema File is not configured"); + if (!fileToDatumReaderMap.containsKey(basename)) { + Schema schema = new Schema.Parser().parse(new File(PATH_TO_FILE + basename + ".avsc")); + DatumReader datumReader = new GenericDatumReader(schema); + fileToDatumReaderMap.put(basename, datumReader); } + } catch (IOException e) { + logger.error("Error in parsing schema file ", e.getMessage(), e); + Throwables.propagate(e); + } + + Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); + DatumReader datumReader = fileToDatumReaderMap.get(basename); + + try { + GenericRecord data = datumReader.read(null, decoder); + logger.trace("Record in event " + data); + XContentParser parser = XContentFactory + .xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, data.toString()); + builder = jsonBuilder().copyCurrentStructure(parser); + parser.close(); } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", e.getMessage(), e); } + return builder; } @@ -80,7 +94,6 @@ public void configure(Context context) { } try { Schema schema = new Schema.Parser().parse(new File(file)); - datumReader = new GenericDatumReader(schema); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); Throwables.propagate(e); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java index ad92081..b9002c7 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java @@ -70,7 +70,7 @@ public class Constants { public static final String ES_CLUSTER_NAME = "cluster.name"; public static final String DEFAULT_ES_CLUSTER_NAME = "elasticsearch"; - public static final String ES_HOSTS = "es.client.hosts"; + public static final String ES_HOSTS = "hostNames"; public static final Integer DEFAULT_ES_PORT = 9300; From 7cd93fb764ec5961927abb8d9a65d8ffe47fb234 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Thu, 7 Dec 2017 14:53:20 +0530 Subject: [PATCH 02/15] updated README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 40d6425..9decee8 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Follow these steps to use this sink in Apache flume: `mvn clean assembly:assembly` -* Extract the file into the flume installation directories plugin.d folder. +* Extract the file into the flume installation directories plugins.d folder. * Configure the sink in the flume configuration file with properties as below From eb9a4b136f129bfdffd4084f8ee414f1e15f0507 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Thu, 14 Dec 2017 11:29:55 +0530 Subject: [PATCH 03/15] populating index and schema path from header --- .../cognitree/flume/sink/elasticsearch/AvroSerializer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index fad6900..2e441fa 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -17,6 +17,7 @@ import com.google.common.base.Throwables; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -44,7 +45,6 @@ public class AvroSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class); - public static final String PATH_TO_FILE = "/Users/sumits/Documents/avscFolder/"; private Map> fileToDatumReaderMap = new HashMap>(); @@ -54,10 +54,10 @@ public class AvroSerializer implements Serializer { @Override public XContentBuilder serialize(Event event) { XContentBuilder builder = null; - String basename = event.getHeaders().get("basename").split("\\.")[0]; + String basename = event.getHeaders().get("eventType"); try { if (!fileToDatumReaderMap.containsKey(basename)) { - Schema schema = new Schema.Parser().parse(new File(PATH_TO_FILE + basename + ".avsc")); + Schema schema = new Schema.Parser().parse(new File(event.getHeaders().get("eventSchemaPath"))); DatumReader datumReader = new GenericDatumReader(schema); fileToDatumReaderMap.put(basename, datumReader); } From 28339185e9f7ea971b1e6a6eada7a30a78ccc48c Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Fri, 15 Dec 2017 08:09:56 +0530 Subject: [PATCH 04/15] getting schema string instead of schema location path from header --- .../sink/elasticsearch/AvroSerializer.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 2e441fa..d9b42d4 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -54,16 +54,12 @@ public class AvroSerializer implements Serializer { @Override public XContentBuilder serialize(Event event) { XContentBuilder builder = null; - String basename = event.getHeaders().get("eventType"); - try { - if (!fileToDatumReaderMap.containsKey(basename)) { - Schema schema = new Schema.Parser().parse(new File(event.getHeaders().get("eventSchemaPath"))); - DatumReader datumReader = new GenericDatumReader(schema); - fileToDatumReaderMap.put(basename, datumReader); - } - } catch (IOException e) { - logger.error("Error in parsing schema file ", e.getMessage(), e); - Throwables.propagate(e); + String basename = event.getHeaders().get("type"); + + if (!fileToDatumReaderMap.containsKey(basename)) { + Schema schema = new Schema.Parser().parse((event.getHeaders().get("schemaString"))); + DatumReader datumReader = new GenericDatumReader(schema); + fileToDatumReaderMap.put(basename, datumReader); } Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); @@ -76,6 +72,7 @@ public XContentBuilder serialize(Event event) { .xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, data.toString()); builder = jsonBuilder().copyCurrentStructure(parser); + parser.close(); } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", From 840d9ce72f3274da7772b6ae900a8971ce7eabc4 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Fri, 15 Dec 2017 14:20:58 +0530 Subject: [PATCH 05/15] reading avro schema from provided schema inside header. If not provided using default schema --- .../sink/elasticsearch/AvroSerializer.java | 34 ++++++++++++------- .../elasticsearch/BodyBasedIndexBuilder.java | 4 +++ 2 files changed, 26 insertions(+), 12 deletions(-) create mode 100644 src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index d9b42d4..0055bcb 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -25,6 +25,8 @@ import org.apache.avro.io.DecoderFactory; import org.apache.flume.Context; import org.apache.flume.Event; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.common.xcontent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,9 @@ public class AvroSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class); - private Map> fileToDatumReaderMap = new HashMap>(); + private DatumReader datumReader; + + private static final ObjectMapper objectMapper= new ObjectMapper(); /** * Converts the avro binary data to the json format @@ -54,25 +58,30 @@ public class AvroSerializer implements Serializer { @Override public XContentBuilder serialize(Event event) { XContentBuilder builder = null; - String basename = event.getHeaders().get("type"); - - if (!fileToDatumReaderMap.containsKey(basename)) { - Schema schema = new Schema.Parser().parse((event.getHeaders().get("schemaString"))); - DatumReader datumReader = new GenericDatumReader(schema); - fileToDatumReaderMap.put(basename, datumReader); - } - - Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); - DatumReader datumReader = fileToDatumReaderMap.get(basename); try { - GenericRecord data = datumReader.read(null, decoder); + Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); + GenericRecord data; + if(event.getHeaders().containsKey("flume.avro.schema.literal")) { + Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); + DatumReader datumReader = new GenericDatumReader(schema); + data = datumReader.read(null, decoder); + } else { + data = datumReader.read(null, decoder); + } + logger.trace("Record in event " + data); XContentParser parser = XContentFactory .xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, data.toString()); builder = jsonBuilder().copyCurrentStructure(parser); + JsonNode dataNode = objectMapper.readTree(builder.string()); + + System.out.println("BUILDER STRING: " + builder.string()); + event.getHeaders().put("index", dataNode.get("es_index").getTextValue()); + event.getHeaders().put("type", dataNode.get("es_type").getTextValue()); + parser.close(); } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", @@ -91,6 +100,7 @@ public void configure(Context context) { } try { Schema schema = new Schema.Parser().parse(new File(file)); + datumReader = new GenericDatumReader(schema); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); Throwables.propagate(e); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java new file mode 100644 index 0000000..3e28d61 --- /dev/null +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java @@ -0,0 +1,4 @@ +package com.cognitree.flume.sink.elasticsearch; + +public class BodyBasedIndexBuilder { +} From 0fb59942638dd4f43cd6b7c266a589fd0c4b2ef8 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Fri, 15 Dec 2017 14:22:13 +0530 Subject: [PATCH 06/15] added event data based index builder to populate index, type and id variable --- .../elasticsearch/BodyBasedIndexBuilder.java | 76 ++++++++++++++++++- .../flume/sink/elasticsearch/Constants.java | 2 + .../sink/elasticsearch/ElasticSearchSink.java | 2 +- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java index 3e28d61..4a1688d 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java @@ -1,4 +1,76 @@ package com.cognitree.flume.sink.elasticsearch; -public class BodyBasedIndexBuilder { -} +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static com.cognitree.flume.sink.elasticsearch.Constants.*; + +public class BodyBasedIndexBuilder implements IndexBuilder { + + private static final Logger logger = LoggerFactory.getLogger(BodyBasedIndexBuilder.class); + + private static final ObjectMapper objectMapper= new ObjectMapper(); + + + private String eventIndexIdentifier; + private String eventTypeIdentifier; + private String eventIdIdentifier; + + @Override + public String getIndex(Event event) { + try { + JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); + JsonNode eventIndexNode = dataNode.get(eventIndexIdentifier); + if(eventIndexNode != null) { + return eventIndexNode.asText(); + } + } catch (IOException e) { + logger.error("Error parsing logger body", e); + } + return DEFAULT_ES_INDEX; + } + + @Override + public String getType(Event event) { + try { + JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); + JsonNode eventTypeNode = dataNode.get(eventTypeIdentifier); + if(eventTypeNode != null) { + return eventTypeNode.asText(); + } + } catch (IOException e) { + logger.error("Error parsing logger body", e); + } + return DEFAULT_ES_TYPE; + } + + @Override + public String getId(Event event) { + try { + JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); + JsonNode eventIdNode = dataNode.get(eventIdIdentifier); + if(eventIdNode != null) { + return eventIdNode.asText(); + } + } catch (IOException e) { + logger.error("Error parsing logger body", e); + } + return event.getHeaders().get(ID); + } + + @Override + public void configure(Context context) { + this.eventIndexIdentifier = Util.getContextValue(context, ES_INDEX); + this.eventTypeIdentifier = Util.getContextValue(context, ES_TYPE); + this.eventIdIdentifier = Util.getContextValue(context, ES_ID); + logger.info("Simple Index builder, name [{}] typeIdentifier [{}] id [[]]", + new Object[]{this.eventIndexIdentifier, this.eventTypeIdentifier, this.eventIdIdentifier}); + + } +} \ No newline at end of file diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java index b9002c7..a99e6f8 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java @@ -60,6 +60,8 @@ public class Constants { public static final String ES_TYPE = "es.type"; public static final String DEFAULT_ES_TYPE = "default"; + public static final String ES_ID = "es.id"; + public static final String ES_INDEX_BUILDER = "es.index.builder"; public static final String DEFAULT_ES_INDEX_BUILDER = "com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder"; diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java index 6406f02..47447f5 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java @@ -85,10 +85,10 @@ public Status process() throws EventDeliveryException { String body = new String(event.getBody(), Charsets.UTF_8); if (!Strings.isNullOrEmpty(body)) { logger.debug("start to sink event [{}].", body); + XContentBuilder xContentBuilder = serializer.serialize(event); String index = indexBuilder.getIndex(event); String type = indexBuilder.getType(event); String id = indexBuilder.getId(event); - XContentBuilder xContentBuilder = serializer.serialize(event); if(xContentBuilder != null) { if (!StringUtil.isNullOrEmpty(id)) { bulkProcessor.add(new IndexRequest(index, type, id) From 093f6913608524cbc7d6046763c789e023d1eb63 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Fri, 15 Dec 2017 15:34:18 +0530 Subject: [PATCH 07/15] putting deserialized avro data into event's body --- .../flume/sink/elasticsearch/AvroSerializer.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 0055bcb..be83ba1 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -50,8 +50,6 @@ public class AvroSerializer implements Serializer { private DatumReader datumReader; - private static final ObjectMapper objectMapper= new ObjectMapper(); - /** * Converts the avro binary data to the json format */ @@ -76,11 +74,7 @@ public XContentBuilder serialize(Event event) { .createParser(NamedXContentRegistry.EMPTY, data.toString()); builder = jsonBuilder().copyCurrentStructure(parser); - JsonNode dataNode = objectMapper.readTree(builder.string()); - - System.out.println("BUILDER STRING: " + builder.string()); - event.getHeaders().put("index", dataNode.get("es_index").getTextValue()); - event.getHeaders().put("type", dataNode.get("es_type").getTextValue()); + event.setBody(builder.string().getBytes()); parser.close(); } catch (IOException e) { From 89fefa74404611025033bb70a0713a9263bbc230 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Fri, 15 Dec 2017 20:01:53 +0530 Subject: [PATCH 08/15] populating hashValueToSchemaMap to keep track already seen schema --- .../flume/sink/elasticsearch/AvroSerializer.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index be83ba1..4c623fa 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -50,6 +50,8 @@ public class AvroSerializer implements Serializer { private DatumReader datumReader; + private Map> hashValueToDatumReaderMap; + /** * Converts the avro binary data to the json format */ @@ -64,8 +66,14 @@ public XContentBuilder serialize(Event event) { Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); DatumReader datumReader = new GenericDatumReader(schema); data = datumReader.read(null, decoder); + hashValueToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); } else { - data = datumReader.read(null, decoder); + if(hashValueToDatumReaderMap.containsKey(event.getHeaders().get("flume.avro.schema.hash"))) { + DatumReader datumReader = hashValueToDatumReaderMap.get(event.getHeaders().get("flume.avro.schema.hash")); + data = datumReader.read(null, decoder); + } else { + data = datumReader.read(null, decoder); + } } logger.trace("Record in event " + data); @@ -95,6 +103,7 @@ public void configure(Context context) { try { Schema schema = new Schema.Parser().parse(new File(file)); datumReader = new GenericDatumReader(schema); + hashValueToDatumReaderMap = new HashMap>(); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); Throwables.propagate(e); From 9e406c9a53433b330c523e63c811d9f417a21cf8 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Mon, 18 Dec 2017 15:52:48 +0530 Subject: [PATCH 09/15] added java doc and variable rename --- .../sink/elasticsearch/AvroSerializer.java | 13 +++++------- ...r.java => EventDataBasedIndexBuilder.java} | 20 ++++++++++++++++--- 2 files changed, 22 insertions(+), 11 deletions(-) rename src/main/java/com/cognitree/flume/sink/elasticsearch/{BodyBasedIndexBuilder.java => EventDataBasedIndexBuilder.java} (78%) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 4c623fa..88d3ef7 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -17,7 +17,6 @@ import com.google.common.base.Throwables; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -25,8 +24,6 @@ import org.apache.avro.io.DecoderFactory; import org.apache.flume.Context; import org.apache.flume.Event; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.common.xcontent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +47,7 @@ public class AvroSerializer implements Serializer { private DatumReader datumReader; - private Map> hashValueToDatumReaderMap; + private Map> avroHashSchemaToDatumReaderMap; /** * Converts the avro binary data to the json format @@ -66,10 +63,10 @@ public XContentBuilder serialize(Event event) { Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); DatumReader datumReader = new GenericDatumReader(schema); data = datumReader.read(null, decoder); - hashValueToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); + avroHashSchemaToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); } else { - if(hashValueToDatumReaderMap.containsKey(event.getHeaders().get("flume.avro.schema.hash"))) { - DatumReader datumReader = hashValueToDatumReaderMap.get(event.getHeaders().get("flume.avro.schema.hash")); + if(avroHashSchemaToDatumReaderMap.containsKey(event.getHeaders().get("flume.avro.schema.hash"))) { + DatumReader datumReader = avroHashSchemaToDatumReaderMap.get(event.getHeaders().get("flume.avro.schema.hash")); data = datumReader.read(null, decoder); } else { data = datumReader.read(null, decoder); @@ -103,7 +100,7 @@ public void configure(Context context) { try { Schema schema = new Schema.Parser().parse(new File(file)); datumReader = new GenericDatumReader(schema); - hashValueToDatumReaderMap = new HashMap>(); + avroHashSchemaToDatumReaderMap = new HashMap>(); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); Throwables.propagate(e); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java similarity index 78% rename from src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java rename to src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java index 4a1688d..aefc196 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/BodyBasedIndexBuilder.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java @@ -11,17 +11,23 @@ import static com.cognitree.flume.sink.elasticsearch.Constants.*; -public class BodyBasedIndexBuilder implements IndexBuilder { +/** + * Creates Index, type and id based on the event data. + */ +public class EventDataBasedIndexBuilder implements IndexBuilder { - private static final Logger logger = LoggerFactory.getLogger(BodyBasedIndexBuilder.class); + private static final Logger logger = LoggerFactory.getLogger(EventDataBasedIndexBuilder.class); private static final ObjectMapper objectMapper= new ObjectMapper(); - private String eventIndexIdentifier; private String eventTypeIdentifier; private String eventIdIdentifier; + /** + * Get the field identified by eventIndexIdentifier and returns its value as index name. If the field is absent + * then returns default index value. + */ @Override public String getIndex(Event event) { try { @@ -36,6 +42,10 @@ public String getIndex(Event event) { return DEFAULT_ES_INDEX; } + /** + * Get the field identified by eventTypeIdentifier and returns its value as type name. If the field is absent + * then returns default type value. + */ @Override public String getType(Event event) { try { @@ -50,6 +60,10 @@ public String getType(Event event) { return DEFAULT_ES_TYPE; } + /** + * Get the field identified by eventIdIdentifier and returns its value as type name. If the field is absent + * then returns value obtained from header for ID. + */ @Override public String getId(Event event) { try { From b223d83170c445cc7a399dfd2626c552d8732a4a Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 11:30:50 +0530 Subject: [PATCH 10/15] moved logic of finding a datumReader from event into separate method --- .../sink/elasticsearch/AvroSerializer.java | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 88d3ef7..19ea941 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -45,7 +45,7 @@ public class AvroSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class); - private DatumReader datumReader; + private DatumReader defaultDatumReader; private Map> avroHashSchemaToDatumReaderMap; @@ -58,20 +58,8 @@ public XContentBuilder serialize(Event event) { try { Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); - GenericRecord data; - if(event.getHeaders().containsKey("flume.avro.schema.literal")) { - Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); - DatumReader datumReader = new GenericDatumReader(schema); - data = datumReader.read(null, decoder); - avroHashSchemaToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); - } else { - if(avroHashSchemaToDatumReaderMap.containsKey(event.getHeaders().get("flume.avro.schema.hash"))) { - DatumReader datumReader = avroHashSchemaToDatumReaderMap.get(event.getHeaders().get("flume.avro.schema.hash")); - data = datumReader.read(null, decoder); - } else { - data = datumReader.read(null, decoder); - } - } + DatumReader datumReader = getDatumReader(event); + GenericRecord data = datumReader.read(null, decoder); logger.trace("Record in event " + data); XContentParser parser = XContentFactory @@ -90,6 +78,17 @@ public XContentBuilder serialize(Event event) { return builder; } + private DatumReader getDatumReader(Event event) { + if(event.getHeaders().containsKey("flume.avro.schema.literal")) { + Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); + DatumReader datumReader = new GenericDatumReader(schema); + avroHashSchemaToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); + return datumReader; + } else { + return avroHashSchemaToDatumReaderMap.getOrDefault(event.getHeaders().get("flume.avro.schema.hash"), defaultDatumReader); + } + } + @Override public void configure(Context context) { String file = context.getString(ES_AVRO_SCHEMA_FILE); @@ -99,7 +98,7 @@ public void configure(Context context) { } try { Schema schema = new Schema.Parser().parse(new File(file)); - datumReader = new GenericDatumReader(schema); + defaultDatumReader = new GenericDatumReader(schema); avroHashSchemaToDatumReaderMap = new HashMap>(); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); From b5a8167d0e199c871dab2cb5023c699ccaaf962c Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 11:36:18 +0530 Subject: [PATCH 11/15] extracted constants for String literals --- .../flume/sink/elasticsearch/AvroSerializer.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 19ea941..fcad5ed 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -45,6 +45,9 @@ public class AvroSerializer implements Serializer { private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class); + private static final String FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD = "flume.avro.schema.literal"; + private static final String FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD = "flume.avro.schema.hash"; + private DatumReader defaultDatumReader; private Map> avroHashSchemaToDatumReaderMap; @@ -79,13 +82,14 @@ public XContentBuilder serialize(Event event) { } private DatumReader getDatumReader(Event event) { - if(event.getHeaders().containsKey("flume.avro.schema.literal")) { - Schema schema = new Schema.Parser().parse((event.getHeaders().get("flume.avro.schema.literal"))); + if(event.getHeaders().containsKey(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD)) { + Schema schema = new Schema.Parser().parse((event.getHeaders().get(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD))); DatumReader datumReader = new GenericDatumReader(schema); - avroHashSchemaToDatumReaderMap.put(event.getHeaders().get("flume.avro.schema.hash"), datumReader); + avroHashSchemaToDatumReaderMap.put(event.getHeaders().get(FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), datumReader); return datumReader; } else { - return avroHashSchemaToDatumReaderMap.getOrDefault(event.getHeaders().get("flume.avro.schema.hash"), defaultDatumReader); + return avroHashSchemaToDatumReaderMap.getOrDefault(event.getHeaders().get( + FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), defaultDatumReader); } } From 60cd772dc0c8f6601f7f70e068fc0f455a742da3 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 11:39:40 +0530 Subject: [PATCH 12/15] added copyright and variable rename --- .../EventDataBasedIndexBuilder.java | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java index aefc196..e0d66b7 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java @@ -1,3 +1,18 @@ +/* + * Copyright 2017 Cognitree Technologies + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ package com.cognitree.flume.sink.elasticsearch; import org.apache.flume.Context; @@ -20,19 +35,19 @@ public class EventDataBasedIndexBuilder implements IndexBuilder { private static final ObjectMapper objectMapper= new ObjectMapper(); - private String eventIndexIdentifier; - private String eventTypeIdentifier; - private String eventIdIdentifier; + private String indexField; + private String typeField; + private String idField; /** - * Get the field identified by eventIndexIdentifier and returns its value as index name. If the field is absent + * Get the field identified by indexField and returns its value as index name. If the field is absent * then returns default index value. */ @Override public String getIndex(Event event) { try { JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventIndexNode = dataNode.get(eventIndexIdentifier); + JsonNode eventIndexNode = dataNode.get(indexField); if(eventIndexNode != null) { return eventIndexNode.asText(); } @@ -43,14 +58,14 @@ public String getIndex(Event event) { } /** - * Get the field identified by eventTypeIdentifier and returns its value as type name. If the field is absent + * Get the field identified by typeField and returns its value as type name. If the field is absent * then returns default type value. */ @Override public String getType(Event event) { try { JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventTypeNode = dataNode.get(eventTypeIdentifier); + JsonNode eventTypeNode = dataNode.get(typeField); if(eventTypeNode != null) { return eventTypeNode.asText(); } @@ -61,14 +76,14 @@ public String getType(Event event) { } /** - * Get the field identified by eventIdIdentifier and returns its value as type name. If the field is absent + * Get the field identified by idField and returns its value as type name. If the field is absent * then returns value obtained from header for ID. */ @Override public String getId(Event event) { try { JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventIdNode = dataNode.get(eventIdIdentifier); + JsonNode eventIdNode = dataNode.get(idField); if(eventIdNode != null) { return eventIdNode.asText(); } @@ -80,11 +95,11 @@ public String getId(Event event) { @Override public void configure(Context context) { - this.eventIndexIdentifier = Util.getContextValue(context, ES_INDEX); - this.eventTypeIdentifier = Util.getContextValue(context, ES_TYPE); - this.eventIdIdentifier = Util.getContextValue(context, ES_ID); + this.indexField = Util.getContextValue(context, ES_INDEX); + this.typeField = Util.getContextValue(context, ES_TYPE); + this.idField = Util.getContextValue(context, ES_ID); logger.info("Simple Index builder, name [{}] typeIdentifier [{}] id [[]]", - new Object[]{this.eventIndexIdentifier, this.eventTypeIdentifier, this.eventIdIdentifier}); + new Object[]{this.indexField, this.typeField, this.idField}); } } \ No newline at end of file From 9d19fd061613178ec209d15043476254b5ebbc57 Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 11:51:48 +0530 Subject: [PATCH 13/15] moved XContentParser inside try with resource statement --- .../flume/sink/elasticsearch/AvroSerializer.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index fcad5ed..d144584 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -65,14 +65,13 @@ public XContentBuilder serialize(Event event) { GenericRecord data = datumReader.read(null, decoder); logger.trace("Record in event " + data); - XContentParser parser = XContentFactory - .xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, data.toString()); - builder = jsonBuilder().copyCurrentStructure(parser); - - event.setBody(builder.string().getBytes()); - parser.close(); + try (XContentParser parser = XContentFactory + .xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, data.toString())) { + builder = jsonBuilder().copyCurrentStructure(parser); + event.setBody(builder.string().getBytes()); + } } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", e.getMessage(), e); From bde2276165848c34159d239270d20c96cafbc9bf Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 12:28:03 +0530 Subject: [PATCH 14/15] avoiding modification to event body and using avro serializer inside EventDataBasedIndexBuilder to populate index, type, and id field --- .../sink/elasticsearch/AvroSerializer.java | 13 +++---- .../EventDataBasedIndexBuilder.java | 39 +++++++++++++------ 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index d144584..6343272 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -50,7 +50,8 @@ public class AvroSerializer implements Serializer { private DatumReader defaultDatumReader; - private Map> avroHashSchemaToDatumReaderMap; + // avro schema hash string to datum reader map + private Map> hashToReaderMap; /** * Converts the avro binary data to the json format @@ -70,24 +71,22 @@ public XContentBuilder serialize(Event event) { .xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, data.toString())) { builder = jsonBuilder().copyCurrentStructure(parser); - event.setBody(builder.string().getBytes()); } } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", e.getMessage(), e); } - return builder; } - private DatumReader getDatumReader(Event event) { + DatumReader getDatumReader(Event event) { if(event.getHeaders().containsKey(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD)) { Schema schema = new Schema.Parser().parse((event.getHeaders().get(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD))); DatumReader datumReader = new GenericDatumReader(schema); - avroHashSchemaToDatumReaderMap.put(event.getHeaders().get(FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), datumReader); + hashToReaderMap.put(event.getHeaders().get(FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), datumReader); return datumReader; } else { - return avroHashSchemaToDatumReaderMap.getOrDefault(event.getHeaders().get( + return hashToReaderMap.getOrDefault(event.getHeaders().get( FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), defaultDatumReader); } } @@ -102,7 +101,7 @@ public void configure(Context context) { try { Schema schema = new Schema.Parser().parse(new File(file)); defaultDatumReader = new GenericDatumReader(schema); - avroHashSchemaToDatumReaderMap = new HashMap>(); + hashToReaderMap = new HashMap>(); } catch (IOException e) { logger.error("Error in parsing schema file ", e.getMessage(), e); Throwables.propagate(e); diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java index e0d66b7..cfdd55b 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java @@ -15,10 +15,15 @@ */ package com.cognitree.flume.sink.elasticsearch; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; import org.apache.flume.Context; import org.apache.flume.Event; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +44,8 @@ public class EventDataBasedIndexBuilder implements IndexBuilder { private String typeField; private String idField; + private AvroSerializer avroSerializer; + /** * Get the field identified by indexField and returns its value as index name. If the field is absent * then returns default index value. @@ -46,10 +53,12 @@ public class EventDataBasedIndexBuilder implements IndexBuilder { @Override public String getIndex(Event event) { try { - JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventIndexNode = dataNode.get(indexField); - if(eventIndexNode != null) { - return eventIndexNode.asText(); + Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); + DatumReader datumReader = avroSerializer.getDatumReader(event); + GenericRecord data = datumReader.read(null, decoder); + + if(data.get(indexField) != null) { + return data.get(indexField).toString(); } } catch (IOException e) { logger.error("Error parsing logger body", e); @@ -64,10 +73,12 @@ public String getIndex(Event event) { @Override public String getType(Event event) { try { - JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventTypeNode = dataNode.get(typeField); - if(eventTypeNode != null) { - return eventTypeNode.asText(); + Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); + DatumReader datumReader = avroSerializer.getDatumReader(event); + GenericRecord data = datumReader.read(null, decoder); + + if(data.get(typeField) != null) { + return data.get(typeField).toString(); } } catch (IOException e) { logger.error("Error parsing logger body", e); @@ -82,10 +93,12 @@ public String getType(Event event) { @Override public String getId(Event event) { try { - JsonNode dataNode = objectMapper.readTree(new String(event.getBody())); - JsonNode eventIdNode = dataNode.get(idField); - if(eventIdNode != null) { - return eventIdNode.asText(); + Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null); + DatumReader datumReader = avroSerializer.getDatumReader(event); + GenericRecord data = datumReader.read(null, decoder); + + if(data.get(idField) != null) { + return data.get(idField).toString(); } } catch (IOException e) { logger.error("Error parsing logger body", e); @@ -101,5 +114,7 @@ public void configure(Context context) { logger.info("Simple Index builder, name [{}] typeIdentifier [{}] id [[]]", new Object[]{this.indexField, this.typeField, this.idField}); + avroSerializer = new AvroSerializer(); + avroSerializer.configure(context); } } \ No newline at end of file From 88de8e7256d5da74eb2d3367428e5b9399dd513b Mon Sep 17 00:00:00 2001 From: Sumit Suthar Date: Wed, 20 Dec 2017 12:39:37 +0530 Subject: [PATCH 15/15] replaced try-resource block with try with finally block --- .../flume/sink/elasticsearch/AvroSerializer.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java index 6343272..b76030d 100644 --- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java +++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java @@ -67,11 +67,18 @@ public XContentBuilder serialize(Event event) { logger.trace("Record in event " + data); - try (XContentParser parser = XContentFactory - .xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, data.toString())) { + XContentParser parser = null; + try { + parser = XContentFactory + .xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, data.toString()); builder = jsonBuilder().copyCurrentStructure(parser); + } finally { + if (parser != null) { + parser.close(); + } } + } catch (IOException e) { logger.error("Exception in parsing avro format data but continuing serialization to process further records", e.getMessage(), e);