Skip to content

Commit 90aec41

Browse files
committed
Update plain jdbc example
1 parent 284f8e4 commit 90aec41

File tree

11 files changed

+357
-283
lines changed

11 files changed

+357
-283
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
import java.math.BigDecimal;
4+
5+
public class Account {
6+
String name;
7+
8+
BigDecimal amount;
9+
10+
Account(String name, BigDecimal amount) {
11+
this.name = name;
12+
this.amount = amount;
13+
}
14+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
import java.sql.Connection;
4+
import java.sql.SQLException;
5+
6+
interface ConnectionCallback<T> {
7+
T doInConnection(Connection conn) throws SQLException;
8+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
import java.lang.reflect.UndeclaredThrowableException;
4+
import java.sql.Connection;
5+
import java.sql.SQLException;
6+
7+
import javax.sql.DataSource;
8+
9+
public abstract class ConnectionTemplate {
10+
public static <T> T execute(DataSource ds,
11+
ConnectionCallback<T> action) {
12+
try (Connection conn = ds.getConnection()) {
13+
T result;
14+
try {
15+
result = action.doInConnection(conn);
16+
} catch (RuntimeException | Error ex) {
17+
throw ex;
18+
} catch (Throwable ex) {
19+
throw new UndeclaredThrowableException(ex,
20+
"TransactionCallback threw undeclared checked exception");
21+
}
22+
return result;
23+
} catch (SQLException e) {
24+
throw new DataAccessException(e);
25+
}
26+
}
27+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
public class DataAccessException extends RuntimeException {
4+
public DataAccessException(String message) {
5+
super(message);
6+
}
7+
8+
public DataAccessException(Throwable cause) {
9+
super(cause);
10+
}
11+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
import java.math.BigDecimal;
4+
import java.sql.Connection;
5+
import java.sql.PreparedStatement;
6+
import java.sql.ResultSet;
7+
import java.sql.SQLException;
8+
import java.util.ArrayDeque;
9+
import java.util.ArrayList;
10+
import java.util.Arrays;
11+
import java.util.Deque;
12+
import java.util.List;
13+
import java.util.concurrent.ExecutionException;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.Future;
17+
import java.util.concurrent.ThreadLocalRandom;
18+
import java.util.stream.IntStream;
19+
20+
import javax.sql.DataSource;
21+
22+
import com.zaxxer.hikari.HikariDataSource;
23+
24+
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
25+
26+
public class PlainJdbcApplication {
27+
private static class BusinessException extends RuntimeException {
28+
public BusinessException(String message) {
29+
super(message);
30+
}
31+
}
32+
33+
private static TransactionCallback<BigDecimal> transfer(List<Account> legs) {
34+
return conn -> {
35+
BigDecimal total = BigDecimal.ZERO;
36+
BigDecimal checksum = BigDecimal.ZERO;
37+
38+
for (Account leg : legs) {
39+
BigDecimal balance = readBalance(conn, leg.name);
40+
updateBalance(conn, leg.name, balance.add(leg.amount));
41+
checksum = checksum.add(leg.amount);
42+
total = total.add(leg.amount.abs());
43+
}
44+
45+
if (checksum.compareTo(BigDecimal.ZERO) != 0) {
46+
throw new BusinessException(
47+
"Sum of account legs must equal 0 (got " + checksum.toPlainString() + ")"
48+
);
49+
}
50+
51+
return total;
52+
};
53+
}
54+
55+
private static List<String> readAccountNames(Connection conn) throws SQLException {
56+
List<String> names = new ArrayList<>();
57+
try (PreparedStatement ps = conn.prepareStatement("SELECT name FROM account")) {
58+
try (ResultSet res = ps.executeQuery()) {
59+
while (res.next()) {
60+
names.add(res.getString(1));
61+
}
62+
}
63+
}
64+
return names;
65+
}
66+
67+
private static BigDecimal readBalance(Connection conn, String name) throws SQLException {
68+
try (PreparedStatement ps = conn.prepareStatement("SELECT balance FROM account WHERE name = ?")) {
69+
ps.setString(1, name);
70+
71+
try (ResultSet res = ps.executeQuery()) {
72+
if (!res.next()) {
73+
throw new BusinessException("Account not found: " + name);
74+
}
75+
return res.getBigDecimal("balance");
76+
}
77+
}
78+
}
79+
80+
private static void updateBalance(Connection conn, String name, BigDecimal balance) throws SQLException {
81+
try (PreparedStatement ps = conn
82+
.prepareStatement(
83+
"UPDATE account SET balance = ?, updated=clock_timestamp() where name = ?")) {
84+
ps.setBigDecimal(1, balance);
85+
ps.setString(2, name);
86+
if (ps.executeUpdate() != 1) {
87+
throw new DataAccessException("Rows affected != 1 for " + name);
88+
}
89+
}
90+
}
91+
92+
private static BigDecimal readTotalBalance(Connection conn) throws SQLException {
93+
try (PreparedStatement ps = conn.prepareStatement(
94+
"select sum(balance) balance from account")) {
95+
try (ResultSet res = ps.executeQuery()) {
96+
if (!res.next()) {
97+
throw new SQLException("Empty result");
98+
}
99+
return res.getBigDecimal("balance");
100+
}
101+
}
102+
}
103+
104+
public static void main(String[] args) throws Exception {
105+
int workers = Runtime.getRuntime().availableProcessors();
106+
107+
final HikariDataSource hikariDS = new HikariDataSource();
108+
hikariDS.setJdbcUrl("jdbc:postgresql://localhost:26257/roach_data?sslmode=disable");
109+
hikariDS.setUsername("root");
110+
hikariDS.setAutoCommit(true);
111+
hikariDS.setMaximumPoolSize(workers);
112+
hikariDS.setMinimumIdle(workers);
113+
114+
boolean enableProxy = Arrays.asList(args).contains("--enable-proxy");
115+
116+
DataSource ds = enableProxy ?
117+
ProxyDataSourceBuilder
118+
.create(hikariDS)
119+
.asJson()
120+
.logQueryBySlf4j()
121+
.build() : hikariDS;
122+
123+
SchemaSupport.setupSchema(ds);
124+
125+
ConnectionTemplate.execute(ds, PlainJdbcApplication::readAccountNames).forEach(name -> {
126+
System.out.printf("Balance before for %s: %s\n", name,
127+
ConnectionTemplate.execute(ds, c -> readBalance(c, name)));
128+
});
129+
130+
final BigDecimal initialBalance = ConnectionTemplate.execute(ds, PlainJdbcApplication::readTotalBalance);
131+
System.out.printf("Total balance before: %s\n", initialBalance);
132+
133+
// Run concurrently for more exiting effects
134+
final ExecutorService executorService = Executors.newFixedThreadPool(workers);
135+
final Deque<Future<BigDecimal>> futures = new ArrayDeque<>();
136+
137+
final int iterations = 200;
138+
final ThreadLocalRandom random = ThreadLocalRandom.current();
139+
140+
IntStream.rangeClosed(1, iterations).forEach(
141+
value -> futures.add(executorService.submit(() -> {
142+
List<Account> legs = new ArrayList<>();
143+
144+
IntStream.rangeClosed(1, 4).forEach(leg -> {
145+
String from = "customer:" + random.nextInt(1, 100);
146+
String to = "customer:" + random.nextInt(1, 100);
147+
if (!from.equals(to)) {
148+
BigDecimal amt = new BigDecimal("0.15");
149+
legs.add(new Account(from, amt));
150+
legs.add(new Account(to, amt.negate()));
151+
}
152+
});
153+
154+
System.out.printf("\r%,8d/%d", value, iterations);
155+
156+
return TransactionTemplate.execute(ds, transfer(legs));
157+
}
158+
)));
159+
160+
int success = 0;
161+
int fail = 0;
162+
BigDecimal total = BigDecimal.ZERO;
163+
while (!futures.isEmpty()) {
164+
System.out.printf("Awaiting completion (%d futures)\n", futures.size());
165+
try {
166+
BigDecimal tot = futures.pop().get();
167+
total = total.add(tot);
168+
success++;
169+
} catch (InterruptedException e) {
170+
Thread.currentThread().interrupt();
171+
break;
172+
} catch (ExecutionException e) {
173+
e.getCause().printStackTrace();
174+
fail++;
175+
}
176+
}
177+
178+
executorService.shutdownNow();
179+
180+
ConnectionTemplate.execute(ds, PlainJdbcApplication::readAccountNames).forEach(name -> {
181+
System.out.printf("Balance after for %s: %s\n", name,
182+
ConnectionTemplate.execute(ds, c -> readBalance(c, name)));
183+
});
184+
185+
final BigDecimal finalBalance = ConnectionTemplate.execute(ds, PlainJdbcApplication::readTotalBalance);
186+
187+
System.out.printf("Transaction success: %d\n", success);
188+
System.out.printf("Transaction fail: %d\n", fail);
189+
System.out.printf("Total turnover: %s\n", total);
190+
System.out.printf("Total balance before: %s\n", initialBalance);
191+
System.out.printf("Total balance after: %s\n", finalBalance);
192+
if (!finalBalance.equals(initialBalance)) {
193+
System.out.println("Balance invariant violation! (╯°□°)╯︵ ┻━┻");
194+
System.out.printf("Lost funds: %s\n", initialBalance.subtract(finalBalance));
195+
}
196+
}
197+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.roach.data.jdbc.plain;
2+
3+
import java.net.URL;
4+
import java.nio.file.Files;
5+
import java.nio.file.Paths;
6+
import java.sql.Statement;
7+
8+
import javax.sql.DataSource;
9+
10+
public abstract class SchemaSupport {
11+
private SchemaSupport() {
12+
}
13+
14+
public static void setupSchema(DataSource ds) throws Exception {
15+
URL sql = PlainJdbcApplication.class.getResource("/db/create.sql");
16+
17+
StringBuilder buffer = new StringBuilder();
18+
19+
Files.readAllLines(Paths.get(sql.toURI())).forEach(line -> {
20+
if (!line.startsWith("--") && !line.isEmpty()) {
21+
buffer.append(line);
22+
}
23+
if (line.endsWith(";") && buffer.length() > 0) {
24+
ConnectionTemplate.execute(ds, conn -> {
25+
try (Statement statement = conn.createStatement()) {
26+
statement.execute(buffer.toString());
27+
}
28+
buffer.setLength(0);
29+
return null;
30+
});
31+
}
32+
});
33+
}
34+
}

0 commit comments

Comments
 (0)