diff --git a/README.md b/README.md index b4df69a..c29181d 100644 --- a/README.md +++ b/README.md @@ -40,13 +40,13 @@ Required properties are in bold. | es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected | | es.index | default | Index name to be used to store the documents | | es.type | default | Type to be used to store the documents | -| es.index.builder |com.cognitree.
flume.sink.
elasticsearch.
StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface | +| es.index.builder |com.cognitree.
flume.sink.
elasticsearch.
StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface| | es.serializer |com.cognitree.
flume.sink.
elasticsearch.
SimpleSerializer | Implementation of com.cognitree.flume.sink.elasticsearch.Serializer interface | | es.serializer.csv.fields | - | Comma separated csv field name with data type i.e. column1:type1,column2:type2, Supported data types are string, boolean, int and float | | es.serializer.csv.delimiter | ,(comma) | Delimiter for the data in flume event body| | es.serializer.avro.schema.file | - | Absolute path for the schema configuration file | -Example of agent named agent +Example of agent named agent: ```` agent.channels = es_channel @@ -74,3 +74,17 @@ Example of agent named agent agent.sinks.es_sink.es.serializer.csv.delimiter=, agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc ```` + +#### Available index builders + +##### com.cognitree.flume.sink.elasticsearch.HeaderBasedIndexBuilder + +This builder doesn't have configurable parameters. Set the FlumeEvent header 'index' to customize the target index name (default index name: 'default') and the header 'type' to customize the target document type (default document type: 'default'). + +##### com.cognitree.flume.sink.elasticsearch.TimestampBasedIndexBuilder +This builder uses the *timestamp* or *@timestamp* header, which is expected to be a unix timestamp in milliseconds, to build the index name, e.g. you can create names like: _my-awesome-index-2019-01-01_. + +| Property Name | Default | Description | +|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------| +| es.index.builder.date.format | - | Sets the format of the date postfix you want to have. Supports SimpleDateFormat-compatible patterns, so ISO 8601 style dates such as yyyy-MM-dd can be produced. If it's not set, no date postfix will be created| +| es.index.builder.date.timeZone | UTC | Sets the time zone which will be used to format the timestamp (uses _TimeZone.getTimeZone()_, so it supports IDs like: 'UTC', 'GMT+03:00', 'Europe/Samara', etc.; unrecognized IDs fall back to GMT).
Ignored if *date.format* is not set.|
\ No newline at end of file
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
index ad92081..2f2f8c1 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
@@ -62,6 +62,9 @@ public class Constants {
 
     public static final String ES_INDEX_BUILDER = "es.index.builder";
     public static final String DEFAULT_ES_INDEX_BUILDER = "com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder";
+    public static final String ES_INDEX_BUILDER_DATE_FORMAT = "es.index.builder.date.format";
+    public static final String ES_INDEX_BUILDER_DATE_TIME_ZONE = "es.index.builder.date.timeZone";
+    public static final String DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE = "UTC";
 
     public static final String ES_SERIALIZER = "es.serializer";
     public static final String DEFAULT_ES_SERIALIZER = "com.cognitree.flume.sink.elasticsearch.SimpleSerializer";
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java
new file mode 100644
index 0000000..2e9b51e
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017 Cognitree Technologies
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.cognitree.flume.sink.elasticsearch;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.formatter.output.BucketPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimeZone;
+
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+
+/**
+ * {@link IndexBuilder} that appends a date suffix, derived from the event timestamp, to the
+ * configured index name, for example {@code my-index-2019-01-01}.
+ * <p>
+ * The timestamp is resolved by {@link TimestampedEvent}. The suffix is only appended when a
+ * date format is configured through {@code es.index.builder.date.format}.
+ */
+public class TimestampBasedIndexBuilder implements IndexBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(TimestampBasedIndexBuilder.class);
+
+    private String index;
+    private String type;
+    private String dateFormat;
+    private String dateTimeZone;
+
+    /** Formatter for the date suffix; {@code null} when no date format is configured. */
+    private FastDateFormat fastDateFormat;
+
+    /**
+     * Builds the target index name for the given event.
+     *
+     * @param event event whose headers are used to escape the index name and to resolve the timestamp
+     * @return the escaped index name, followed by {@code -<formatted date>} when a date format is configured
+     */
+    @Override
+    public String getIndex(Event event) {
+        String indexName = (index != null) ? index : DEFAULT_ES_INDEX;
+        indexName = BucketPath.escapeString(indexName, event.getHeaders());
+        if (fastDateFormat == null) {
+            // No suffix requested, so the event timestamp does not need to be resolved.
+            return indexName;
+        }
+        long timestamp = new TimestampedEvent(event).getTimestamp();
+        return indexName + '-' + fastDateFormat.format(timestamp);
+    }
+
+    /**
+     * Returns the configured document type, or the default type when none is configured.
+     */
+    @Override
+    public String getType(Event event) {
+        return (type != null) ? type : DEFAULT_ES_TYPE;
+    }
+
+    /**
+     * Always returns {@code null}; this builder does not supply a document id.
+     */
+    @Override
+    public String getId(Event event) {
+        return null;
+    }
+
+    /**
+     * Reads the index name, document type, date format and time zone from the sink context.
+     *
+     * @param context sink configuration
+     */
+    @Override
+    public void configure(Context context) {
+        String configuredIndex = Util.getContextValue(context, ES_INDEX);
+        this.index = (configuredIndex != null) ? configuredIndex : DEFAULT_ES_INDEX;
+        this.type = Util.getContextValue(context, ES_TYPE);
+        this.dateFormat = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_FORMAT);
+        this.dateTimeZone = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_TIME_ZONE);
+
+        if (this.dateFormat == null) {
+            // Drop any formatter left over from an earlier configure() call.
+            this.fastDateFormat = null;
+        } else {
+            if (this.dateTimeZone == null) {
+                this.dateTimeZone = DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE;
+            }
+            TimeZone timeZone = TimeZone.getTimeZone(this.dateTimeZone);
+            if ("GMT".equals(timeZone.getID()) && !"GMT".equals(this.dateTimeZone)) {
+                // TimeZone.getTimeZone() silently returns GMT for ids it cannot understand.
+                logger.warn("Unknown time zone [{}], falling back to GMT", this.dateTimeZone);
+            }
+            this.fastDateFormat = FastDateFormat.getInstance(this.dateFormat, timeZone);
+        }
+        logger.info("Timestamp based index builder, name [{}] type [{}] date format [{}] date time zone [{}]",
+                new Object[]{this.index, this.type, this.dateFormat, this.dateTimeZone});
+    }
+}
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java
new file mode 100644
index 0000000..1f4bfd1
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017 Cognitree Technologies
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.cognitree.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+
+import java.util.Map;
+
+/**
+ * Event that takes the body and a copy of the headers of another {@link Event} and exposes
+ * the event timestamp as a {@code long}.
+ * <p>
+ * The timestamp is read from the {@code timestamp} header, then from {@code @timestamp}; when
+ * both are blank the current time in milliseconds is used and stored in the {@code timestamp}
+ * header of this event.
+ */
+public final class TimestampedEvent extends SimpleEvent {
+    private final long timestamp;
+
+    /**
+     * @param base event whose body and headers are taken over
+     * @throws NumberFormatException if the timestamp header is not a valid {@code long}
+     */
+    TimestampedEvent(Event base) {
+        setBody(base.getBody());
+        Map<String, String> headers = Maps.newHashMap(base.getHeaders());
+        String timestampString = headers.get("timestamp");
+        if (StringUtils.isBlank(timestampString)) {
+            timestampString = headers.get("@timestamp");
+        }
+        if (StringUtils.isBlank(timestampString)) {
+            this.timestamp = DateTimeUtils.currentTimeMillis();
+            headers.put("timestamp", String.valueOf(timestamp));
+        } else {
+            // parseLong yields the primitive directly, without the boxing done by Long.valueOf.
+            this.timestamp = Long.parseLong(timestampString);
+        }
+        setHeaders(headers);
+    }
+
+    /** Returns the timestamp resolved at construction time. */
+    long getTimestamp() {
+        return timestamp;
+    }
+}
diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java
new file mode 100644
index 0000000..6558eb5
--- /dev/null
+++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java
@@ -0,0 +1,103 @@
+package com.cognitree.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link TimestampBasedIndexBuilder}.
+ */
+public class TestTimestampBasedIndexBuilder {
+
+    /** 2019-01-01 13:42:42.000 UTC as epoch milliseconds. */
+    private static final String TIMESTAMP = "1546350162000";
+
+    private TimestampBasedIndexBuilder timestampBasedIndexBuilder;
+    private final String index = "es-index";
+    private final String type = "es-type";
+    private final String dateFormat = "yyyy-MM-dd-HH";
+
+    @Before
+    public void init() {
+        timestampBasedIndexBuilder = new TimestampBasedIndexBuilder();
+    }
+
+    /** An unconfigured builder falls back to the default index and type. */
+    @Test
+    public void testDefaultIndex() {
+        Event event = new SimpleEvent();
+        assertEquals(DEFAULT_ES_INDEX, timestampBasedIndexBuilder.getIndex(event));
+        assertEquals(DEFAULT_ES_TYPE, timestampBasedIndexBuilder.getType(event));
+    }
+
+    /** The "timestamp" header is formatted in UTC when no time zone is configured. */
+    @Test
+    public void testTimestampedIndex() {
+        Context context = newContext();
+        context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
+        timestampBasedIndexBuilder.configure(context);
+
+        Event event = eventWithHeader("timestamp", TIMESTAMP);
+
+        assertEquals(index + "-2019-01-01-13", timestampBasedIndexBuilder.getIndex(event));
+    }
+
+    /** The "@timestamp" header is formatted in the configured time zone. */
+    @Test
+    public void testTimestampedWithTZIndex() {
+        Context context = newContext();
+        context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
+        context.put(ES_INDEX_BUILDER_DATE_TIME_ZONE, "Europe/Moscow"); // UTC+3
+        timestampBasedIndexBuilder.configure(context);
+
+        Event event = eventWithHeader("@timestamp", TIMESTAMP);
+
+        assertEquals(index + "-2019-01-01-16", timestampBasedIndexBuilder.getIndex(event));
+    }
+
+    /** Without a date format the configured index and type are returned unchanged. */
+    @Test
+    public void testConfigurationIndex() {
+        timestampBasedIndexBuilder.configure(newContext());
+
+        Event event = new SimpleEvent();
+        assertEquals(index, timestampBasedIndexBuilder.getIndex(event));
+        assertEquals(type, timestampBasedIndexBuilder.getType(event));
+    }
+
+    /** Re-configuring without a date format removes the previously configured suffix. */
+    @Test
+    public void testReconfigureWithoutDateFormat() {
+        Context context = newContext();
+        context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
+        timestampBasedIndexBuilder.configure(context);
+        timestampBasedIndexBuilder.configure(newContext());
+
+        assertEquals(index, timestampBasedIndexBuilder.getIndex(new SimpleEvent()));
+    }
+
+    /** Creates a context that carries the test index name and document type. */
+    private Context newContext() {
+        Context context = new Context();
+        context.put(ES_INDEX, index);
+        context.put(ES_TYPE, type);
+        return context;
+    }
+
+    /** Creates an event whose only header is the given key/value pair. */
+    private static Event eventWithHeader(String key, String value) {
+        Map<String, String> headers = new HashMap<String, String>();
+        headers.put(key, value);
+        Event event = new SimpleEvent();
+        event.setHeaders(headers);
+        return event;
+    }
+}