Skip to content

Commit a7dccef

Browse files
authored
fixes #53 (#56)
* fixes #53 * tries to test * PR review changes + tests for Elastic index refresh
1 parent 93e8b76 commit a7dccef

File tree

6 files changed

+194
-5
lines changed

6 files changed

+194
-5
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,8 @@
22
/target/
33
/.idea/
44
/lib/
5+
/.vscode/
6+
/.settings/
7+
.factorypath
8+
.classpath
9+
.project

src/main/java/com/github/dariobalinzo/ElasticSourceConnector.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.github.dariobalinzo.elastic.ElasticConnection;
2020
import com.github.dariobalinzo.elastic.ElasticConnectionBuilder;
2121
import com.github.dariobalinzo.elastic.ElasticRepository;
22+
import com.github.dariobalinzo.elastic.ElasticIndexMonitorThread;
2223
import com.github.dariobalinzo.task.ElasticSourceTask;
2324
import org.apache.kafka.common.config.ConfigDef;
2425
import org.apache.kafka.common.config.ConfigException;
@@ -32,11 +33,14 @@
3233

3334
public class ElasticSourceConnector extends SourceConnector {
3435
private static Logger logger = LoggerFactory.getLogger(ElasticSourceConnector.class);
36+
private static final long MAX_TIMEOUT = 10000L;
37+
private static final long POLL_MILISSECONDS = 5000L;
3538

3639
private ElasticSourceConnectorConfig config;
3740
private ElasticConnection elasticConnection;
3841
private ElasticRepository elasticRepository;
3942
private Map<String, String> configProperties;
43+
private ElasticIndexMonitorThread indexMonitorThread;
4044

4145
@Override
4246
public String version() {
@@ -96,6 +100,9 @@ public void start(Map<String, String> props) {
96100
}
97101

98102
elasticRepository = new ElasticRepository(elasticConnection);
103+
104+
indexMonitorThread = new ElasticIndexMonitorThread(context, POLL_MILISSECONDS, elasticRepository, config.getString(ElasticSourceConnectorConfig.INDEX_PREFIX_CONFIG));
105+
indexMonitorThread.start();
99106
}
100107

101108
@Override
@@ -121,9 +128,7 @@ private List<Map<String, String>> generateTaskFromFixedList(List<String> indices
121128
}
122129

123130
private List<Map<String, String>> findTaskFromIndexPrefix(int maxTasks) {
124-
List<String> currentIndexes = elasticRepository.catIndices(
125-
config.getString(ElasticSourceConnectorConfig.INDEX_PREFIX_CONFIG)
126-
);
131+
List<String> currentIndexes = indexMonitorThread.indexes();
127132
return groupIndicesToTasksConfig(maxTasks, currentIndexes);
128133
}
129134

@@ -143,6 +148,12 @@ private List<Map<String, String>> groupIndicesToTasksConfig(int maxTasks, List<S
143148
@Override
144149
public void stop() {
145150
logger.info("stopping elastic source");
151+
indexMonitorThread.shutdown();
152+
try {
153+
indexMonitorThread.join(MAX_TIMEOUT);
154+
} catch (InterruptedException e) {
155+
// Ignore, shouldn't be interrupted
156+
}
146157
elasticConnection.closeQuietly();
147158
}
148159

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.github.dariobalinzo.elastic;
2+
3+
import org.apache.kafka.connect.connector.ConnectorContext;
4+
import org.apache.kafka.connect.errors.ConnectException;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.Arrays;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.TimeUnit;
13+
14+
15+
/**
16+
* Thread that monitors Elastic for changes to the set of topics.
17+
*/
18+
public class ElasticIndexMonitorThread extends Thread {
19+
private static final Logger log = LoggerFactory.getLogger(ElasticIndexMonitorThread.class);
20+
private static final long timeout = 10000L;
21+
22+
private final ConnectorContext context;
23+
private final CountDownLatch shutdownLatch;
24+
private final long pollMs;
25+
private final ElasticRepository elasticRepository;
26+
private final String prefix;
27+
private List<String> indexes;
28+
29+
public ElasticIndexMonitorThread(ConnectorContext context, long pollMs, ElasticRepository elasticRepository, String prefix) {
30+
this.context = context;
31+
this.shutdownLatch = new CountDownLatch(1);
32+
this.pollMs = pollMs;
33+
this.elasticRepository = elasticRepository;
34+
this.prefix = prefix;
35+
this.indexes = new ArrayList<>();
36+
}
37+
38+
public static long getTimeout() {
39+
return timeout;
40+
}
41+
42+
@Override
43+
public void run() {
44+
while (shutdownLatch.getCount() > 0) {
45+
try {
46+
if (updateIndexes()) {
47+
context.requestTaskReconfiguration();
48+
}
49+
} catch (Exception e) {
50+
context.raiseError(e);
51+
throw e;
52+
}
53+
54+
try {
55+
boolean shuttingDown = shutdownLatch.await(pollMs, TimeUnit.MILLISECONDS);
56+
if (shuttingDown) {
57+
return;
58+
}
59+
} catch (InterruptedException e) {
60+
log.error("Unexpected InterruptedException, ignoring: ", e);
61+
}
62+
}
63+
}
64+
65+
public synchronized List<String> indexes() {
66+
67+
long started = System.currentTimeMillis();
68+
long now = started;
69+
while (indexes.size() == 0 && now - started < timeout) {
70+
try {
71+
wait(timeout - (now - started));
72+
} catch (InterruptedException e) {
73+
// Ignore
74+
}
75+
now = System.currentTimeMillis();
76+
}
77+
if (indexes.size() == 0) {
78+
throw new ConnectException("Indexes could not be updated quickly enough.");
79+
}
80+
return indexes;
81+
}
82+
83+
public void shutdown() {
84+
shutdownLatch.countDown();
85+
}
86+
87+
private synchronized boolean updateIndexes() {
88+
final List<String> indexes;
89+
try {
90+
indexes = elasticRepository.catIndices(this.prefix);
91+
log.debug("Got the following topics: " + Arrays.toString(indexes.toArray()));
92+
} catch (RuntimeException e) {
93+
log.error("Error while trying to get updated topics list, ignoring and waiting for next table poll interval", e);
94+
return false;
95+
}
96+
97+
if (!indexes.equals(this.indexes)) {
98+
log.debug("After filtering we got topics: " + Arrays.toString(indexes.toArray()));
99+
List<String> previousIndexes = this.indexes;
100+
this.indexes = indexes;
101+
notifyAll();
102+
// Only return true if the table list wasn't previously null, i.e. if this was not the
103+
// first table lookup
104+
return previousIndexes.size() > 0;
105+
}
106+
return false;
107+
}
108+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.github.dariobalinzo;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.mockito.Mockito.atLeast;
5+
6+
import java.io.IOException;
7+
8+
import com.github.dariobalinzo.elastic.ElasticIndexMonitorThread;
9+
10+
import org.apache.kafka.connect.connector.ConnectorContext;
11+
import org.junit.Before;
12+
import org.junit.Test;
13+
import org.mockito.Mock;
14+
import org.mockito.Mockito;
15+
import org.mockito.MockitoAnnotations;
16+
17+
18+
public class ElasticIndexMonitorThreadTest extends TestContainersContext {
19+
20+
@Mock
21+
private ConnectorContext context;
22+
23+
@Before
24+
public void init() {
25+
MockitoAnnotations.initMocks(this);
26+
}
27+
28+
@Test
29+
public void shouldRefreshIndexesList() throws InterruptedException, IOException {
30+
//given
31+
long pollInterval = 1000L;
32+
deleteTestIndex();
33+
34+
insertMockData(10, TEST_INDEX);
35+
refreshIndex();
36+
37+
ElasticIndexMonitorThread indexMonitorThread = new ElasticIndexMonitorThread(context, pollInterval, repository, TEST_INDEX);
38+
indexMonitorThread.start();
39+
40+
assertEquals(1, indexMonitorThread.indexes().size());
41+
42+
//when another index is created in Elastic
43+
insertMockData(10, TEST_INDEX + '2');
44+
refreshIndex();
45+
46+
long waitRefresh = pollInterval + (long)(Math.random() * 1000);
47+
Thread.sleep(waitRefresh);
48+
49+
//then
50+
Mockito.verify(context, atLeast(1)).requestTaskReconfiguration();
51+
assertEquals(2, indexMonitorThread.indexes().size());
52+
53+
indexMonitorThread.shutdown();
54+
}
55+
}

src/test/java/com/github/dariobalinzo/TestContainersContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class TestContainersContext {
5757
public static void setupElastic() {
5858
// Create the elasticsearch container.
5959
container = new ElasticsearchContainer(ELASTICSEARCH_IMAGE);
60+
container.addEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m");
6061
container.start();
6162

6263
HttpHost httpHost = HttpHost.create(container.getHttpHostAddress());

src/test/java/com/github/dariobalinzo/ElasticSourceConnectorTest.java renamed to src/test/java/com/github/dariobalinzo/elastic/ElasticSourceConnectorTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.github.dariobalinzo;
17+
package com.github.dariobalinzo.elastic;
1818

1919

20+
import com.github.dariobalinzo.ElasticSourceConnector;
21+
import com.github.dariobalinzo.TestContainersContext;
2022
import com.github.dariobalinzo.task.ElasticSourceTaskConfig;
23+
2124
import org.junit.Test;
2225

2326
import java.io.IOException;
@@ -39,6 +42,13 @@ public void shouldGetAListOfTasks() throws IOException {
3942
insertMockData(3, TEST_INDEX + 3);
4043
insertMockData(4, TEST_INDEX + 4);
4144

45+
try {
46+
Thread.sleep(1000);
47+
} catch (InterruptedException e) {
48+
// TODO Auto-generated catch block
49+
e.printStackTrace();
50+
}
51+
4252
//when
4353
int maxTasks = 3;
4454
List<Map<String, String>> taskList = connector.taskConfigs(maxTasks);
@@ -66,5 +76,4 @@ public void shouldGetTaskFromFixedList() {
6676
assertEquals(maxTasks, taskList.size());
6777
connector.stop();
6878
}
69-
7079
}

0 commit comments

Comments
 (0)