diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 7ad4ae0164..572302c8d6 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -37,6 +37,8 @@
     <protobuf.version>2.5.0</protobuf.version>
     <tajo.version>0.11.0-SNAPSHOT</tajo.version>
     <hbase.version>0.98.7-hadoop2</hbase.version>
+    <kafka.version>0.8.2.0</kafka.version>
+    <kafka.artifactId>kafka_2.10</kafka.artifactId>
     <netty.version>4.0.25.Final</netty.version>
     <jersey.version>2.6</jersey.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
@@ -763,6 +765,11 @@
        <artifactId>tajo-storage-hbase</artifactId>
        <version>${tajo.version}</version>
      </dependency>
+     <dependency>
+       <groupId>org.apache.tajo</groupId>
+       <artifactId>tajo-storage-kafka</artifactId>
+       <version>${tajo.version}</version>
+     </dependency>
      <dependency>
        <groupId>org.apache.tajo</groupId>
        <artifactId>tajo-pullserver</artifactId>
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index e74c744864..4c13944900 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -35,6 +35,7 @@
     <module>tajo-storage-common</module>
     <module>tajo-storage-hdfs</module>
     <module>tajo-storage-hbase</module>
+    <module>tajo-storage-kafka</module>
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
new file mode 100644
index 0000000000..8ad2c18750
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.11.0-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-kafka</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Kafka Storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>${kafka.artifactId}</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>testing</artifactId>
+      <version>0.88</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.101tec</groupId>
+      <artifactId>zkclient</artifactId>
+      <scope>test</scope>
+      <version>0.4</version>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java
new file mode 100644
index 0000000000..0df4e4dd3c
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import java.io.IOException;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.Tuple;
+
+// Writing into a Kafka topic is not supported yet; this Appender is a stub
+// whose write operations throw IOException.
+public class KafkaAppender implements Appender {
+
+ @Override
+ public void init() throws IOException {
+ }
+
+ @Override
+ public void addTuple(Tuple t) throws IOException {
+ throw new IOException("It is not supported.");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ throw new IOException("It is not supported.");
+ }
+
+ @Override
+ public long getEstimatedOutputSize() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void enableStats() {
+ }
+
+ @Override
+ public TableStats getStats() {
+ return null;
+ }
+
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
new file mode 100644
index 0000000000..7ecf10fc0f
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.tajo.storage.StorageConstants;
+
+public class KafkaStorageConstants extends StorageConstants {
+ public static final String KAFKA_TOPIC = "kafka.topic";
+ public static final String KAFKA_BROKER = "kafka.broker";
+ public static final String KAFKA_TOPIC_PARTITION = "kafka.topic.partition";
+ public static final String DEFAULT_KAFKA_SERDE_CLASS = "org.apache.tajo.storage.kafka.serDe.KafkaTextSerDe";
+ public static final String KAFKA_SERDE_CLASS = "kafka.serde";
+ public static final String KAFKA_FRAGMENT_SIZE = "kafka.fragment.size";
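+
+  // Illustrative usage (assumed DDL syntax; registering the "kafka" storage
+  // type and the column types below are not part of this class):
+  //
+  //   CREATE EXTERNAL TABLE kafka_table (col1 INT4, col2 TEXT, col3 FLOAT8)
+  //     USING kafka WITH ('kafka.broker' = 'host1:9092,host2:9092',
+  //                       'kafka.topic' = 'test-topic',
+  //                       'kafka.topic.partition' = '0,1,2',
+  //                       'kafka.fragment.size' = '10000');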
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java
new file mode 100644
index 0000000000..84ab1db691
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageProperty;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.fragment.KafkaFragment;
+
+// StorageManager implementation that exposes Kafka topics as scan-only tables.
+public class KafkaStorageManager extends StorageManager {
+ private static final Log LOG = LogFactory.getLog(KafkaStorageManager.class);
+ static final int FLAG_NOT_SKIP = 0;
+ static final int FLAG_UNLIMITED = 0;
+ static final String DEFAULT_PARTITION = "all";
+ static final String DEFAULT_FRAGMENT_SIZE = "10000";
+  // Pool of Kafka connections, keyed by "<topic>_<partition>".
+  private Map<String, SimpleConsumerManager> connMap;
+
+ public KafkaStorageManager(StoreType storeType) {
+ super(storeType);
+ }
+
+ @Override
+ public void closeStorageManager() {
+    // Close every pooled Kafka connection in connMap.
+ synchronized (connMap) {
+ for (SimpleConsumerManager eachConn : connMap.values()) {
+ try {
+ eachConn.close();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void storageInit() throws IOException {
+    connMap = new HashMap<String, SimpleConsumerManager>();
+ }
+
+ @Override
+ public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
+ TableStats stats = new TableStats();
+ stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+ tableDesc.setStats(stats);
+    // The actual Kafka topic is not created here; only table stats are initialized.
+ }
+
+  // Purging a table is not supported yet.
+ @Override
+ public void purgeTable(TableDesc tableDesc) throws IOException {
+ throw new IOException("Purge is not supported.");
+ }
+
+ @Override
+  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
+ return getFragmentList(tableDesc, FLAG_NOT_SKIP, FLAG_UNLIMITED);
+ }
+
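+  // Note on the paging contract (as exercised by TestKafkaStorageManager):
+  // currentPage is the number of fragments already consumed (to skip) and
+  // numFragments is the page size, e.g. getNonForwardSplit(td, 5, 5) skips
+  // the first five fragments and returns the next five; an empty result
+  // signals the end of paging.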
+ @Override
+  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException {
+ int limitNum = numFragments + currentPage;
+    List<Fragment> fragments = getFragmentList(tableDesc, currentPage, limitNum);
+ if (fragments.size() > 0) {
+ return fragments;
+    } else { // No fragments remain; end paging.
+      return new ArrayList<Fragment>(1);
+ }
+ }
+
+ @Override
+ public StorageProperty getStorageProperty() {
+ StorageProperty storageProperty = new StorageProperty();
+ storageProperty.setSortedInsert(false);
+ storageProperty.setSupportsInsertInto(false);
+ return storageProperty;
+ }
+
+  // Sorted insert is not supported.
+ @Override
+ public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema,
+ SortSpec[] sortSpecs, TupleRange dataRange) throws IOException {
+ return null;
+ }
+
+  // Not supported; no-op.
+ @Override
+ public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+ }
+
+  // Not supported; no-op.
+ @Override
+ public void rollbackOutputCommit(LogicalNode node) throws IOException {
+ }
+
+  // Splits the scan into fragments for distributed processing and returns the
+  // fragment list. One partition of a Kafka topic becomes one fragment, but a
+  // partition whose message count exceeds the fragment size is split into
+  // several fragments.
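+  // For example (illustrative numbers): with kafka.fragment.size=10, a
+  // partition holding offsets [0, 25) yields fragments [0,10), [10,20) and
+  // [20,30); the end offset of the last fragment may exceed the partition's
+  // real last offset, so the scanner is expected to stop at the actual end.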
+  private List<Fragment> getFragmentList(TableDesc tableDesc, int numSkip, int limitNum) throws IOException {
+ int fragmentCount = 0;
+ String topic = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_TOPIC);
+ String brokers = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_BROKER);
+ int fragmentSize = Integer.parseInt(tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
+ DEFAULT_FRAGMENT_SIZE));
+    // If no partitions are specified, scan every partition of the topic.
+    String partitions = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_TOPIC_PARTITION, DEFAULT_PARTITION);
+    Set<Integer> partitionSet = new HashSet<Integer>();
+ if (partitions.equals(DEFAULT_PARTITION)) {
+ partitionSet = SimpleConsumerManager.getPartitions(brokers, topic);
+ } else {
+ for (String partitionId : partitions.split(",")) {
+ partitionSet.add(Integer.parseInt(partitionId));
+ }
+ }
+    List<Fragment> fragments = new ArrayList<Fragment>();
+    boolean isBreak = false; // For paging of getNonForwardSplit.
+ for (Integer partitionId : partitionSet) {
+ if (isBreak)
+ break;
+ SimpleConsumerManager simpleConsumerManager = getConnection(brokers, topic, partitionId);
+ long lastOffset = simpleConsumerManager.getReadOffset(kafka.api.OffsetRequest.LatestTime());
+ long startOffset = simpleConsumerManager.getReadOffset(kafka.api.OffsetRequest.EarliestTime());
+ long messageSize = lastOffset - startOffset;
+ if (0 == lastOffset || 0 == messageSize)
+ continue;
+
+      // If the partition's message count is at most fragmentSize (the message
+      // count of one fragment), the whole partition becomes a single fragment.
+      if (messageSize <= fragmentSize) {
+ fragmentCount++;
+ if (FLAG_NOT_SKIP == numSkip || fragmentCount > numSkip) {
+ KafkaFragment fragment = new KafkaFragment(tableDesc.getName(), topic, partitionId, brokers, startOffset,
+ lastOffset);
+ fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+ fragments.add(fragment);
+ }
+ if (FLAG_UNLIMITED != limitNum && fragmentCount >= limitNum) {
+ isBreak = true;
+ }
+      } else { // The partition's message count exceeds fragmentSize, so split it.
+        long nextFragmentStartOffset = startOffset;
+        while (nextFragmentStartOffset < lastOffset) {
+          long nextFragmentLastOffset = nextFragmentStartOffset + fragmentSize;
+          fragmentCount++;
+          if (FLAG_NOT_SKIP == numSkip || fragmentCount > numSkip) {
+            KafkaFragment fragment = new KafkaFragment(tableDesc.getName(), topic, partitionId, brokers,
+                nextFragmentStartOffset, nextFragmentLastOffset);
+            fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+            fragments.add(fragment);
+          }
+          nextFragmentStartOffset = nextFragmentLastOffset;
+ if (limitNum != FLAG_UNLIMITED && fragmentCount >= limitNum) {
+ isBreak = true;
+ break;
+ }
+ }
+ }
+ }
+ return fragments;
+ }
+
+  // Returns a pooled connection for the given topic and partition, creating
+  // and caching it on first use.
+ public SimpleConsumerManager getConnection(String seedBrokers, String topic, int partition) throws IOException {
+ String conKey = topic + "_" + partition;
+ synchronized (connMap) {
+ SimpleConsumerManager conn = connMap.get(conKey);
+ if (conn == null) {
+ conn = SimpleConsumerManager.getSimpleConsumerManager(seedBrokers, topic, partition);
+ connMap.put(conKey, conn);
+ }
+ return conn;
+ }
+
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000000..ef53f61f38
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.kafka";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message KafkaFragmentProto {
+ required string topicName = 1;
+ required string brokers = 2;
+ required int64 startOffset = 3;
+ required int64 lastOffset = 4;
+ required int32 partitionId = 5;
+ required int64 length = 6;
+}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java
new file mode 100644
index 0000000000..ef868518de
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+public class TestConstants {
+ final static int kafka_partition_num = 3;
+ final static String test_topic = "test-topic";
+ final static String[] test_data = { "1|abc|0.2", "2|def|0.4", "3|ghi|0.6", "4|jkl|0.8", "5|mno|1.0" };
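+  // Each test row is '|'-delimited: col1 (int), col2 (text), col3 (float);
+  // test_json_data below encodes the first row as JSON.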
+ final static String test_json_data = "{\"col1\":1, \"col2\":\"abc\", \"col3\":0.2}";
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java
new file mode 100644
index 0000000000..9de2ece239
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import kafka.producer.KeyedMessage;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.testUtil.EmbeddedKafka;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestKafkaStorageManager {
+ static EmbeddedKafka em_kafka;
+
+  // Start up EmbeddedKafka and generate test data.
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ em_kafka = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+ em_kafka.start();
+ em_kafka.createTopic(TestConstants.kafka_partition_num, 1, TestConstants.test_topic);
+ genDataForTest();
+ }
+
+ // Close EmbeddedKafka.
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ em_kafka.close();
+ em_kafka = null;
+ }
+
+ // Test for getNonForwardSplit.
+ @Test
+ public void testGetNonForwardSplit() throws Exception {
+ KafkaStorageManager ksm = new KafkaStorageManager(CatalogProtos.StoreType.CSV);
+ ksm.storageInit();
+ try {
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+      Map<String, String> option = new java.util.HashMap<String, String>();
+ option.put(KafkaStorageConstants.KAFKA_BROKER, em_kafka.getConnectString());
+ option.put(KafkaStorageConstants.KAFKA_TOPIC, TestConstants.test_topic);
+ option.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10");
+ meta.setOptions(new KeyValueSet(option));
+ TableDesc td = new TableDesc("test_table", null, meta, null);
+      // 100 messages in total: 10 messages per fragment, 5 fragments per page.
+ int fragmentCount = 0;
+ int pageCount = 0;
+ int fragmentPerPage = 5;
+ while (true) {
+        List<Fragment> fragmentList = ksm.getNonForwardSplit(td, fragmentCount, fragmentPerPage);
+ fragmentCount += fragmentList.size();
+ if (fragmentList.size() == 0)
+ break;
+ pageCount += 1;
+ }
+      // 100 messages => 10 fragments of 10 messages => 2 pages of 5 fragments,
+      // so pageCount * fragmentCount * fragmentPerPage == 2 * 10 * 5 == 100.
+ assertTrue(100 == (pageCount * fragmentCount * fragmentPerPage));
+ } finally {
+ ksm.closeStorageManager();
+ }
+ }
+
+ // Test for getSplit.
+ @Test
+ public void testGetSplit() throws Exception {
+ KafkaStorageManager ksm = new KafkaStorageManager(CatalogProtos.StoreType.CSV);
+ ksm.storageInit();
+ try {
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+      Map<String, String> option = new java.util.HashMap<String, String>();
+ option.put(KafkaStorageConstants.KAFKA_BROKER, em_kafka.getConnectString());
+ option.put(KafkaStorageConstants.KAFKA_TOPIC, TestConstants.test_topic);
+ option.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10");
+ meta.setOptions(new KeyValueSet(option));
+ TableDesc td = new TableDesc("test_table", null, meta, null);
+      List<Fragment> fragmentList = ksm.getSplits("", td, null);
+      // 100 messages = 10 fragments * KAFKA_FRAGMENT_SIZE (10 messages each).
+ assertTrue(100 == (fragmentList.size() * 10));
+ } finally {
+ ksm.closeStorageManager();
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static void genDataForTest() throws Exception {
+ kafka.javaapi.producer.Producer producer = null;
+ try {
+ producer = em_kafka.createProducer(em_kafka.getConnectString());
+      // Generate 100 messages (20 iterations of 5 messages each).
+ for (int i = 0; i < 20; i++) {
+        List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
+ messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[0]));
+ messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[1]));
+ messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[2]));
+ messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[3]));
+ messageList.add(new KeyedMessage(TestConstants.test_topic, TestConstants.test_data[4]));
+ producer.send(messageList);
+ }
+ } finally {
+ if (null != producer) {
+ producer.close();
+ }
+ }
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java
new file mode 100644
index 0000000000..a52546e34b
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka.testUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.testing.FileUtils.deleteRecursively;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import kafka.admin.AdminUtils;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import com.google.common.io.Files;
+
+public class EmbeddedKafka implements Closeable {
+ private final EmbeddedZookeeper zookeeper;
+ private final int port;
+ private final File kafkaDataDir;
+ private final KafkaServerStartable kafka;
+
+ private final AtomicBoolean started = new AtomicBoolean();
+ private final AtomicBoolean stopped = new AtomicBoolean();
+
+ public static EmbeddedKafka createEmbeddedKafka(int zookeeperPort, int kafkaPort) throws IOException {
+ return new EmbeddedKafka(new EmbeddedZookeeper(zookeeperPort), kafkaPort);
+ }
+
+ EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException {
+ this.zookeeper = checkNotNull(zookeeper, "zookeeper is null");
+
+ this.port = kafkaPort;
+ this.kafkaDataDir = Files.createTempDir();
+
+ Properties properties = new Properties();
+ properties.setProperty("broker.id", "0");
+ properties.setProperty("host.name", "localhost");
+ properties.setProperty("num.partitions", "2");
+ properties.setProperty("log.flush.interval.messages", "10000");
+ properties.setProperty("log.flush.interval.ms", "1000");
+ properties.setProperty("log.retention.minutes", "60");
+ properties.setProperty("log.segment.bytes", "1048576");
+ properties.setProperty("auto.create.topics.enable", "false");
+ properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ properties.setProperty("port", Integer.toString(port));
+ properties.setProperty("log.dirs", kafkaDataDir.getAbsolutePath());
+ properties.setProperty("zookeeper.connect", zookeeper.getConnectString());
+
+ KafkaConfig config = new KafkaConfig(properties);
+ this.kafka = new KafkaServerStartable(config);
+ }
+
+ public void start() throws InterruptedException, IOException {
+ if (!started.getAndSet(true)) {
+ zookeeper.start();
+ kafka.startup();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (started.get() && !stopped.getAndSet(true)) {
+ kafka.shutdown();
+ kafka.awaitShutdown();
+ zookeeper.close();
+ deleteRecursively(kafkaDataDir);
+ }
+ }
+
+ public int getZookeeperPort() {
+ return zookeeper.getPort();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getConnectString() {
+ return "localhost:" + Integer.toString(port);
+ }
+
+ public String getZookeeperConnectString() {
+ return zookeeper.getConnectString();
+ }
+
+ public void createTopic(int partitions, int replication, String topic) {
+ checkState(started.get() && !stopped.get(), "not started!");
+
+ ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
+ try {
+ AdminUtils.createTopic(zkClient, topic, partitions, replication, new Properties());
+ } finally {
+ zkClient.close();
+ }
+ }
+
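+  // Creates an old-style (Kafka 0.8.x) producer that sends String messages;
+  // "metadata.broker.list" takes the broker address, e.g. "localhost:9092".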
+ @SuppressWarnings("rawtypes")
+ public kafka.javaapi.producer.Producer createProducer(String connecting) {
+ Properties properties = new Properties();
+ properties.put("serializer.class", "kafka.serializer.StringEncoder");
+ properties.put("metadata.broker.list", connecting);
+    return new kafka.javaapi.producer.Producer(new ProducerConfig(properties));
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java
new file mode 100644
index 0000000000..a6f2d73dfb
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka.testUtil;
+
+import static io.airlift.testing.FileUtils.deleteRecursively;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import com.google.common.io.Files;
+
+public class EmbeddedZookeeper implements Closeable {
+ private final int port;
+ private final File zkDataDir;
+ private final ZooKeeperServer zkServer;
+ private final NIOServerCnxnFactory cnxnFactory;
+
+ private final AtomicBoolean started = new AtomicBoolean();
+ private final AtomicBoolean stopped = new AtomicBoolean();
+
+ public EmbeddedZookeeper() throws IOException {
+ this(2181);
+ }
+
+ public EmbeddedZookeeper(int port) throws IOException {
+ this.port = port;
+ zkDataDir = Files.createTempDir();
+ zkServer = new ZooKeeperServer();
+
+ FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir);
+ zkServer.setTxnLogFactory(ftxn);
+
+ cnxnFactory = new NIOServerCnxnFactory();
+ cnxnFactory.configure(new InetSocketAddress(port), 0);
+ }
+
+ public void start() throws InterruptedException, IOException {
+ if (!started.getAndSet(true)) {
+ cnxnFactory.startup(zkServer);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (started.get() && !stopped.getAndSet(true)) {
+ cnxnFactory.shutdown();
+ try {
+ cnxnFactory.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (zkServer.isRunning()) {
+ zkServer.shutdown();
+ }
+ deleteRecursively(zkDataDir);
+ }
+ }
+
+ public String getConnectString() {
+ return "127.0.0.1:" + Integer.toString(port);
+ }
+
+ public int getPort() {
+ return port;
+ }
+}