Skip to content

Commit a10fa8e

Browse files
committed
Fix KavaThreadPoolProvider not scheduling future tasks correctly. Reduce CPU usage.
1 parent a4a8b35 commit a10fa8e

File tree

7 files changed

+186
-37
lines changed

7 files changed

+186
-37
lines changed

CHANGES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
[1.6.3]
2+
- Fix KavaThreadPoolProvider not scheduling future tasks correctly. Reduce CPU usage.
3+
14
[1.6.2]
25
- Add KavaThreadPoolProvider for console runtimes
36

core/src/main/java/org/mini2Dx/miniscript/core/threadpool/KavaThreadPoolProvider.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
package org.mini2Dx.miniscript.core.threadpool;
2525

2626
import org.mini2Dx.miniscript.core.ThreadPoolProvider;
27-
import org.mini2Dx.miniscript.core.util.ReadWritePriorityQueue;
27+
import org.mini2Dx.miniscript.core.util.DelayedReadWritePriorityQueue;
2828

2929
import java.util.concurrent.Future;
3030
import java.util.concurrent.ScheduledFuture;
@@ -38,7 +38,7 @@ public class KavaThreadPoolProvider implements Runnable, ThreadPoolProvider {
3838
private final AtomicBoolean running = new AtomicBoolean(true);
3939
private final Thread [] threads;
4040

41-
private final ReadWritePriorityQueue<ScheduledTask> scheduledTaskQueue = new ReadWritePriorityQueue<>();
41+
private final DelayedReadWritePriorityQueue scheduledTaskQueue = new DelayedReadWritePriorityQueue();
4242

4343
public KavaThreadPoolProvider() {
4444
this(Runtime.getRuntime().availableProcessors() + 1);
@@ -102,6 +102,9 @@ public void run() {
102102
final ScheduledTask scheduledTask;
103103
try {
104104
scheduledTask = scheduledTaskQueue.take();
105+
if(scheduledTask.getScheduledStartTimeNanos() > System.nanoTime()) {
106+
107+
}
105108
} catch (InterruptedException e) {
106109
e.printStackTrace();
107110
continue;

core/src/main/java/org/mini2Dx/miniscript/core/threadpool/ScheduledTask.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class ScheduledTask implements Comparable<ScheduledTask> {
3333
private ScheduledTaskFuture future;
3434

3535
private Runnable runnable;
36-
private long scheduledStartTime;
36+
private long scheduledStartTimeNanos;
3737

3838
private long repeatInterval;
3939
private TimeUnit repeatUnit;
@@ -48,7 +48,7 @@ public void dispose() {
4848
disposed = true;
4949

5050
runnable = null;
51-
scheduledStartTime = 0L;
51+
scheduledStartTimeNanos = 0L;
5252

5353
repeatUnit = null;
5454
repeatInterval = 0L;
@@ -58,25 +58,25 @@ public void dispose() {
5858
POOL.add(this);
5959
}
6060

61-
public static ScheduledTask allocate(Runnable runnable, long scheduledStartTime) {
61+
public static ScheduledTask allocate(Runnable runnable, long scheduledStartTimeNanos) {
6262
ScheduledTask task = POOL.poll();
6363
if(task == null) {
6464
task = new ScheduledTask();
6565
}
6666
task.disposed = false;
6767
task.runnable = runnable;
68-
task.scheduledStartTime = scheduledStartTime;
68+
task.scheduledStartTimeNanos = scheduledStartTimeNanos;
6969
return task;
7070
}
7171

72-
public static ScheduledTask allocate(Runnable runnable, long scheduledStartTime, long repeatInterval, TimeUnit repeatUnit) {
72+
public static ScheduledTask allocate(Runnable runnable, long scheduledStartTimeNanos, long repeatInterval, TimeUnit repeatUnit) {
7373
ScheduledTask task = POOL.poll();
7474
if(task == null) {
7575
task = new ScheduledTask();
7676
}
7777
task.disposed = false;
7878
task.runnable = runnable;
79-
task.scheduledStartTime = scheduledStartTime;
79+
task.scheduledStartTimeNanos = scheduledStartTimeNanos;
8080
task.repeatUnit = repeatUnit;
8181
task.repeatInterval = repeatInterval;
8282
return task;
@@ -86,8 +86,8 @@ public Runnable getRunnable() {
8686
return runnable;
8787
}
8888

89-
public long getScheduledStartTime() {
90-
return scheduledStartTime;
89+
public long getScheduledStartTimeNanos() {
90+
return scheduledStartTimeNanos;
9191
}
9292

9393
public boolean isRepeating() {
@@ -104,7 +104,7 @@ public TimeUnit getRepeatUnit() {
104104

105105
@Override
106106
public int compareTo(ScheduledTask o) {
107-
return Long.compare(scheduledStartTime, o.scheduledStartTime);
107+
return Long.compare(scheduledStartTimeNanos, o.scheduledStartTimeNanos);
108108
}
109109

110110
public ScheduledTaskFuture getFuture() {

core/src/main/java/org/mini2Dx/miniscript/core/threadpool/ScheduledTaskFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public boolean isDone() {
120120

121121
public void setScheduledTask(ScheduledTask scheduledTask) {
122122
this.scheduledTask = scheduledTask;
123-
this.scheduledTimeNanos = scheduledTask.getScheduledStartTime();
123+
this.scheduledTimeNanos = scheduledTask.getScheduledStartTimeNanos();
124124
}
125125

126126
public void setExecutingThread(Thread thread) {

core/src/main/java/org/mini2Dx/miniscript/core/util/AbstractConcurrentBlockingQueue.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
public class AbstractConcurrentBlockingQueue<E> extends AbstractConcurrentQueue<E> implements BlockingQueue<E> {
1313
private final int maxCapacity;
14-
private final Object monitor = new Object();
14+
protected final Object waitingForItemsMonitor = new Object();
15+
protected final Object waitingForRemovalMonitor = new Object();
1516

1617
public AbstractConcurrentBlockingQueue(int maxCapacity, Queue<E> internalQueue) {
1718
super(internalQueue);
@@ -21,53 +22,53 @@ public AbstractConcurrentBlockingQueue(int maxCapacity, Queue<E> internalQueue)
2122
@Override
2223
public E poll() {
2324
final E result = super.poll();
24-
synchronized(monitor) {
25-
monitor.notifyAll();
25+
synchronized(waitingForRemovalMonitor) {
26+
waitingForRemovalMonitor.notify();
2627
}
2728
return result;
2829
}
2930

3031
@Override
3132
public boolean remove(Object o) {
3233
final boolean result = super.remove(o);
33-
synchronized(monitor) {
34-
monitor.notifyAll();
34+
synchronized(waitingForRemovalMonitor) {
35+
waitingForRemovalMonitor.notify();
3536
}
3637
return result;
3738
}
3839

3940
@Override
4041
public boolean removeAll(Collection<?> c) {
4142
final boolean result = super.removeAll(c);
42-
synchronized(monitor) {
43-
monitor.notifyAll();
43+
synchronized(waitingForRemovalMonitor) {
44+
waitingForRemovalMonitor.notify();
4445
}
4546
return result;
4647
}
4748

4849
@Override
4950
public boolean offer(E e) {
5051
final boolean result = super.offer(e);
51-
synchronized(monitor) {
52-
monitor.notifyAll();
52+
synchronized(waitingForItemsMonitor) {
53+
waitingForItemsMonitor.notify();
5354
}
5455
return result;
5556
}
5657

5758
@Override
5859
public boolean add(E e) {
5960
final boolean result = super.add(e);
60-
synchronized(monitor) {
61-
monitor.notifyAll();
61+
synchronized(waitingForItemsMonitor) {
62+
waitingForItemsMonitor.notify();
6263
}
6364
return result;
6465
}
6566

6667
@Override
6768
public boolean addAll(Collection<? extends E> c) {
6869
final boolean result = super.addAll(c);
69-
synchronized(monitor) {
70-
monitor.notifyAll();
70+
synchronized(waitingForItemsMonitor) {
71+
waitingForItemsMonitor.notify();
7172
}
7273
return result;
7374
}
@@ -77,16 +78,16 @@ public void put(E e) throws InterruptedException {
7778
lock.lockWrite();
7879
while(size() >= maxCapacity) {
7980
lock.unlockWrite();
80-
synchronized(monitor) {
81-
monitor.wait();
81+
synchronized(waitingForRemovalMonitor) {
82+
waitingForRemovalMonitor.wait();
8283
}
8384
lock.lockWrite();
8485
}
8586
super.offer(e);
8687
lock.unlockWrite();
8788

88-
synchronized(monitor) {
89-
monitor.notifyAll();
89+
synchronized(waitingForItemsMonitor) {
90+
waitingForItemsMonitor.notify();
9091
}
9192
}
9293

@@ -95,8 +96,8 @@ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti
9596
lock.lockWrite();
9697
if(size() >= maxCapacity) {
9798
lock.unlockWrite();
98-
synchronized(monitor) {
99-
monitor.wait(0L, (int) unit.toNanos(timeout));
99+
synchronized(waitingForRemovalMonitor) {
100+
waitingForRemovalMonitor.wait(0L, (int) unit.toNanos(timeout));
100101
}
101102
lock.lockWrite();
102103
}
@@ -114,16 +115,16 @@ public E take() throws InterruptedException {
114115
lock.lockWrite();
115116
while (isEmpty()) {
116117
lock.unlockWrite();
117-
synchronized(monitor) {
118-
monitor.wait();
118+
synchronized(waitingForItemsMonitor) {
119+
waitingForItemsMonitor.wait();
119120
}
120121
lock.lockWrite();
121122
}
122123
final E result = super.remove();
123124
lock.unlockWrite();
124125

125-
synchronized(monitor) {
126-
monitor.notifyAll();
126+
synchronized(waitingForRemovalMonitor) {
127+
waitingForRemovalMonitor.notify();
127128
}
128129
return result;
129130
}
@@ -133,13 +134,17 @@ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
133134
lock.lockWrite();
134135
if (isEmpty()) {
135136
lock.unlockWrite();
136-
synchronized(monitor) {
137-
monitor.wait(0L, (int) unit.toNanos(timeout));
137+
synchronized(waitingForItemsMonitor) {
138+
waitingForItemsMonitor.wait(0L, (int) unit.toNanos(timeout));
138139
}
139140
lock.lockWrite();
140141
}
141142
final E result = poll();
142143
lock.unlockWrite();
144+
145+
synchronized(waitingForRemovalMonitor) {
146+
waitingForRemovalMonitor.notify();
147+
}
143148
return result;
144149
}
145150

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2021 Viridian Software Ltd.
3+
*/
4+
package org.mini2Dx.miniscript.core.util;
5+
6+
import org.mini2Dx.miniscript.core.threadpool.ScheduledTask;
7+
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class DelayedReadWritePriorityQueue extends ReadWritePriorityQueue<ScheduledTask> {
11+
12+
public DelayedReadWritePriorityQueue() {
13+
this(Integer.MAX_VALUE);
14+
}
15+
16+
public DelayedReadWritePriorityQueue(int maxCapacity) {
17+
super(maxCapacity);
18+
}
19+
20+
@Override
21+
public ScheduledTask take() throws InterruptedException {
22+
ScheduledTask head = super.peek();
23+
while(head == null || head.getScheduledStartTimeNanos() > System.nanoTime()) {
24+
if(head == null) {
25+
synchronized(waitingForItemsMonitor) {
26+
waitingForItemsMonitor.wait();
27+
}
28+
} else {
29+
long currentTimeNanos = System.nanoTime();
30+
long delayMillis = Math.max (0, (head.getScheduledStartTimeNanos() - currentTimeNanos) / TimeUnit.MILLISECONDS.toNanos(1));
31+
int delayNanos = Math.max (0, (int) ((head.getScheduledStartTimeNanos() - currentTimeNanos) % TimeUnit.MILLISECONDS.toNanos(1)));
32+
33+
synchronized(waitingForItemsMonitor) {
34+
waitingForItemsMonitor.wait(delayMillis, delayNanos);
35+
}
36+
}
37+
38+
head = super.peek();
39+
}
40+
41+
lock.lockWrite();
42+
ScheduledTask result = super.peek();
43+
while (isEmpty() || result == null || result.getScheduledStartTimeNanos() > System.nanoTime()) {
44+
lock.unlockWrite();
45+
synchronized(waitingForItemsMonitor) {
46+
if(result == null) {
47+
waitingForItemsMonitor.wait();
48+
} else {
49+
long currentTimeNanos = System.nanoTime();
50+
long delayMillis = Math.max (0, (head.getScheduledStartTimeNanos() - currentTimeNanos) / TimeUnit.MILLISECONDS.toNanos(1));
51+
int delayNanos = Math.max (0, (int) ((head.getScheduledStartTimeNanos() - currentTimeNanos) % TimeUnit.MILLISECONDS.toNanos(1)));
52+
waitingForItemsMonitor.wait(delayMillis, delayNanos);
53+
}
54+
}
55+
lock.lockWrite();
56+
result = super.peek();
57+
}
58+
result = super.poll();
59+
lock.unlockWrite();
60+
61+
synchronized(waitingForRemovalMonitor) {
62+
waitingForRemovalMonitor.notify();
63+
}
64+
return result;
65+
}
66+
}

0 commit comments

Comments
 (0)