diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 7ad4ae0164..572302c8d6 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -37,6 +37,8 @@
     <hadoop.version>2.5.0</hadoop.version>
     <tajo.version>0.11.0-SNAPSHOT</tajo.version>
     <hbase.version>0.98.7-hadoop2</hbase.version>
+    <kafka.version>0.8.2.0</kafka.version>
+    <kafka.artifactId>kafka_2.10</kafka.artifactId>
     <netty.version>4.0.25.Final</netty.version>
     <commons-lang.version>2.6</commons-lang.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
@@ -763,6 +765,11 @@
         <artifactId>tajo-storage-hbase</artifactId>
         <version>${tajo.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.tajo</groupId>
+        <artifactId>tajo-storage-kafka</artifactId>
+        <version>${tajo.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
         <artifactId>tajo-pullserver</artifactId>
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index e74c744864..4c13944900 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -35,6 +35,7 @@
     <module>tajo-storage-common</module>
     <module>tajo-storage-hdfs</module>
     <module>tajo-storage-hbase</module>
+    <module>tajo-storage-kafka</module>
   </modules>
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
new file mode 100644
index 0000000000..8ad2c18750
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -0,0 +1,257 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>tajo-project</artifactId>
    <groupId>org.apache.tajo</groupId>
    <version>0.11.0-SNAPSHOT</version>
    <relativePath>../../tajo-project</relativePath>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>tajo-storage-kafka</artifactId>
  <packaging>jar</packaging>
  <name>Tajo Kafka Storage</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>

  <repositories>
    <repository>
      <id>repository.jboss.org</id>
      <url>https://repository.jboss.org/nexus/content/repositories/releases/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
        <executions>
          <execution>
            <phase>verify</phase>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <systemProperties>
            <tajo.test>TRUE</tajo.test>
          </systemProperties>
          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <goals>
              <goal>test-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-antrun-plugin</artifactId>
        <executions>
          <execution>
            <id>create-protobuf-generated-sources-directory</id>
            <phase>initialize</phase>
            <configuration>
              <target>
                <mkdir dir="target/generated-sources/proto" />
              </target>
            </configuration>
            <goals>
              <goal>run</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2</version>
        <executions>
          <execution>
            <id>generate-sources</id>
            <phase>generate-sources</phase>
            <configuration>
              <executable>protoc</executable>
              <arguments>
                <argument>-Isrc/main/proto/</argument>
                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
                <argument>--java_out=target/generated-sources/proto</argument>
                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
              </arguments>
            </configuration>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.5</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>target/generated-sources/proto</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-report-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.tajo</groupId>
      <artifactId>tajo-common</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.tajo</groupId>
      <artifactId>tajo-catalog-common</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.tajo</groupId>
      <artifactId>tajo-plan</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.tajo</groupId>
      <artifactId>tajo-storage-common</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.tajo</groupId>
      <artifactId>tajo-storage-hdfs</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>${kafka.artifactId}</artifactId>
      <version>${kafka.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.6</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.airlift</groupId>
      <artifactId>testing</artifactId>
      <version>0.88</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.4</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <profiles>
    <profile>
      <id>docs</id>
      <activation>
        <activeByDefault>false</activeByDefault>
      </activation>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-javadoc-plugin</artifactId>
            <executions>
              <execution>
                <id>module-javadocs</id>
                <phase>package</phase>
                <goals>
                  <goal>jar</goal>
                </goals>
                <configuration>
                  <destDir>${project.build.directory}</destDir>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-report-plugin</artifactId>
        <version>2.15</version>
      </plugin>
    </plugins>
  </reporting>
</project>
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java
new file mode 100644
index 0000000000..0df4e4dd3c
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaAppender.java
@@ -0,0 +1,62 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka;

import java.io.IOException;

import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.Tuple;

// Writing to Kafka (INSERT) is not supported yet, so every mutating
// operation throws an IOException.
public class KafkaAppender implements Appender {

  @Override
  public void init() throws IOException {
  }

  @Override
  public void addTuple(Tuple t) throws IOException {
    throw new IOException("It is not supported.");
  }

  @Override
  public void flush() throws IOException {
    throw new IOException("It is not supported.");
  }

  @Override
  public long getEstimatedOutputSize() throws IOException {
    return 0;
  }

  @Override
  public void close() throws IOException {
  }

  @Override
  public void enableStats() {
  }

  @Override
  public TableStats getStats() {
    return null;
  }

}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
new file mode 100644
index 0000000000..7ecf10fc0f
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
@@ -0,0 +1,30 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka;

import org.apache.tajo.storage.StorageConstants;

public class KafkaStorageConstants extends StorageConstants {
  public static final String KAFKA_TOPIC = "kafka.topic";
  public static final String KAFKA_BROKER = "kafka.broker";
  public static final String KAFKA_TOPIC_PARTITION = "kafka.topic.partition";
  public static final String DEFAULT_KAFKA_SERDE_CLASS = "org.apache.tajo.storage.kafka.serDe.KafkaTextSerDe";
  public static final String KAFKA_SERDE_CLASS = "kafka.serde";
  public static final String KAFKA_FRAGMENT_SIZE = "kafka.fragment.size";
}
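These keys are ordinary table-property names, so a Kafka-backed table is declared by attaching them to its TableMeta. A minimal sketch, mirroring the option wiring used by the tests later in this patch; the broker address, topic name, partition list, and table name are illustrative, not part of the patch:

package org.apache.tajo.storage.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.util.KeyValueSet;

public class KafkaTableExample {
  public static TableDesc buildKafkaTable() {
    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
    Map<String, String> options = new HashMap<String, String>();
    // Required: where the topic lives and what it is called.
    options.put(KafkaStorageConstants.KAFKA_BROKER, "localhost:9092");
    options.put(KafkaStorageConstants.KAFKA_TOPIC, "click_stream");
    // Optional: scan only partitions 0 and 1 instead of the default "all".
    options.put(KafkaStorageConstants.KAFKA_TOPIC_PARTITION, "0,1");
    // Optional: cap each fragment at 5000 messages (default is 10000).
    options.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "5000");
    meta.setOptions(new KeyValueSet(options));
    return new TableDesc("click_stream_table", null, meta, null);
  }
}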
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java
new file mode 100644
index 0000000000..84ab1db691
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageManager.java
@@ -0,0 +1,215 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.kafka.fragment.KafkaFragment;

// StorageManager implementation for Kafka topics.
public class KafkaStorageManager extends StorageManager {
  private static final Log LOG = LogFactory.getLog(KafkaStorageManager.class);

  static final int FLAG_NOT_SKIP = 0;
  static final int FLAG_UNLIMITED = 0;
  static final String DEFAULT_PARTITION = "all";
  static final String DEFAULT_FRAGMENT_SIZE = "10000";

  // Pool of Kafka connections, keyed by "<topic>_<partition>".
  private Map<String, SimpleConsumerManager> connMap;

  public KafkaStorageManager(StoreType storeType) {
    super(storeType);
  }

  @Override
  public void closeStorageManager() {
    // Close all Kafka connections held in connMap.
    synchronized (connMap) {
      for (SimpleConsumerManager eachConn : connMap.values()) {
        try {
          eachConn.close();
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
        }
      }
    }
  }

  @Override
  protected void storageInit() throws IOException {
    connMap = new HashMap<String, SimpleConsumerManager>();
  }

  @Override
  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
    TableStats stats = new TableStats();
    stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
    tableDesc.setStats(stats);
    // The actual Kafka topic is not created here; only table stats are initialized.
  }

  // Not supported yet.
  @Override
  public void purgeTable(TableDesc tableDesc) throws IOException {
    throw new IOException("Purge is not supported.");
  }

  @Override
  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
    return getFragmentList(tableDesc, FLAG_NOT_SKIP, FLAG_UNLIMITED);
  }

  @Override
  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException {
    int limitNum = numFragments + currentPage;
    List<Fragment> fragments = getFragmentList(tableDesc, currentPage, limitNum);
    if (fragments.size() > 0) {
      return fragments;
    } else { // No fragments remain, so end paging.
      return new ArrayList<Fragment>(1);
    }
  }

  @Override
  public StorageProperty getStorageProperty() {
    StorageProperty storageProperty = new StorageProperty();
    storageProperty.setSortedInsert(false);
    storageProperty.setSupportsInsertInto(false);
    return storageProperty;
  }

  // Not supported.
  @Override
  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema,
      SortSpec[] sortSpecs, TupleRange dataRange) throws IOException {
    return null;
  }

  // Not supported.
  @Override
  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
  }

  // Not supported.
  @Override
  public void rollbackOutputCommit(LogicalNode node) throws IOException {
  }
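  // Illustrative arithmetic for the splitting below (my example, not from
  // the original patch): with the default fragment size of 10000 messages,
  // a partition whose earliest offset is 0 and latest offset is 25000 yields
  // three fragments covering [0, 10000), [10000, 20000), and [20000, 30000).
  // Note that the last fragment's end offset is not clamped to the latest
  // offset; presumably the scanner simply stops once the partition runs out
  // of messages before the fragment's end offset.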
  // Split the scan into fragments for distributed processing and return the
  // fragment list. One partition of a Kafka topic becomes one fragment, but
  // a partition holding more messages than the fragment size is split into
  // several fragments.
  private List<Fragment> getFragmentList(TableDesc tableDesc, int numSkip, int limitNum) throws IOException {
    int fragmentCount = 0;
    String topic = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_TOPIC);
    String brokers = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_BROKER);
    int fragmentSize = Integer.parseInt(tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
        DEFAULT_FRAGMENT_SIZE));
    // If no partitions are specified, scan every partition of the topic.
    String partitions = tableDesc.getMeta().getOption(KafkaStorageConstants.KAFKA_TOPIC_PARTITION, DEFAULT_PARTITION);
    Set<Integer> partitionSet = new HashSet<Integer>();
    if (partitions.equals(DEFAULT_PARTITION)) {
      partitionSet = SimpleConsumerManager.getPartitions(brokers, topic);
    } else {
      for (String partitionId : partitions.split(",")) {
        partitionSet.add(Integer.parseInt(partitionId));
      }
    }
    List<Fragment> fragments = new ArrayList<Fragment>();
    boolean isBreak = false; // For paging in getNonForwardSplit.
    for (Integer partitionId : partitionSet) {
      if (isBreak)
        break;
      SimpleConsumerManager simpleConsumerManager = getConnection(brokers, topic, partitionId);
      long lastOffset = simpleConsumerManager.getReadOffset(kafka.api.OffsetRequest.LatestTime());
      long startOffset = simpleConsumerManager.getReadOffset(kafka.api.OffsetRequest.EarliestTime());
      long messageSize = lastOffset - startOffset;
      if (0 == lastOffset || 0 == messageSize)
        continue;

      // If the partition holds no more messages than fragmentSize (the
      // message count of one fragment), it maps to a single fragment.
      if (messageSize <= fragmentSize) {
        fragmentCount++;
        if (FLAG_NOT_SKIP == numSkip || fragmentCount > numSkip) {
          KafkaFragment fragment = new KafkaFragment(tableDesc.getName(), topic, partitionId, brokers, startOffset,
              lastOffset);
          fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
          fragments.add(fragment);
        }
        if (FLAG_UNLIMITED != limitNum && fragmentCount >= limitNum) {
          isBreak = true;
        }
      } else { // The partition holds more messages than fragmentSize, so split it.
        long nextFragmentStartOffset = startOffset;
        while (nextFragmentStartOffset < lastOffset) {
          long nextFragmentLastOffset = nextFragmentStartOffset + fragmentSize;
          fragmentCount++;
          if (FLAG_NOT_SKIP == numSkip || fragmentCount > numSkip) {
            KafkaFragment fragment = new KafkaFragment(tableDesc.getName(), topic, partitionId, brokers,
                nextFragmentStartOffset, nextFragmentLastOffset);
            fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
            fragments.add(fragment);
          }
          nextFragmentStartOffset = nextFragmentLastOffset;
          if (limitNum != FLAG_UNLIMITED && fragmentCount >= limitNum) {
            isBreak = true;
            break;
          }
        }
      }
    }
    return fragments;
  }

  // Return a pooled Kafka connection, creating and caching it on first use.
  public SimpleConsumerManager getConnection(String seedBrokers, String topic, int partition) throws IOException {
    String conKey = topic + "_" + partition;
    synchronized (connMap) {
      SimpleConsumerManager conn = connMap.get(conKey);
      if (conn == null) {
        conn = SimpleConsumerManager.getSimpleConsumerManager(seedBrokers, topic, partition);
        connMap.put(conKey, conn);
      }
      return conn;
    }
  }
}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000000..ef53f61f38
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,34 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

option java_package = "org.apache.tajo.storage.kafka";
option java_outer_classname = "StorageFragmentProtos";
option optimize_for = SPEED;
option java_generic_services = false;
option java_generate_equals_and_hash = true;

import "CatalogProtos.proto";

message KafkaFragmentProto {
  required string topicName = 1;
  required string brokers = 2;
  required int64 startOffset = 3;
  required int64 lastOffset = 4;
  required int32 partitionId = 5;
  required int64 length = 6;
}
\ No newline at end of file
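This message is what KafkaFragment (defined elsewhere in this patch) serializes to and from. A minimal sketch of round-tripping a fragment through the standard protobuf-generated builder API; the topic, broker, and offset values are illustrative:

package org.apache.tajo.storage.kafka;

import org.apache.tajo.storage.kafka.StorageFragmentProtos.KafkaFragmentProto;

import com.google.protobuf.InvalidProtocolBufferException;

public class KafkaFragmentProtoExample {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Describe the first 10000 messages of partition 0.
    KafkaFragmentProto proto = KafkaFragmentProto.newBuilder()
        .setTopicName("click_stream")
        .setBrokers("localhost:9092")
        .setStartOffset(0L)
        .setLastOffset(10000L)
        .setPartitionId(0)
        .setLength(-1L) // e.g. an unknown-length marker such as TajoConstants.UNKNOWN_LENGTH
        .build();

    // Round-trip through the wire format.
    byte[] bytes = proto.toByteArray();
    KafkaFragmentProto parsed = KafkaFragmentProto.parseFrom(bytes);
    assert parsed.getLastOffset() == 10000L;
  }
}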
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java
new file mode 100644
index 0000000000..ef868518de
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestConstants.java
@@ -0,0 +1,26 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka;

public class TestConstants {
  final static int kafka_partition_num = 3;
  final static String test_topic = "test-topic";
  final static String[] test_data = { "1|abc|0.2", "2|def|0.4", "3|ghi|0.6", "4|jkl|0.8", "5|mno|1.0" };
  final static String test_json_data = "{\"col1\":1, \"col2\":\"abc\", \"col3\":0.2}";
}
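The rows in test_data are pipe-delimited triples, and test_json_data names the same three columns. A hypothetical Tajo schema those rows would map to; the column types are my reading of the sample values, not something this patch declares:

package org.apache.tajo.storage.kafka;

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;

public class TestSchemaSketch {
  // Hypothetical schema for rows like "1|abc|0.2".
  public static Schema buildSchema() {
    Schema schema = new Schema();
    schema.addColumn("col1", Type.INT4);
    schema.addColumn("col2", Type.TEXT);
    schema.addColumn("col3", Type.FLOAT8);
    return schema;
  }
}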
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java
new file mode 100644
index 0000000000..9de2ece239
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaStorageManager.java
@@ -0,0 +1,132 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import kafka.producer.KeyedMessage;

import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.kafka.testUtil.EmbeddedKafka;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestKafkaStorageManager {
  static EmbeddedKafka em_kafka;

  // Start EmbeddedKafka and generate test data.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    em_kafka = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
    em_kafka.start();
    em_kafka.createTopic(TestConstants.kafka_partition_num, 1, TestConstants.test_topic);
    genDataForTest();
  }

  // Shut down EmbeddedKafka.
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    em_kafka.close();
    em_kafka = null;
  }

  // Test for getNonForwardSplit.
  @Test
  public void testGetNonForwardSplit() throws Exception {
    KafkaStorageManager ksm = new KafkaStorageManager(CatalogProtos.StoreType.CSV);
    ksm.storageInit();
    try {
      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
      Map<String, String> option = new java.util.HashMap<String, String>();
      option.put(KafkaStorageConstants.KAFKA_BROKER, em_kafka.getConnectString());
      option.put(KafkaStorageConstants.KAFKA_TOPIC, TestConstants.test_topic);
      option.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10");
      meta.setOptions(new KeyValueSet(option));
      TableDesc td = new TableDesc("test_table", null, meta, null);
      // 100 messages in total, 10 messages per fragment, 5 fragments per page.
      int fragmentCount = 0;
      int pageCount = 0;
      int fragmentPerPage = 5;
      while (true) {
        List<Fragment> fragmentList = ksm.getNonForwardSplit(td, fragmentCount, fragmentPerPage);
        fragmentCount += fragmentList.size();
        if (fragmentList.size() == 0)
          break;
        pageCount += 1;
      }
      // Expect 10 fragments paged 5 per page, i.e. 2 pages: 2 * 10 * 5 == 100.
      assertTrue(100 == (pageCount * fragmentCount * fragmentPerPage));
    } finally {
      ksm.closeStorageManager();
    }
  }

  // Test for getSplits.
  @Test
  public void testGetSplit() throws Exception {
    KafkaStorageManager ksm = new KafkaStorageManager(CatalogProtos.StoreType.CSV);
    ksm.storageInit();
    try {
      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
      Map<String, String> option = new java.util.HashMap<String, String>();
      option.put(KafkaStorageConstants.KAFKA_BROKER, em_kafka.getConnectString());
      option.put(KafkaStorageConstants.KAFKA_TOPIC, TestConstants.test_topic);
      option.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10");
      meta.setOptions(new KeyValueSet(option));
      TableDesc td = new TableDesc("test_table", null, meta, null);
      List<Fragment> fragmentList = ksm.getSplits("", td, null);
      // 100 messages = 10 fragments * fragment size of 10.
      assertTrue(100 == (fragmentList.size() * 10));
    } finally {
      ksm.closeStorageManager();
    }
  }

  @SuppressWarnings({ "unchecked", "rawtypes" })
  private static void genDataForTest() throws Exception {
    kafka.javaapi.producer.Producer producer = null;
    try {
      producer = em_kafka.createProducer(em_kafka.getConnectString());
      // Generate 100 messages (20 iterations x 5 rows).
      for (int i = 0; i < 20; i++) {
        List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
        messageList.add(new KeyedMessage<String, String>(TestConstants.test_topic, TestConstants.test_data[0]));
        messageList.add(new KeyedMessage<String, String>(TestConstants.test_topic, TestConstants.test_data[1]));
        messageList.add(new KeyedMessage<String, String>(TestConstants.test_topic, TestConstants.test_data[2]));
        messageList.add(new KeyedMessage<String, String>(TestConstants.test_topic, TestConstants.test_data[3]));
        messageList.add(new KeyedMessage<String, String>(TestConstants.test_topic, TestConstants.test_data[4]));
        producer.send(messageList);
      }
    } finally {
      if (null != producer) {
        producer.close();
      }
    }
  }
}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java
new file mode 100644
index 0000000000..a52546e34b
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedKafka.java
@@ -0,0 +1,131 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.kafka.testUtil;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.testing.FileUtils.deleteRecursively;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import kafka.admin.AdminUtils;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;

import org.I0Itec.zkclient.ZkClient;

import com.google.common.io.Files;

public class EmbeddedKafka implements Closeable {
  private final EmbeddedZookeeper zookeeper;
  private final int port;
  private final File kafkaDataDir;
  private final KafkaServerStartable kafka;

  private final AtomicBoolean started = new AtomicBoolean();
  private final AtomicBoolean stopped = new AtomicBoolean();

  public static EmbeddedKafka createEmbeddedKafka(int zookeeperPort, int kafkaPort) throws IOException {
    return new EmbeddedKafka(new EmbeddedZookeeper(zookeeperPort), kafkaPort);
  }

  EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException {
    this.zookeeper = checkNotNull(zookeeper, "zookeeper is null");

    this.port = kafkaPort;
    this.kafkaDataDir = Files.createTempDir();

    Properties properties = new Properties();
    properties.setProperty("broker.id", "0");
    properties.setProperty("host.name", "localhost");
    properties.setProperty("num.partitions", "2");
    properties.setProperty("log.flush.interval.messages", "10000");
    properties.setProperty("log.flush.interval.ms", "1000");
    properties.setProperty("log.retention.minutes", "60");
    properties.setProperty("log.segment.bytes", "1048576");
    properties.setProperty("auto.create.topics.enable", "false");
    properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
    properties.setProperty("port", Integer.toString(port));
    properties.setProperty("log.dirs", kafkaDataDir.getAbsolutePath());
    properties.setProperty("zookeeper.connect", zookeeper.getConnectString());

    KafkaConfig config = new KafkaConfig(properties);
    this.kafka = new KafkaServerStartable(config);
  }

  public void start() throws InterruptedException, IOException {
    if (!started.getAndSet(true)) {
      zookeeper.start();
      kafka.startup();
    }
  }

  @Override
  public void close() {
    if (started.get() && !stopped.getAndSet(true)) {
      kafka.shutdown();
      kafka.awaitShutdown();
      zookeeper.close();
      deleteRecursively(kafkaDataDir);
    }
  }

  public int getZookeeperPort() {
    return zookeeper.getPort();
  }

  public int getPort() {
    return port;
  }

  public String getConnectString() {
    return "localhost:" + Integer.toString(port);
  }

  public String getZookeeperConnectString() {
    return zookeeper.getConnectString();
  }

  public void createTopic(int partitions, int replication, String topic) {
    checkState(started.get() && !stopped.get(), "not started!");

    ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
    try {
      AdminUtils.createTopic(zkClient, topic, partitions, replication, new Properties());
    } finally {
      zkClient.close();
    }
  }

  @SuppressWarnings("rawtypes")
  public kafka.javaapi.producer.Producer createProducer(String connecting) {
    Properties properties = new Properties();
    properties.put("serializer.class", "kafka.serializer.StringEncoder");
    properties.put("metadata.broker.list", connecting);
    return new kafka.javaapi.producer.Producer(new ProducerConfig(properties));
  }
}
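A minimal lifecycle sketch for this helper, condensed from the test above; the ports, topic name, and message payload are illustrative:

package org.apache.tajo.storage.kafka.testUtil;

import kafka.producer.KeyedMessage;

public class EmbeddedKafkaUsage {
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public static void main(String[] args) throws Exception {
    EmbeddedKafka embedded = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
    embedded.start();
    kafka.javaapi.producer.Producer producer = null;
    try {
      // Three partitions, replication factor 1.
      embedded.createTopic(3, 1, "demo-topic");
      producer = embedded.createProducer(embedded.getConnectString());
      producer.send(new KeyedMessage<String, String>("demo-topic", "1|abc|0.2"));
    } finally {
      if (producer != null) {
        producer.close();
      }
      embedded.close(); // also shuts down the embedded ZooKeeper
    }
  }
}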
properties.put("metadata.broker.list", connecting); + kafka.javaapi.producer.Producer producer; + producer = new kafka.javaapi.producer.Producer(new ProducerConfig(properties)); + return producer; + } +} diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java new file mode 100644 index 0000000000..a6f2d73dfb --- /dev/null +++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/testUtil/EmbeddedZookeeper.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.kafka.testUtil; + +import static io.airlift.testing.FileUtils.deleteRecursively; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import com.google.common.io.Files; + +public class EmbeddedZookeeper implements Closeable { + private final int port; + private final File zkDataDir; + private final ZooKeeperServer zkServer; + private final NIOServerCnxnFactory cnxnFactory; + + private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean stopped = new AtomicBoolean(); + + public EmbeddedZookeeper() throws IOException { + this(2181); + } + + public EmbeddedZookeeper(int port) throws IOException { + this.port = port; + zkDataDir = Files.createTempDir(); + zkServer = new ZooKeeperServer(); + + FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir); + zkServer.setTxnLogFactory(ftxn); + + cnxnFactory = new NIOServerCnxnFactory(); + cnxnFactory.configure(new InetSocketAddress(port), 0); + } + + public void start() throws InterruptedException, IOException { + if (!started.getAndSet(true)) { + cnxnFactory.startup(zkServer); + } + } + + @Override + public void close() { + if (started.get() && !stopped.getAndSet(true)) { + cnxnFactory.shutdown(); + try { + cnxnFactory.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (zkServer.isRunning()) { + zkServer.shutdown(); + } + deleteRecursively(zkDataDir); + } + } + + public String getConnectString() { + return "127.0.0.1:" + Integer.toString(port); + } + + public int getPort() { + return port; + } +}