Skip to content

Commit a9a748d

Browse files
authored
Merge pull request #5 from lerna-stack/fix-hangup-caused-by-sequential-execution-id-extractor
SequentialExecutionIdExtractor に関連するバッチ処理が終了しない問題を解決する
2 parents 3674527 + 2126589 commit a9a748d

File tree

5 files changed

+152
-12
lines changed

5 files changed

+152
-12
lines changed

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# 変更履歴
2+
3+
nablarch-fw-batch-parallelizable に関する注目すべき変更はこのファイルで文書化されます。
4+
5+
このファイルの書き方に関する推奨事項については、[Keep a Changelog](https://keepachangelog.com/ja/1.0.0/) を確認してください。
6+
[Semantic Versioning](https://semver.org/spec/v2.0.0.html) を採用しています。
7+
8+
## Unreleased
9+
10+
### FIXED
11+
- `ControllableParallelExecutor.sequentialExecutionId``null` を返すとバッチ処理が終了しない [#4](https://github.com/lerna-stack/nablarch-fw-batch-parallelizable/issues/4)
12+
13+
## [v1.2.0] - 2021-3-25
14+
[v1.2.0]: https://github.com/lerna-stack/nablarch-fw-batch-parallelizable/releases/tag/v1.2.0
15+
16+
- Initial release

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ I/Oバウンドな処理の場合はこの値を⼤きくするにともないCP
238238
- [アーキテクチャ](docs/architecture.md)
239239
- [開発ガイド](docs/developers-guide.md)
240240

241+
## Changelog
242+
注目すべき変更は [CHANGELOG](CHANGELOG.md) から確認できます。
243+
241244
## License
242245

243246
`nablarch-fw-batch-parallelizable` is released under the terms of the [Apache License Version 2.0](LICENSE.txt).

src/main/java/lerna/nablarch/batch/parallelizable/handler/ControllableParallelExecutionHandler.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -404,14 +404,40 @@ public MultiStatus handle(Object data, ExecutionContext context) {
404404

405405
final Source<Object, NotUsed> partitionedSource =
406406
DataReaderSource.create(context).toMat(
407-
PartitionHub.ofStateful(Object.class, () -> (info, d) -> {
408-
final int executionId = Math.abs(executor.sequentialExecutionId(d).getAsInt() % parallelism);
409-
// 下流でエラーが発生して killSwitch が発動した場合は size が減少して executionId と consumer の ID がずれる。
410-
// そのような状況になると同じ executionId の要素が同時実行されてしまうので、size が変ったときは全ての要素を無視する。
411-
// consumerId として負の値を返すと無視される
412-
return info.size() == parallelism ? info.consumerIdByIdx(executionId) : -1;
413-
}, parallelism, handleDataBufferSize),
414-
Keep.right()
407+
PartitionHub.ofStateful(Object.class, () -> (info, d) -> {
408+
// Consumer identifier として 負の値を返すと要素はドロップされる
409+
// See also https://doc.akka.io/api/akka/2.6.9/akka/stream/javadsl/PartitionHub$.html
410+
final int CONSUMER_IDENTIFIER_TO_DROP_THE_ELEMENT = -1;
411+
try {
412+
final ControllableParallelExecutor.SequentialExecutionIdExtractor executionIdExtractor =
413+
executor.sequentialExecutionId(d);
414+
if (executionIdExtractor == null) {
415+
// SequentialExecutionIdExtractor が null の場合には、データを振り分けることができない。
416+
// 自動的に復旧することは難しいため、Stream 全体を中止し、処理を継続できないことをユーザに通知する。
417+
final String executorInfo =
418+
String.format("%s@%s", executor.getClass().getName(), Integer.toHexString(executor.hashCode()));
419+
final String message =
420+
String.format(
421+
"%s: ControllableParallelExecutor.sequentialExecutionId(element) should not return null.",
422+
executorInfo
423+
);
424+
killSwitch.abort(new IllegalStateException(message));
425+
return CONSUMER_IDENTIFIER_TO_DROP_THE_ELEMENT;
426+
}
427+
final int executionId = Math.abs(executionIdExtractor.getAsInt() % parallelism);
428+
// 下流でエラーが発生して killSwitch が発動した場合は size が減少して executionId と consumer の ID がずれる。
429+
// そのような状況になると同じ executionId の要素が同時実行されてしまうので、size が変ったときは全ての要素を無視する。
430+
return info.size() == parallelism ? info.consumerIdByIdx(executionId) : CONSUMER_IDENTIFIER_TO_DROP_THE_ELEMENT;
431+
} catch (Throwable cause) {
432+
// 次のような理由でデータを振り分ける処理がうまくいかない場合がある。
433+
// * SequentialExecutionIdExtractor を取得する際に例外が発生した
434+
// * SequentialExecutionIdExtractor.getAsInt で例外が発生した
435+
// 自動的に復旧することは難しいため、Stream 全体を中止し、処理を継続できないことをユーザに通知する。
436+
killSwitch.abort(cause);
437+
return CONSUMER_IDENTIFIER_TO_DROP_THE_ELEMENT;
438+
}
439+
}, parallelism, handleDataBufferSize),
440+
Keep.right()
415441
).run(actorSystem);
416442

417443
final Supplier<Flow<TransactionalSession, Result, CompletionStage<MultiStatus>>> handleFlow =

src/main/java/lerna/nablarch/batch/parallelizable/handler/ControllableParallelExecutor.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,27 @@ public interface ControllableParallelExecutor<IN> extends Handler<IN, Result>, D
2525
/**
2626
* 入力データをどの単位で逐次処理するかを定義するためのメソッド。
2727
*
28+
* <p>
29+
* このメソッドの実装で例外を throw してはならない。
30+
* このメソッドで例外が throw された場合、バッチ処理は中止される。
31+
* このメソッドで受け取った element 以外の要素も処理されない可能性がある。
32+
*
2833
* @param element 処理対象の入力データ
29-
* @return 逐次処理する{@link SequentialExecutionIdExtractor}
34+
* @return 逐次処理する {@link SequentialExecutionIdExtractor}。
35+
* null を返してはならない。このメソッドが null を返す場合、バッチ処理は中止される。
36+
* このメソッドで受け取った element 以外の要素も処理されない可能性がある。
3037
*/
3138
SequentialExecutionIdExtractor sequentialExecutionId(IN element);
3239

3340
/**
34-
* <p>
3541
* 逐次処理する単位を定義する関数。
36-
* </p>
42+
*
3743
* <p>
3844
* 入力データの種類を識別する ID は int で表される。同じ ID を持つ入力データはそれぞれが並列に処理されず、逐次的に処理されることが保証される。
39-
* </p>
45+
*
46+
* <p>
47+
* {@link #getAsInt} で例外を throw してはならない。
48+
* 例外が throw された場合、バッチ処理は中止される。
4049
*
4150
* @since 2018/04/09
4251
*/

src/test/java/lerna/nablarch/batch/parallelizable/handler/ControllableParallelExecutionHandlerTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,92 @@ public Result handle(String s, ExecutionContext executionContext) {
210210
assertThat(result.isSuccess(), is(false));
211211
}
212212

213+
@Test
214+
public void ハンドラのSequentialExecutionIdExtractorがnullであれば例外IllegalStateExceptionが親ハンドラにスローされる() {
215+
int parallelism = 5;
216+
217+
ControllableParallelExecutionHandler handler = new ControllableParallelExecutionHandler();
218+
handler.setParallelism(parallelism);
219+
handler.setConnectionFactory((s) -> new TestTransactionManagerConnectionBase());
220+
handler.setTransactionFactory(s -> new TestTransactionBase());
221+
222+
ControllableParallelExecutor<String> executor = new TestControllableParallelExecutorBase() {
223+
@Override
224+
public SequentialExecutionIdExtractor sequentialExecutionId(String element) {
225+
return null;
226+
}
227+
};
228+
229+
ExecutionContext context = new ExecutionContext();
230+
context.addHandler(handler);
231+
context.addHandler(executor);
232+
233+
// 親ハンドラは handleNext から例外を catch できる
234+
final IllegalStateException exception =
235+
assertThrows(IllegalStateException.class, () -> context.handleNext("input"));
236+
final String executorInfo =
237+
String.format("%s@%s", executor.getClass().getName(), Integer.toHexString(executor.hashCode()));
238+
final String expectMessage =
239+
String.format("%s: ControllableParallelExecutor.sequentialExecutionId(element) should not return null.", executorInfo);
240+
assertThat(exception.getMessage(), is(expectMessage));
241+
242+
}
243+
244+
@Test
245+
public void ハンドラのSequentialExecutionIdExtractor取得で例外が発生したら親ハンドラにその例外がリスローされる() {
246+
int parallelism = 5;
247+
248+
ControllableParallelExecutionHandler handler = new ControllableParallelExecutionHandler();
249+
handler.setParallelism(parallelism);
250+
handler.setConnectionFactory((s) -> new TestTransactionManagerConnectionBase());
251+
handler.setTransactionFactory(s -> new TestTransactionBase());
252+
253+
ExecutionContext context = new ExecutionContext();
254+
context.addHandler(handler);
255+
context.addHandler(new TestControllableParallelExecutorBase() {
256+
@Override
257+
public SequentialExecutionIdExtractor sequentialExecutionId(String element) {
258+
throw new ArithmeticException(String.format("Could not create SequentialExecutionIdExtractor from the element(%s).", element));
259+
}
260+
});
261+
262+
// 親ハンドラは handleNext から例外を catch できる
263+
final ArithmeticException exception =
264+
assertThrows(ArithmeticException.class, () -> context.handleNext("input"));
265+
assertThat(exception.getMessage(), is("Could not create SequentialExecutionIdExtractor from the element(1)."));
266+
267+
}
268+
269+
@Test
270+
public void ハンドラのSequentialExecutionIdExtractorで例外が発生したら親ハンドラにその例外がリスローされる() {
271+
int parallelism = 5;
272+
273+
ControllableParallelExecutionHandler handler = new ControllableParallelExecutionHandler();
274+
handler.setParallelism(parallelism);
275+
handler.setConnectionFactory((s) -> new TestTransactionManagerConnectionBase());
276+
handler.setTransactionFactory(s -> new TestTransactionBase());
277+
278+
ExecutionContext context = new ExecutionContext();
279+
context.addHandler(handler);
280+
context.addHandler(new TestControllableParallelExecutorBase() {
281+
@Override
282+
public SequentialExecutionIdExtractor sequentialExecutionId(String element) {
283+
return new SequentialExecutionIdExtractor() {
284+
@Override
285+
public int getAsInt() {
286+
throw new ArithmeticException(String.format("Could not calculate ID from the element(%s).", element));
287+
}
288+
};
289+
}
290+
});
291+
292+
// 親ハンドラは handleNext から例外を catch できる
293+
final ArithmeticException exception =
294+
assertThrows(ArithmeticException.class, () -> context.handleNext("input"));
295+
assertThat(exception.getMessage(), is("Could not calculate ID from the element(1)."));
296+
297+
}
298+
213299
@Test
214300
public void executionId毎にcommitIntervalを迎えたときと終了時に1度コミットされる() {
215301
int numberOfData = 100;

0 commit comments

Comments
 (0)