Commit bdce9b9

Joel-hanson and A S Adil Mohammad authored and committed
feat: Config changes and new unit tests
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 41dfb5d commit bdce9b9

File tree: 3 files changed (+128, -10 lines)


README.md

Lines changed: 1 addition & 0 deletions
@@ -308,6 +308,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
 | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
 | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
+| `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
 | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
 | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |

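For reference, the new option sits alongside the existing receive timeouts. Below is a minimal sketch of a connector configuration that sets it, using the property names documented in the table above and the same map-of-strings style the integration tests in this commit use; the class name and the chosen values are illustrative only, not part of this change.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: property names come from the table above; values and class name are made up.
public class MaxPollTimeConfigExample {
    public static void main(String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("mq.message.receive.timeout", "2000");    // wait up to 2 s for the first message
        props.put("mq.reconnect.delay.min.ms", "64");
        props.put("mq.reconnect.delay.max.ms", "8192");
        props.put("mq.receive.max.poll.time.ms", "500");    // cap one poll cycle at 500 ms; 0 disables the cap
        props.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}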
src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 120 additions & 6 deletions
@@ -1300,6 +1300,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
         assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
     }
 
+<<<<<<< HEAD
 <<<<<<< HEAD
     @Test
     public void verifyJmsMessageWithNullHeaders() throws Exception {
@@ -1350,6 +1351,9 @@ public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() th
         assertThat(kafkaMessage.headers()).isEmpty();
 =======
     public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+=======
+    public void testMaxPollTimeTerminates() throws Exception {
+>>>>>>> bc3d17e (feat: Config changes and new unit tests)
         connectTask = getSourceTaskWithEmptyKafkaOffset();
 
         final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
@@ -1358,7 +1362,7 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
                 "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
         connectorConfigProps.put("mq.message.receive.timeout", "150");
         connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
-        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200");
         connectorConfigProps.put("mq.batch.size", "5000");
 
         final JMSWorker shared = new JMSWorker();
@@ -1369,26 +1373,136 @@ public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
 
         connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
 
-        List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
-        messages = createAListOfMessages(getJmsContext(), 10, "msg ");
         putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
 
+        final List<SourceRecord> kafkaMessages = connectTask.poll();
+
+        final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+        assertThat(stateMsgs1.size()).isEqualTo(1);
+        final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
+        assertThat(sourceMsgs.size()).isEqualTo(0);
+        assertEquals(30, kafkaMessages.size());
+    }
+
+    @Test
+    public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder",
+                "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.message.receive.timeout", "100");
+        connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
+        connectorConfigProps.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
+        connectorConfigProps.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(connectorConfigProps));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(connectorConfigProps));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+        connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+        final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
         final long start = System.nanoTime();
         final List<SourceRecord> kafkaMessages = connectTask.poll();
         final long durationMs = (System.nanoTime() - start) / 1_000_000;
 
-        System.out.println(durationMs);
         // Poll should end close to 200ms
-        assertThat(durationMs >= 180 && durationMs <= 500).isTrue();
+        assertThat(durationMs <= 210).isTrue();
 
         final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
         assertThat(stateMsgs1.size()).isEqualTo(1);
         final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
         assertThat(sourceMsgs.size()).isEqualTo(0);
+<<<<<<< HEAD
         assertEquals(30, kafkaMessages.size());
 >>>>>>> def7c02 (feat: added new config mq.receive.max.poll.time.ms)
+=======
+        assertEquals(10, kafkaMessages.size());
+    }
+
+    @Test
+    public void testPollEndsWhenBatchSizeReached() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "100");
+        config.put("mq.receive.subsequent.timeout.ms", "10");
+        config.put("mq.receive.max.poll.time.ms", "1000");
+        config.put("mq.batch.size", "10");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 12, "msg "));
+
+        final long start = System.nanoTime();
+        connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs < 1000).isTrue();
+    }
+
+    @Test
+    public void testPollWithMaxPollTimeZeroBehavesAsDefault() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.message.receive.timeout", "400");
+        config.put("mq.receive.max.poll.time.ms", "0");
+        config.put("mq.batch.size", "100");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        // putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 3, "msg "));
+
+        final long start = System.nanoTime();
+        final List<SourceRecord> records = connectTask.poll();
+        final long durationMs = (System.nanoTime() - start) / 1_000_000;
+
+        assertThat(durationMs >= 400 && durationMs <= 450).isTrue();
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testPollWithShortMaxPollTime() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+        final Map<String, String> config = createExactlyOnceConnectorProperties();
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        config.put("mq.receive.max.poll.time.ms", "50");
+        config.put("mq.message.receive.timeout", "1");
+        config.put("mq.receive.subsequent.timeout.ms", "0");
+        config.put("mq.batch.size", "5000");
+
+        final JMSWorker shared = new JMSWorker();
+        shared.configure(getPropertiesConfig(config));
+        final JMSWorker dedicated = new JMSWorker();
+        dedicated.configure(getPropertiesConfig(config));
+        final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+        connectTask.start(config, shared, dedicated, sequenceStateClient);
+
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 100, "msg "));
+
+        final List<SourceRecord> records = connectTask.poll();
+
+        assertThat(records.size() < 100);
+>>>>>>> bc3d17e (feat: Config changes and new unit tests)
     }
 }

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 7 additions & 4 deletions
@@ -188,8 +188,11 @@ public class MQSourceConnector extends SourceConnector {
     public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
 
     public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
-    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. "
-            + "If not set or set to 0, polling continues until batch size or receive timeout conditions are met.";
+    public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to poll for messages during a single Kafka Connect poll cycle. "
+            + "Acts as a hard upper bound on how long the task will try to accumulate a batch. "
+            + "If set to 0 or not defined, polling continues until either a message receive returns null or the batch size is met. "
+            + "Note: It is recommended to keep this value less than or equal to both 'mq.message.receive.timeout' "
+            + "and 'mq.receive.subsequent.timeout.ms' to avoid unexpected delays due to long blocking receive calls.";
     public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
    public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;
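To make the documented semantics concrete, here is a small, self-contained sketch of a batch loop bounded the way this documentation string describes: stop on a null receive, on reaching the batch size, or once the max poll time elapses. It is an illustration under stated assumptions, not the connector's actual poll implementation; pollBatch, the receive function, and the class name are hypothetical stand-ins.

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Illustrative sketch only: the batching bound described by mq.receive.max.poll.time.ms.
// "receive" stands in for a JMS receive call with a timeout; this is not the connector's real code.
public class MaxPollTimeSketch {
    static List<String> pollBatch(LongFunction<String> receive, int batchSize,
                                  long initialTimeoutMs, long subsequentTimeoutMs, long maxPollTimeMs) {
        final List<String> batch = new ArrayList<>();
        final long deadline = System.nanoTime() + maxPollTimeMs * 1_000_000L;
        while (batch.size() < batchSize) {
            if (maxPollTimeMs > 0 && System.nanoTime() >= deadline) {
                break; // hard upper bound on the poll cycle reached
            }
            final String message = receive.apply(batch.isEmpty() ? initialTimeoutMs : subsequentTimeoutMs);
            if (message == null) {
                break; // a receive returned null: end the batch
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) {
        // With maxPollTimeMs = 0 the time bound is disabled; only batch size or a null receive ends the loop.
        System.out.println(pollBatch(timeout -> null, 10, 2000, 10, 0).size()); // prints 0
    }
}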
@@ -615,7 +618,7 @@ null, new ReadableFile(),
                 CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
         CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT,
                 ConfigDef.Type.LONG,
-                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
+                CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
                 CONFIG_GROUP_MQ,
@@ -656,7 +659,7 @@ null, new ReadableFile(),
                 DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
         CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
                 ConfigDef.Type.LONG,
-                CONFIG_MAX_POLL_TIME_DEFAULT,
+                CONFIG_MAX_POLL_TIME_DEFAULT, ConfigDef.Range.atLeast(CONFIG_MAX_POLL_TIME_DEFAULT),
                 ConfigDef.Importance.LOW,
                 CONFIG_DOCUMENTATION_MAX_POLL_TIME,
                 CONFIG_GROUP_MQ,

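The functional change in these two hunks is the added ConfigDef.Range.atLeast(...) validator. As a standalone, hedged illustration of what that validator buys, the snippet below uses Kafka's public ConfigDef API to show a negative value being rejected at parse time; only the property name and its documented lower bound are taken from this commit, the rest is an example, not connector code.

import java.util.Collections;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Standalone illustration: with Range.atLeast(0L), a negative value fails fast when the config is parsed.
public class RangeValidationExample {
    public static void main(String[] args) {
        final ConfigDef def = new ConfigDef().define(
                "mq.receive.max.poll.time.ms",
                ConfigDef.Type.LONG,
                0L,
                ConfigDef.Range.atLeast(0L),
                ConfigDef.Importance.LOW,
                "Maximum time (in milliseconds) to poll for messages during a single Kafka Connect poll cycle.");
        try {
            def.parse(Collections.singletonMap("mq.receive.max.poll.time.ms", "-1"));
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage()); // the validator refuses values below 0
        }
    }
}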