|
15 | 15 | */ |
16 | 16 | package com.ibm.eventstreams.connect.mqsink; |
17 | 17 |
|
| 18 | +import java.util.ArrayList; |
| 19 | +import java.util.List; |
18 | 20 | import java.util.concurrent.TimeoutException; |
19 | 21 |
|
| 22 | +import javax.jms.Connection; |
| 23 | +import javax.jms.Destination; |
20 | 24 | import javax.jms.JMSContext; |
| 25 | +import javax.jms.JMSException; |
| 26 | +import javax.jms.Message; |
| 27 | +import javax.jms.MessageConsumer; |
| 28 | +import javax.jms.Session; |
21 | 29 |
|
22 | 30 | import org.junit.ClassRule; |
23 | 31 | import org.testcontainers.containers.GenericContainer; |
24 | 32 | import org.testcontainers.containers.output.WaitingConsumer; |
25 | 33 |
|
26 | 34 | import com.ibm.mq.jms.MQConnectionFactory; |
| 35 | +import com.ibm.msg.client.jms.JmsConnectionFactory; |
| 36 | +import com.ibm.msg.client.jms.JmsFactoryFactory; |
27 | 37 | import com.ibm.msg.client.wmq.WMQConstants; |
28 | 38 |
|
29 | 39 |
|
30 | 40 | /** |
31 | 41 | * Helper class for integration tests that have a dependency on JMSContext. |
32 | | - * |
| 42 | + * |
33 | 43 | * It starts a queue manager in a test container, and uses it to create |
34 | 44 | * a JMSContext instance, that can be used in tests. |
35 | 45 | */ |
36 | 46 | public abstract class AbstractJMSContextIT { |
37 | 47 |
|
38 | 48 | private static final String QMGR_NAME = "MYQMGR"; |
39 | | - |
| 49 | + private static final String CHANNEL_NAME = "DEV.APP.SVRCONN"; |
| 50 | + |
40 | 51 | @ClassRule |
41 | 52 | public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") |
42 | 53 | .withEnv("LICENSE", "accept") |
43 | 54 | .withEnv("MQ_QMGR_NAME", QMGR_NAME) |
44 | 55 | .withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false") |
45 | 56 | .withExposedPorts(1414); |
46 | | - |
| 57 | + |
47 | 58 | private JMSContext jmsContext; |
48 | | - |
49 | | - |
| 59 | + |
| 60 | + |
50 | 61 | /** |
51 | | - * Returns a JMS context pointing at a developer queue manager running in a |
52 | | - * test container. |
| 62 | + * Returns a JMS context pointing at a developer queue manager running in a |
| 63 | + * test container. |
53 | 64 | */ |
54 | 65 | public JMSContext getJmsContext() throws Exception { |
55 | 66 | if (jmsContext == null) { |
56 | 67 | waitForQueueManagerStartup(); |
57 | | - |
| 68 | + |
58 | 69 | MQConnectionFactory mqcf = new MQConnectionFactory(); |
59 | 70 | mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT); |
60 | | - mqcf.setChannel("DEV.APP.SVRCONN"); |
| 71 | + mqcf.setChannel(CHANNEL_NAME); |
61 | 72 | mqcf.setQueueManager(QMGR_NAME); |
62 | | - mqcf.setConnectionNameList("localhost(" + getMQPort().toString() + ")"); |
63 | | - |
| 73 | + mqcf.setConnectionNameList(getConnectionName()); |
| 74 | + |
64 | 75 | jmsContext = mqcf.createContext(); |
65 | 76 | } |
66 | | - |
| 77 | + |
67 | 78 | return jmsContext; |
68 | 79 | } |
69 | | - |
70 | | - |
| 80 | + |
| 81 | + |
71 | 82 | /** |
72 | 83 | * Gets the host port that has been mapped to the default MQ 1414 port in the test container. |
73 | 84 | */ |
74 | | - private Integer getMQPort() { |
75 | | - return MQ_CONTAINER.getMappedPort(1414); |
| 85 | + public Integer getMQPort() { |
| 86 | + return MQ_CONTAINER.getMappedPort(1414); |
| 87 | + } |
| 88 | + |
| 89 | + public String getQmgrName() { |
| 90 | + return QMGR_NAME; |
| 91 | + } |
| 92 | + public String getChannelName() { |
| 93 | + return CHANNEL_NAME; |
| 94 | + } |
| 95 | + public String getConnectionName() { |
| 96 | + return "localhost(" + getMQPort().toString() + ")"; |
76 | 97 | } |
77 | 98 |
|
78 | | - /** |
| 99 | + |
| 100 | + /** |
79 | 101 | * Waits until we see a log line in the queue manager test container that indicates |
80 | 102 | * the queue manager is ready. |
81 | 103 | */ |
82 | 104 | private void waitForQueueManagerStartup() throws TimeoutException { |
83 | 105 | WaitingConsumer logConsumer = new WaitingConsumer(); |
84 | 106 | MQ_CONTAINER.followOutput(logConsumer); |
85 | 107 | logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I")); |
86 | | - } |
| 108 | + } |
| 109 | + |
| 110 | + |
| 111 | + /** |
| 112 | + * Retrieves all messages from the specified MQ queue (destructively). Used in |
| 113 | + * tests to verify that the expected messages were put to the test queue. |
| 114 | + */ |
| 115 | + public List<Message> getAllMessagesFromQueue(String queueName) throws JMSException { |
| 116 | + Connection connection = null; |
| 117 | + Session session = null; |
| 118 | + Destination destination = null; |
| 119 | + MessageConsumer consumer = null; |
| 120 | + |
| 121 | + JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); |
| 122 | + |
| 123 | + JmsConnectionFactory cf = ff.createConnectionFactory(); |
| 124 | + cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost"); |
| 125 | + cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort()); |
| 126 | + cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName()); |
| 127 | + cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); |
| 128 | + cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, getQmgrName()); |
| 129 | + cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, false); |
| 130 | + |
| 131 | + connection = cf.createConnection(); |
| 132 | + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| 133 | + |
| 134 | + destination = session.createQueue(queueName); |
| 135 | + consumer = session.createConsumer(destination); |
| 136 | + |
| 137 | + connection.start(); |
| 138 | + |
| 139 | + List<Message> messages = new ArrayList<>(); |
| 140 | + Message message; |
| 141 | + do { |
| 142 | + message = consumer.receiveNoWait(); |
| 143 | + if (message != null) { |
| 144 | + messages.add(message); |
| 145 | + } |
| 146 | + } |
| 147 | + while (message != null); |
| 148 | + |
| 149 | + connection.close(); |
| 150 | + |
| 151 | + return messages; |
| 152 | + } |
87 | 153 | } |
0 commit comments