Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public final int put(final List<O> ops, final int from, final int to) {
}
final var availablePermits = incomingOpsLimiter.availablePermits();
var n = to - from;
Loggers.MSG.info("Processing batch window: from - {}, to - {}, total - {} \n", from, to, n);
n = Math.min(availablePermits, n);
if(n > 0) {
Loggers.MSG.info("Acquiring N operations: {}", n);
if(incomingOpsLimiter.tryAcquire(n)) {
incomingOps.add(new ArrayList<>(ops).subList(from, from + n));
scheduledOpCount.add(n);
Expand All @@ -94,13 +96,15 @@ public final int put(final List<O> ops) {
final void prepareAndExecuteBatch(final List<O> ops) {
// should copy the ops into the other buffer as far as invoker will clean the source buffer after put(...) exit
final var n = ops.size();
Loggers.MSG.info("Processing batch size: {}", n);
final var opsRangeCopy = new ArrayList<O>(n);
O op;
for(var i = 0; i < n; i ++) {
op = ops.get(i);
prepare(op);
opsRangeCopy.add(op);
}
Loggers.MSG.info("Batch size has been processed: {}", n);
execute(opsRangeCopy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public final void run() {
try {
while(true) {
final var ops = inQueue.poll();
Loggers.MSG.info("queue head is: {}", ops);
if(null == ops) {
final var state = stateSupplier.get();
Loggers.MSG.info("curent state for queue head {} is {}", ops, state);
if(SHUTDOWN.equals(state)) {
Loggers.MSG.debug("{}: the state is shutdown and nothing to do more, exit", workerName);
break;
Expand All @@ -54,6 +56,7 @@ public final void run() {
LockSupport.parkNanos(1);
}
} else {
Loggers.MSG.info("Semaphore releases operations: {}", ops.size());
inQueueLimiter.release(ops.size());
batchAction.accept(ops);
}
Expand Down