diff --git a/README.md b/README.md
index 40d6425..9decee8 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ Follow these steps to use this sink in Apache flume:
 
 `mvn clean assembly:assembly`
 
-* Extract the file into the flume installation directories plugin.d folder.
+* Extract the file into the flume installation directory's plugins.d folder.
 
 * Configure the sink in the flume configuration file with properties as below
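Note on the README step above: "properties as below" refers to the sink definition in the Flume agent's configuration file. With the key rename in Constants.java later in this patch (ES_HOSTS is now read from hostNames instead of es.client.hosts), a minimal sink definition would look roughly like the sketch below. The agent and sink names (a1, k1) and the host and port values are placeholders; the keys come from Constants.java.

    a1.sinks.k1.type = com.cognitree.flume.sink.elasticsearch.ElasticSearchSink
    a1.sinks.k1.hostNames = localhost:9300
    a1.sinks.k1.cluster.name = elasticsearch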
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java
index 0d42627..b76030d 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/AvroSerializer.java
@@ -30,6 +30,8 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static com.cognitree.flume.sink.elasticsearch.Constants.ES_AVRO_SCHEMA_FILE;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
@@ -43,7 +45,13 @@ public class AvroSerializer implements Serializer {
 
     private static final Logger logger = LoggerFactory.getLogger(AvroSerializer.class);
 
-    private DatumReader<GenericRecord> datumReader;
+    private static final String FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD = "flume.avro.schema.literal";
+    private static final String FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD = "flume.avro.schema.hash";
+
+    private DatumReader<GenericRecord> defaultDatumReader;
+
+    // avro schema hash string to datum reader map
+    private Map<String, DatumReader<GenericRecord>> hashToReaderMap;
 
     /**
      * Converts the avro binary data to the json format
@@ -51,19 +59,26 @@ public class AvroSerializer implements Serializer {
     @Override
     public XContentBuilder serialize(Event event) {
         XContentBuilder builder = null;
+
         try {
-            if (datumReader != null) {
-                Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
-                GenericRecord data = datumReader.read(null, decoder);
-                logger.trace("Record in event " + data);
-                XContentParser parser = XContentFactory
+            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
+            DatumReader<GenericRecord> datumReader = getDatumReader(event);
+            GenericRecord data = datumReader.read(null, decoder);
+
+            logger.trace("Record in event " + data);
+
+            XContentParser parser = null;
+            try {
+                parser = XContentFactory
                         .xContent(XContentType.JSON)
                         .createParser(NamedXContentRegistry.EMPTY, data.toString());
                 builder = jsonBuilder().copyCurrentStructure(parser);
-                parser.close();
-            } else {
-                logger.error("Schema File is not configured");
+            } finally {
+                if (parser != null) {
+                    parser.close();
+                }
             }
+
         } catch (IOException e) {
             logger.error("Exception in parsing avro format data but continuing serialization to process further records",
                     e.getMessage(), e);
@@ -71,6 +86,18 @@ public XContentBuilder serialize(Event event) {
         return builder;
     }
 
+    DatumReader<GenericRecord> getDatumReader(Event event) {
+        if (event.getHeaders().containsKey(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD)) {
+            Schema schema = new Schema.Parser().parse(event.getHeaders().get(FLUME_AVRO_SCHEMA_STRING_HEADER_FIELD));
+            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
+            hashToReaderMap.put(event.getHeaders().get(FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), datumReader);
+            return datumReader;
+        } else {
+            return hashToReaderMap.getOrDefault(event.getHeaders().get(
+                    FLUME_AVRO_SCHEMA_HASH_HEADER_FIELD), defaultDatumReader);
+        }
+    }
+
     @Override
     public void configure(Context context) {
         String file = context.getString(ES_AVRO_SCHEMA_FILE);
@@ -80,7 +107,8 @@ public void configure(Context context) {
         }
         try {
             Schema schema = new Schema.Parser().parse(new File(file));
-            datumReader = new GenericDatumReader<GenericRecord>(schema);
+            defaultDatumReader = new GenericDatumReader<GenericRecord>(schema);
+            hashToReaderMap = new HashMap<String, DatumReader<GenericRecord>>();
         } catch (IOException e) {
             logger.error("Error in parsing schema file ", e.getMessage(), e);
             Throwables.propagate(e);
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
index ad92081..a99e6f8 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
@@ -60,6 +60,8 @@ public class Constants {
     public static final String ES_TYPE = "es.type";
     public static final String DEFAULT_ES_TYPE = "default";
 
+    public static final String ES_ID = "es.id";
+
     public static final String ES_INDEX_BUILDER = "es.index.builder";
     public static final String DEFAULT_ES_INDEX_BUILDER = "com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder";
 
@@ -70,7 +72,7 @@ public class Constants {
     public static final String ES_CLUSTER_NAME = "cluster.name";
     public static final String DEFAULT_ES_CLUSTER_NAME = "elasticsearch";
 
-    public static final String ES_HOSTS = "es.client.hosts";
+    public static final String ES_HOSTS = "hostNames";
 
     public static final Integer DEFAULT_ES_PORT = 9300;
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java
index 6406f02..47447f5 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -85,10 +85,10 @@ public Status process() throws EventDeliveryException {
                 String body = new String(event.getBody(), Charsets.UTF_8);
                 if (!Strings.isNullOrEmpty(body)) {
                     logger.debug("start to sink event [{}].", body);
+                    XContentBuilder xContentBuilder = serializer.serialize(event);
                     String index = indexBuilder.getIndex(event);
                     String type = indexBuilder.getType(event);
                     String id = indexBuilder.getId(event);
-                    XContentBuilder xContentBuilder = serializer.serialize(event);
                     if(xContentBuilder != null) {
                         if (!StringUtil.isNullOrEmpty(id)) {
                             bulkProcessor.add(new IndexRequest(index, type, id)
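Note on the serializer and sink changes above: getDatumReader() follows the standard Flume Avro headers. An event carrying flume.avro.schema.literal is parsed with that inline schema and the resulting reader is cached under the event's flume.avro.schema.hash; an event carrying only the hash header reuses the cached reader; an event with neither header falls back to the reader built from the configured schema file. Moving serializer.serialize(event) ahead of the indexBuilder calls in ElasticSearchSink means the reader is cached before the index builder below asks for it. A minimal sketch of that header contract, assuming a schema file exists at the placeholder path /tmp/default-schema.avsc; the inline schema and hash value are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.elasticsearch.common.xcontent.XContentBuilder;

    import com.cognitree.flume.sink.elasticsearch.AvroSerializer;

    import static com.cognitree.flume.sink.elasticsearch.Constants.ES_AVRO_SCHEMA_FILE;

    public class AvroSerializerSketch {
        public static void main(String[] args) {
            // configure() parses the default schema file and initialises the
            // hash-to-reader cache; the path here is a placeholder.
            AvroSerializer serializer = new AvroSerializer();
            Context context = new Context();
            context.put(ES_AVRO_SCHEMA_FILE, "/tmp/default-schema.avsc");
            serializer.configure(context);

            // Illustrative inline schema and hash; an Avro client sets both headers.
            Map<String, String> headers = new HashMap<String, String>();
            headers.put("flume.avro.schema.literal",
                    "{\"type\":\"record\",\"name\":\"Log\",\"fields\":"
                            + "[{\"name\":\"index\",\"type\":\"string\"}]}");
            headers.put("flume.avro.schema.hash", "example-hash");

            byte[] avroBody = new byte[0]; // placeholder for an Avro-encoded record
            Event event = EventBuilder.withBody(avroBody, headers);

            // Parsed with the inline schema; the reader is cached under the hash,
            // so a later event carrying only the hash header reuses it.
            XContentBuilder json = serializer.serialize(event);
        }
    }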
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java
new file mode 100644
index 0000000..cfdd55b
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/EventDataBasedIndexBuilder.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2017 Cognitree Technologies
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.cognitree.flume.sink.elasticsearch;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+
+/**
+ * Creates index, type and id based on the event data.
+ */
+public class EventDataBasedIndexBuilder implements IndexBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(EventDataBasedIndexBuilder.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private String indexField;
+    private String typeField;
+    private String idField;
+
+    private AvroSerializer avroSerializer;
+
+    /**
+     * Gets the field identified by indexField and returns its value as the index name.
+     * If the field is absent, returns the default index value.
+     */
+    @Override
+    public String getIndex(Event event) {
+        try {
+            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
+            DatumReader<GenericRecord> datumReader = avroSerializer.getDatumReader(event);
+            GenericRecord data = datumReader.read(null, decoder);
+
+            if (data.get(indexField) != null) {
+                return data.get(indexField).toString();
+            }
+        } catch (IOException e) {
+            logger.error("Error parsing event body", e);
+        }
+        return DEFAULT_ES_INDEX;
+    }
+
+    /**
+     * Gets the field identified by typeField and returns its value as the type name.
+     * If the field is absent, returns the default type value.
+     */
+    @Override
+    public String getType(Event event) {
+        try {
+            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
+            DatumReader<GenericRecord> datumReader = avroSerializer.getDatumReader(event);
+            GenericRecord data = datumReader.read(null, decoder);
+
+            if (data.get(typeField) != null) {
+                return data.get(typeField).toString();
+            }
+        } catch (IOException e) {
+            logger.error("Error parsing event body", e);
+        }
+        return DEFAULT_ES_TYPE;
+    }
+
+    /**
+     * Gets the field identified by idField and returns its value as the document id.
+     * If the field is absent, returns the value obtained from the id header.
+     */
+    @Override
+    public String getId(Event event) {
+        try {
+            Decoder decoder = new DecoderFactory().binaryDecoder(event.getBody(), null);
+            DatumReader<GenericRecord> datumReader = avroSerializer.getDatumReader(event);
+            GenericRecord data = datumReader.read(null, decoder);
+
+            if (data.get(idField) != null) {
+                return data.get(idField).toString();
+            }
+        } catch (IOException e) {
+            logger.error("Error parsing event body", e);
+        }
+        return event.getHeaders().get(ID);
+    }
+
+    @Override
+    public void configure(Context context) {
+        this.indexField = Util.getContextValue(context, ES_INDEX);
+        this.typeField = Util.getContextValue(context, ES_TYPE);
+        this.idField = Util.getContextValue(context, ES_ID);
+        logger.info("Event data based index builder, indexField [{}] typeField [{}] idField [{}]",
+                new Object[]{this.indexField, this.typeField, this.idField});
+
+        avroSerializer = new AvroSerializer();
+        avroSerializer.configure(context);
+    }
+}
\ No newline at end of file
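Note on the new index builder: when EventDataBasedIndexBuilder is selected, es.index, es.type and es.id name fields inside the decoded Avro record rather than literal values; a missing index or type field falls back to the defaults, and a missing id field falls back to the event's id header. A hypothetical wiring (agent/sink names and field names are placeholders; ES_INDEX is assumed to resolve to the key es.index, matching the es.type and es.id pattern):

    a1.sinks.k1.es.index.builder = com.cognitree.flume.sink.elasticsearch.EventDataBasedIndexBuilder
    a1.sinks.k1.es.index = indexFieldName
    a1.sinks.k1.es.type = typeFieldName
    a1.sinks.k1.es.id = idFieldName

One design trade-off worth noting: getIndex, getType and getId each decode the Avro body again, and the sink decodes it a fourth time in serialize(), so caching the decoded record per event could cut the per-event work considerably.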