Skip to content

Commit 29c4097

Browse files
committed
Backported 778079e (#60) from master
1 parent 01fa8e7 commit 29c4097

File tree

3 files changed

+61
-3
lines changed

3 files changed

+61
-3
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,8 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
760760

761761
private void updateClientBinlogFilenameAndPosition(Event event) {
762762
EventHeader eventHeader = event.getHeader();
763-
if (eventHeader.getEventType() == EventType.ROTATE) {
763+
EventType eventType = eventHeader.getEventType();
764+
if (eventType == EventType.ROTATE) {
764765
EventData eventData = event.getData();
765766
RotateEventData rotateEventData;
766767
if (eventData instanceof EventDeserializer.EventDataWrapper) {
@@ -771,7 +772,9 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
771772
binlogFilename = rotateEventData.getBinlogFilename();
772773
binlogPosition = rotateEventData.getBinlogPosition();
773774
} else
774-
if (eventHeader instanceof EventHeaderV4) {
775+
// do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of
776+
// client) table mapping cache could be reconstructed before hitting row mutation event
777+
if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) {
775778
EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
776779
long nextBinlogPosition = trackableEventHeader.getNextPosition();
777780
if (nextBinlogPosition > 0) {

src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public enum EventType {
172172
*/
173173
EXT_WRITE_ROWS,
174174
/**
175-
* Describes deleted rows (within a single table).
175+
* Describes updated rows (within a single table).
176176
* Used in case of RBR (5.1.18+).
177177
*/
178178
EXT_UPDATE_ROWS,
@@ -188,6 +188,12 @@ public enum EventType {
188188
ANONYMOUS_GTID,
189189
PREVIOUS_GTIDS;
190190

191+
public static boolean isRowMutation(EventType eventType) {
192+
return EventType.isWrite(eventType) ||
193+
EventType.isUpdate(eventType) ||
194+
EventType.isDelete(eventType);
195+
}
196+
191197
public static boolean isWrite(EventType eventType) {
192198
return eventType == PRE_GA_WRITE_ROWS ||
193199
eventType == WRITE_ROWS ||

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.sql.SQLException;
5353
import java.sql.SQLSyntaxErrorException;
5454
import java.sql.Statement;
55+
import java.util.AbstractMap;
5556
import java.util.BitSet;
5657
import java.util.Calendar;
5758
import java.util.List;
@@ -61,6 +62,7 @@
6162
import java.util.concurrent.CountDownLatch;
6263
import java.util.concurrent.TimeUnit;
6364
import java.util.concurrent.TimeoutException;
65+
import java.util.concurrent.atomic.AtomicReference;
6466
import java.util.logging.Level;
6567
import java.util.logging.Logger;
6668

@@ -333,6 +335,53 @@ public void execute(Statement statement) throws SQLException {
333335
return result;
334336
}
335337

338+
@Test
339+
public void testBinlogPositionPointsToTableMapEventUntilTheEndOfLogicalGroup() throws Exception {
340+
final AtomicReference<Map.Entry<String, Long>> markHolder = new AtomicReference<Map.Entry<String, Long>>();
341+
BinaryLogClient.EventListener markEventListener = new BinaryLogClient.EventListener() {
342+
343+
private int counter;
344+
345+
@Override
346+
public void onEvent(Event event) {
347+
if (EventType.isRowMutation(event.getHeader().getEventType()) && counter++ == 1) {
348+
// coordinates of second insert
349+
markHolder.set(new AbstractMap.SimpleEntry<String, Long>(client.getBinlogFilename(),
350+
client.getBinlogPosition()));
351+
}
352+
}
353+
};
354+
client.registerEventListener(markEventListener);
355+
try {
356+
master.execute(new Callback<Statement>() {
357+
@Override
358+
public void execute(Statement statement) throws SQLException {
359+
statement.execute("insert into bikini_bottom values('SpongeBob')");
360+
statement.execute("insert into bikini_bottom values('Patrick')");
361+
statement.execute("insert into bikini_bottom values('Squidward')");
362+
}
363+
});
364+
eventListener.waitFor(WriteRowsEventData.class, 3, DEFAULT_TIMEOUT);
365+
final BinaryLogClient anotherClient = new BinaryLogClient(slave.hostname, slave.port,
366+
slave.username, slave.password);
367+
anotherClient.registerLifecycleListener(new TraceLifecycleListener());
368+
CountDownEventListener anotherClientEventListener = new CountDownEventListener();
369+
anotherClient.registerEventListener(anotherClientEventListener);
370+
Map.Entry<String, Long> mark = markHolder.get();
371+
anotherClient.setBinlogFilename(mark.getKey());
372+
anotherClient.setBinlogPosition(mark.getValue());
373+
anotherClient.connect(DEFAULT_TIMEOUT);
374+
try {
375+
// expecting Patrick & Squidward
376+
anotherClientEventListener.waitFor(WriteRowsEventData.class, 2, DEFAULT_TIMEOUT);
377+
} finally {
378+
anotherClient.disconnect();
379+
}
380+
} finally {
381+
client.unregisterEventListener(markEventListener);
382+
}
383+
}
384+
336385
@Test(enabled = false)
337386
public void testUnsupportedColumnTypeDoesNotCauseClientToFail() throws Exception {
338387
BinaryLogClient.LifecycleListener lifecycleListenerMock = mock(BinaryLogClient.LifecycleListener.class);

0 commit comments

Comments
 (0)