Skip to content
This repository was archived by the owner on Aug 13, 2020. It is now read-only.

Commit 1ed3ea0

Browse files
authored
Merge pull request #625 from CJSCommonPlatform/release-4.1.x-fix-for-stream-status-update
fix for source update
2 parents 2f2a884 + 0c9e2b1 commit 1ed3ea0

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/repository/streamstatus/StreamStatusJdbcRepository.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ public class StreamStatusJdbcRepository {
3131
/**
3232
* Statements
3333
*/
34-
private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, version, source FROM stream_status WHERE stream_id=? AND source=? FOR UPDATE";
34+
private static final String SELECT_BY_STREAM_ID_AND_SOURCE = "SELECT stream_id, version, source FROM stream_status WHERE stream_id=? AND source in (?,'unknown') FOR UPDATE";
35+
private static final String COUNT_BY_STREAM_ID = "SELECT count(*) FROM stream_status WHERE stream_id=?";
3536
private static final String INSERT = "INSERT INTO stream_status (version, stream_id, source) VALUES (?, ?, ?)";
3637
private static final String INSERT_ON_CONFLICT_DO_NOTHING = new StringBuilder().append(INSERT).append(" ON CONFLICT DO NOTHING").toString();
3738
private static final String UPDATE = "UPDATE stream_status SET version=?,source=? WHERE stream_id=? and source in (?,'unknown')";
38-
39+
private static final String UPDATE_UNKNOWN_SOURCE = "UPDATE stream_status SET source=? WHERE stream_id=? and source = 'unknown'";
3940

4041
@Inject
4142
JdbcRepositoryHelper jdbcRepositoryHelper;
@@ -126,6 +127,25 @@ public Optional<StreamStatus> findByStreamIdAndSource(final UUID streamId, final
126127
}
127128
}
128129

130+
/**
131+
* Returns a count of records for a given stream streamId.
132+
*
133+
* @param streamId streamId of the stream.
134+
* @return a int.
135+
*/
136+
public int countByStreamId(final UUID streamId) {
137+
try (final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, COUNT_BY_STREAM_ID)) {
138+
ps.setObject(1, streamId);
139+
final ResultSet resultSet = ps.executeQuery();
140+
while (resultSet.next()) {
141+
return resultSet.getInt(1);
142+
}
143+
} catch (SQLException e) {
144+
throw new JdbcRepositoryException(format("Exception while looking up status of the stream: %s", streamId), e);
145+
}
146+
return 0;
147+
}
148+
129149
private Optional<StreamStatus> streamStatusFrom(final PreparedStatementWrapper ps) throws SQLException {
130150
final ResultSet resultSet = ps.executeQuery();
131151
return resultSet.next()
@@ -137,4 +157,14 @@ private Optional<StreamStatus> streamStatusFrom(final PreparedStatementWrapper p
137157
protected StreamStatus entityFrom(final ResultSet rs) throws SQLException {
138158
return new StreamStatus((UUID) rs.getObject(PRIMARY_KEY_ID), rs.getLong(COL_VERSION), rs.getString(SOURCE));
139159
}
160+
161+
public void updateSource(final UUID streamId, final String source) {
162+
try (final PreparedStatementWrapper ps = jdbcRepositoryHelper.preparedStatementWrapperOf(dataSource, UPDATE_UNKNOWN_SOURCE)) {
163+
ps.setString(1, source);
164+
ps.setObject(2, streamId);
165+
ps.executeUpdate();
166+
} catch (SQLException e) {
167+
throw new JdbcRepositoryException(format("Exception while updating unknown source of the stream: %s", streamId), e);
168+
}
169+
}
140170
}

event-buffer/event-buffer-core/src/main/java/uk/gov/justice/services/event/buffer/core/service/PostgreSQLBasedBufferInitialisationStrategy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public PostgreSQLBasedBufferInitialisationStrategy(final StreamStatusJdbcReposit
1616

1717
@Override
1818
public long initialiseBuffer(final UUID streamId, final String source) {
19+
streamStatusRepository.updateSource(streamId,source);
1920
streamStatusRepository.insertOrDoNothing(new StreamStatus(streamId, INITIAL_VERSION, source));
2021
return streamStatusRepository.findByStreamIdAndSource(streamId, source)
2122
.orElseThrow(() -> new IllegalStateException("stream status cannot be empty"))

event-buffer/event-buffer-core/src/test/java/uk/gov/justice/services/event/buffer/core/repository/streamstatus/StreamStatusJdbcRepositoryIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class StreamStatusJdbcRepositoryIT {
2525

2626
private static final String LIQUIBASE_STREAM_STATUS_CHANGELOG_XML = "liquibase/event-buffer-changelog.xml";
2727

28+
private static final long INITIAL_VERSION = 0L;
2829

2930
private StreamStatusJdbcRepository jdbcRepository;
3031

@@ -53,7 +54,25 @@ public void initDatabase() throws Exception {
5354
}
5455
}
5556

57+
@Test
58+
public void shouldNotCreateNewStreamStatusRecordWhenANewSourceIsIntroduced() throws Exception {
59+
final String source = "unknown";
60+
final UUID streamId = randomUUID();
61+
62+
initialiseBuffer(streamId, "unknown");
63+
final StreamStatus streamStatus = new StreamStatus(streamId, 2L, source);
64+
65+
jdbcRepository.update(streamStatus);
66+
67+
initialiseBuffer(streamId, "sjp");
68+
final int count = jdbcRepository.countByStreamId(streamId);
69+
assertThat(count, is(1));
5670

71+
final Optional<StreamStatus> result = jdbcRepository.findByStreamIdAndSource(streamId, "sjp");
72+
73+
assertThat(result.get().getSource(), is("sjp"));
74+
assertThat(result.get().getVersion(), is(2L));
75+
}
5776

5877
@Test
5978
public void shouldUpdateSourceWhenUnknown() throws Exception {
@@ -187,4 +206,20 @@ private StreamStatus streamStatusOf(final UUID id, final Long version, final Str
187206
return new StreamStatus(id, version, source);
188207
}
189208

209+
private long initialiseBuffer(final UUID streamId, final String source) {
210+
jdbcRepository.updateSource(streamId,source);
211+
final Optional<StreamStatus> currentStatus = jdbcRepository.findByStreamIdAndSource(streamId, source);
212+
213+
if (!currentStatus.isPresent()) {
214+
//this is to address race condition
215+
//in case of primary key violation the exception gets thrown, event goes back into topic and the transaction gets retried
216+
jdbcRepository
217+
.insert(new StreamStatus(streamId, INITIAL_VERSION, source));
218+
return INITIAL_VERSION;
219+
220+
} else {
221+
return currentStatus.get().getVersion();
222+
}
223+
}
224+
190225
}

0 commit comments

Comments
 (0)