diff --git a/README.md b/README.md
index b4df69a..c29181d 100644
--- a/README.md
+++ b/README.md
@@ -40,13 +40,13 @@ Required properties are in bold.
| es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected |
| es.index | default | Index name to be used to store the documents |
| es.type | default | Type to be used to store the documents |
-| es.index.builder |com.cognitree.
flume.sink.
elasticsearch.
StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface |
+| es.index.builder |com.cognitree.
flume.sink.
elasticsearch.
StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface|
| es.serializer |com.cognitree.
flume.sink.
elasticsearch.
SimpleSerializer | Implementation of com.cognitree.flume.sink.elasticsearch.Serializer interface |
| es.serializer.csv.fields | - | Comma separated csv field name with data type i.e. column1:type1,column2:type2, Supported data types are string, boolean, int and float |
| es.serializer.csv.delimiter | ,(comma) | Delimiter for the data in flume event body|
| es.serializer.avro.schema.file | - | Absolute path for the schema configuration file |
-Example of agent named agent
+Example of agent named agent:
````
agent.channels = es_channel
@@ -74,3 +74,17 @@ Example of agent named agent
agent.sinks.es_sink.es.serializer.csv.delimiter=,
agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
````
+
+#### Available index builders
+
+##### com.cognitree.flume.sink.elasticsearch.HeaderBasedIndexBuilder
+
+This builder doesn't have configurable parameters. You need to put FlumeEvent headers header called 'index' to customize target index name (default index name: 'default') and 'type' to customise target document type (default document type: 'default').
+
+##### com.cognitree.flume.sink.elasticsearch.TimestampBasedIndexBuilder
+This builder uses *timestamp* or *@timestamp* header, which expected to be a unix timestamp in milliseconds, to build index name, e.g. you can create names like: _my-awesome-index-2019-01-01_.
+
+| Property Name | Default | Description |
+|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
+| es.index.builder.date.format | - | Sets a format of date postfix you want to have. Supports ISO 8601 standart. If it's not set - no date postfix will be created|
+| es.index.builder.date.timeZone | UTC | Sets a timezone which will be used to parse the timestamp (uses _TimeZone.getTimeZone()_, so it supports formats like: 'UTC', 'UTC+03:00', 'Europe/Samara' and etc.
Ignored if *date.format* is not set.|
\ No newline at end of file
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
index ad92081..2f2f8c1 100644
--- a/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/Constants.java
@@ -62,6 +62,9 @@ public class Constants {
public static final String ES_INDEX_BUILDER = "es.index.builder";
public static final String DEFAULT_ES_INDEX_BUILDER = "com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder";
+ public static final String ES_INDEX_BUILDER_DATE_FORMAT = "es.index.builder.date.format";
+ public static final String ES_INDEX_BUILDER_DATE_TIME_ZONE = "es.index.builder.date.timeZone";
+ public static final String DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE = "UTC";
public static final String ES_SERIALIZER = "es.serializer";
public static final String DEFAULT_ES_SERIALIZER = "com.cognitree.flume.sink.elasticsearch.SimpleSerializer";
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java
new file mode 100644
index 0000000..2e9b51e
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampBasedIndexBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2017 Cognitree Technologies
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.cognitree.flume.sink.elasticsearch;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.formatter.output.BucketPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimeZone;
+
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+
+public class TimestampBasedIndexBuilder implements IndexBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(TimestampBasedIndexBuilder.class);
+
+ private String index;
+ private String type;
+ private String dateFormat;
+ private String dateTimeZone;
+
+ private FastDateFormat fastDateFormat = null;
+
+ @Override
+ public String getIndex(Event event) {
+ TimestampedEvent timestampedEvent = new TimestampedEvent(event);
+ long timestamp = timestampedEvent.getTimestamp();
+
+ String indexName = index;
+ if (indexName == null) {
+ indexName = DEFAULT_ES_INDEX;
+ }
+ indexName = BucketPath.escapeString(indexName, event.getHeaders());
+
+ String timestampSuffix = "";
+ if (fastDateFormat != null) {
+ timestampSuffix = new StringBuilder(fastDateFormat.format(timestamp)).insert(0, '-').toString();
+ }
+
+ return new StringBuilder(indexName).append(timestampSuffix).toString();
+ }
+
+ @Override
+ public String getType(Event event) {
+ String type;
+ if (this.type != null) {
+ type = this.type;
+ } else {
+ type = DEFAULT_ES_TYPE;
+ }
+ return type;
+ }
+
+ @Override
+ public String getId(Event event) {
+ return null;
+ }
+
+ @Override
+ public void configure(Context context) {
+ this.index = Util.getContextValue(context, ES_INDEX);
+ if (this.index == null) {
+ this.index = DEFAULT_ES_INDEX;
+ }
+ this.type = Util.getContextValue(context, ES_TYPE);
+ this.dateFormat = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_FORMAT);
+ this.dateTimeZone = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_TIME_ZONE);
+
+ if (this.dateFormat != null) {
+ if (this.dateTimeZone == null) {
+ this.dateTimeZone = DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE;
+ }
+
+ fastDateFormat = FastDateFormat.getInstance(dateFormat, TimeZone.getTimeZone(dateTimeZone));
+ }
+ logger.info("Simple Index builder, name [{}] type [{}] date format [{}] date time zone [{}] ",
+ new Object[]{this.index, this.type, this.dateFormat, this.dateTimeZone});
+ }
+}
diff --git a/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java
new file mode 100644
index 0000000..1f4bfd1
--- /dev/null
+++ b/src/main/java/com/cognitree/flume/sink/elasticsearch/TimestampedEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 Cognitree Technologies
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.cognitree.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+
+import java.util.Map;
+
+final public class TimestampedEvent extends SimpleEvent {
+ private final long timestamp;
+
+ TimestampedEvent(Event base) {
+ setBody(base.getBody());
+ Map headers = Maps.newHashMap(base.getHeaders());
+ String timestampString = headers.get("timestamp");
+ if (StringUtils.isBlank(timestampString)) {
+ timestampString = headers.get("@timestamp");
+ }
+ if (StringUtils.isBlank(timestampString)) {
+ this.timestamp = DateTimeUtils.currentTimeMillis();
+ headers.put("timestamp", String.valueOf(timestamp ));
+ } else {
+ this.timestamp = Long.valueOf(timestampString);
+ }
+ setHeaders(headers);
+ }
+
+ long getTimestamp() {
+ return timestamp;
+ }
+}
diff --git a/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java
new file mode 100644
index 0000000..6558eb5
--- /dev/null
+++ b/src/test/java/com/cognitree/flume/sink/elasticsearch/TestTimestampBasedIndexBuilder.java
@@ -0,0 +1,91 @@
+package com.cognitree.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.cognitree.flume.sink.elasticsearch.Constants.*;
+import static org.junit.Assert.*;
+
+public class TestTimestampBasedIndexBuilder {
+
+ private TimestampBasedIndexBuilder timestampBasedIndexBuilder;
+ private String index = "es-index";
+ private String type = "es-type";
+ private String dateFormat = "yyyy-MM-dd-HH";
+
+
+ @Before
+ public void init() throws Exception {
+ timestampBasedIndexBuilder = new TimestampBasedIndexBuilder();
+ }
+
+ @Test
+ public void testDefaultIndex() {
+ Event event = new SimpleEvent();
+ assertEquals(DEFAULT_ES_INDEX, timestampBasedIndexBuilder.getIndex(event));
+ assertEquals(DEFAULT_ES_TYPE, timestampBasedIndexBuilder.getType(event));
+ }
+
+ @Test
+ public void testTimestampedIndex() {
+ Event event = new SimpleEvent();
+
+ Context context = new Context();
+
+ context.put(ES_INDEX, index);
+ context.put(ES_TYPE, type);
+ context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
+
+ Map headers = new HashMap();
+ headers.put("timestamp", "1546350162000"); // 2019-01-01 13:42:42.000 UTC
+ event.setHeaders(headers);
+
+ timestampBasedIndexBuilder.configure(context);
+
+ String expectedIndexName = new StringBuilder(index).append('-').append("2019-01-01-13").toString();
+
+ assertEquals(expectedIndexName, timestampBasedIndexBuilder.getIndex(event));
+ }
+
+ @Test
+ public void testTimestampedWithTZIndex() {
+ Event event = new SimpleEvent();
+
+ Context context = new Context();
+
+ context.put(ES_INDEX, index);
+ context.put(ES_TYPE, type);
+ context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
+ context.put(ES_INDEX_BUILDER_DATE_TIME_ZONE, "Europe/Moscow"); // UTC+3
+
+ Map headers = new HashMap();
+ headers.put("@timestamp", "1546350162000"); // 2019-01-01 13:42:42.000 UTC
+ event.setHeaders(headers);
+
+ timestampBasedIndexBuilder.configure(context);
+
+ String expectedIndexName = new StringBuilder(index).append('-').append("2019-01-01-16").toString();
+
+ assertEquals(expectedIndexName, timestampBasedIndexBuilder.getIndex(event));
+ }
+
+ @Test
+ public void testConfigurationIndex() {
+ Event event = new SimpleEvent();
+ Context context = new Context();
+
+ context.put(ES_INDEX, index);
+ context.put(ES_TYPE, type);
+
+ timestampBasedIndexBuilder.configure(context);
+
+ assertEquals(index, timestampBasedIndexBuilder.getIndex(event));
+ assertEquals(type, timestampBasedIndexBuilder.getType(event));
+ }
+}
\ No newline at end of file