Skip to content

Commit 98ae51d

Browse files
authored
Merge pull request #571 from barspi/tspace-monotonic
Tspace monotonic
2 parents c6f02d4 + dcad985 commit 98ae51d

File tree

4 files changed

+76
-70
lines changed

4 files changed

+76
-70
lines changed

jpos/src/main/java/org/jpos/space/TSpace.java

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
*/
1818

1919
package org.jpos.space;
20+
2021
import org.jpos.util.Loggeable;
2122
import java.io.PrintStream;
2223
import java.io.Serializable;
23-
import java.time.Duration;
24-
import java.time.Instant;
2524
import java.util.*;
2625
import java.util.concurrent.TimeUnit;
2726

@@ -40,8 +39,9 @@ public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
4039
private static final long GCLONG = 60*1000;
4140
private static final long NRD_RESOLUTION = 500L;
4241
private static final int MAX_ENTRIES_IN_DUMP = 1000;
42+
private static final long ONE_MILLION = 1_000_000L; // multiplier millis --> nanos
4343
private final Set[] expirables;
44-
private long lastLongGC = Instant.now().toEpochMilli();
44+
private long lastLongGC = System.nanoTime();
4545

4646
public TSpace () {
4747
super();
@@ -70,7 +70,7 @@ public void out (K key, V value, long timeout) {
7070
throw new NullPointerException ("key=" + key + ", value=" + value);
7171
Object v = value;
7272
if (timeout > 0) {
73-
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
73+
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
7474
}
7575
synchronized (this) {
7676
List l = getList(key);
@@ -112,17 +112,18 @@ public synchronized V in (Object key) {
112112

113113
@Override
114114
public synchronized V in (Object key, long timeout) {
115-
Object obj;
116-
Instant now = Instant.now();
117-
long duration;
118-
while ((obj = inp (key)) == null &&
119-
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
115+
V obj;
116+
long now = System.nanoTime();
117+
long to = now + timeout * ONE_MILLION;
118+
long waitFor;
119+
while ( (obj = inp (key)) == null &&
120+
(waitFor = (to - System.nanoTime())) >= 0 )
120121
{
121122
try {
122-
this.wait (timeout - duration);
123+
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
123124
} catch (InterruptedException e) { }
124125
}
125-
return (V) obj;
126+
return obj;
126127
}
127128

128129
@Override
@@ -138,17 +139,18 @@ public synchronized V rd (Object key) {
138139

139140
@Override
140141
public synchronized V rd (Object key, long timeout) {
141-
Object obj;
142-
Instant now = Instant.now();
143-
long duration;
144-
while ((obj = rdp (key)) == null &&
145-
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
142+
V obj;
143+
long now = System.nanoTime();
144+
long to = now + (timeout * ONE_MILLION);
145+
long waitFor;
146+
while ( (obj = rdp (key)) == null &&
147+
(waitFor = (to - System.nanoTime())) >= 0 )
146148
{
147149
try {
148-
this.wait (timeout - duration);
150+
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
149151
} catch (InterruptedException e) { }
150152
}
151-
return (V) obj;
153+
return obj;
152154
}
153155

154156
@Override
@@ -162,17 +164,19 @@ public synchronized void nrd (Object key) {
162164

163165
@Override
164166
public synchronized V nrd (Object key, long timeout) {
165-
Object obj;
166-
Instant now = Instant.now();
167-
long duration;
168-
while ((obj = rdp (key)) != null &&
169-
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
167+
V obj;
168+
long now = System.nanoTime();
169+
long to = now + (timeout * ONE_MILLION);
170+
long waitFor;
171+
while ( (obj = rdp (key)) != null &&
172+
(waitFor = (to - System.nanoTime())) >= 0 )
170173
{
171174
try {
172-
this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
175+
this.wait(Math.min(NRD_RESOLUTION,
176+
Math.max(waitFor / ONE_MILLION, 1L)));
173177
} catch (InterruptedException ignored) { }
174178
}
175-
return (V) obj;
179+
return obj;
176180
}
177181

178182
@Override
@@ -186,9 +190,9 @@ public void run () {
186190

187191
public void gc () {
188192
gc(0);
189-
if (Instant.now().toEpochMilli() - lastLongGC > GCLONG) {
193+
if (System.nanoTime() - lastLongGC > GCLONG) {
190194
gc(1);
191-
lastLongGC = Instant.now().toEpochMilli();
195+
lastLongGC = System.nanoTime();
192196
}
193197
}
194198

@@ -336,7 +340,7 @@ public void push (K key, V value, long timeout) {
336340
throw new NullPointerException ("key=" + key + ", value=" + value);
337341
Object v = value;
338342
if (timeout > 0) {
339-
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
343+
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
340344
}
341345
synchronized (this) {
342346
List l = getList(key);
@@ -373,7 +377,7 @@ public void put (K key, V value, long timeout) {
373377
throw new NullPointerException ("key=" + key + ", value=" + value);
374378
Object v = value;
375379
if (timeout > 0) {
376-
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
380+
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
377381
}
378382
synchronized (this) {
379383
List l = new LinkedList();
@@ -399,14 +403,15 @@ public boolean existAny (K[] keys) {
399403

400404
@Override
401405
public boolean existAny (K[] keys, long timeout) {
402-
Instant now = Instant.now();
403-
long duration;
404-
while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
406+
long now = System.nanoTime();
407+
long to = now + (timeout * ONE_MILLION);
408+
long waitFor;
409+
while ((waitFor = (to - System.nanoTime())) >= 0) {
405410
if (existAny (keys))
406411
return true;
407412
synchronized (this) {
408413
try {
409-
wait (timeout - duration);
414+
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
410415
} catch (InterruptedException e) { }
411416
}
412417
}
@@ -517,19 +522,24 @@ private void unregisterExpirable(Object k) {
517522

518523
static class Expirable implements Comparable, Serializable {
519524

520-
static final long serialVersionUID = 0xA7F22BF5;
525+
private static final long serialVersionUID = 0xA7F22BF5;
521526

522527
Object value;
528+
529+
/**
530+
* When to expire, in the future, as given by monotonic System.nanoTime().<br>
531+
* IMPORTANT: always use a nanosec offset from System.nanoTime()!
532+
*/
523533
long expires;
524534

525-
public Expirable (Object value, long expires) {
535+
Expirable (Object value, long expires) {
526536
super();
527537
this.value = value;
528538
this.expires = expires;
529539
}
530540

531-
public boolean isExpired () {
532-
return expires < Instant.now().toEpochMilli();
541+
boolean isExpired () {
542+
return (System.nanoTime() - expires) > 0;
533543
}
534544

535545
@Override
@@ -540,20 +550,16 @@ public String toString() {
540550
+ ",expired=" + isExpired ();
541551
}
542552

543-
public Object getValue() {
553+
Object getValue() {
544554
return isExpired() ? null : value;
545555
}
546556

547557
@Override
548-
public int compareTo (Object obj) {
549-
Expirable other = (Expirable) obj;
550-
long otherExpires = other.expires;
551-
if (otherExpires == expires)
552-
return 0;
553-
else if (expires < otherExpires)
554-
return -1;
555-
else
556-
return 1;
558+
public int compareTo (Object other) {
559+
long diff = this.expires - ((Expirable)other).expires;
560+
return diff > 0 ? 1 :
561+
diff < 0 ? -1 :
562+
0;
557563
}
558564
}
559565

jpos/src/test/java/org/jpos/space/TSpacePerformanceTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.jpos.space;
2020

2121
import java.time.Duration;
22-
import java.time.Instant;
2322
import java.util.ArrayList;
2423
import java.util.List;
2524
import java.util.concurrent.ExecutorService;
@@ -184,9 +183,9 @@ public void testDeadLockWithNotify() throws Throwable {
184183
for (int i=0; i<size; i++)
185184
es.execute(new WriteSpaceWithNotifyTask("WriteTask2-"+i,sp2,sp1));
186185

187-
Instant stamp = Instant.now();
186+
Duration stamp = Duration.ofNanos(System.nanoTime());
188187
while (((ThreadPoolExecutor)es).getActiveCount() > 0) {
189-
if (Duration.between(stamp, Instant.now()).toMillis() < 10000){
188+
if (Duration.ofNanos(System.nanoTime()).minus(stamp).toMillis() < 10000){
190189
ISOUtil.sleep(100);
191190
continue;
192191
}

jpos/src/test/java/org/jpos/space/TSpaceTest.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.io.ByteArrayOutputStream;
3131
import java.io.PrintStream;
32+
import java.time.Duration;
3233
import java.util.AbstractSet;
3334
import java.util.List;
3435
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@
4243
@SuppressWarnings("unchecked")
4344
@ExtendWith(MockitoExtension.class)
4445
public class TSpaceTest {
46+
static final long EXPIRE_OFFSET = Duration.ofDays(9999).toNanos();
4547

4648
@Test
4749
public void testConstructor() throws Throwable {
@@ -103,31 +105,33 @@ public void testExpirableConstructor() throws Throwable {
103105

104106
@Test
105107
public void testExpirableGetValue() throws Throwable {
106-
String result = (String) new TSpace.Expirable("", 9184833384926L).getValue();
108+
String result = (String) new TSpace.Expirable("", System.nanoTime() + EXPIRE_OFFSET).getValue();
107109
assertEquals("", result, "result");
108110
}
109111

110112
@Test
111113
public void testExpirableGetValue1() throws Throwable {
112-
Object result = new TSpace.Expirable(null, 9184833384926L).getValue();
114+
Object result = new TSpace.Expirable(null, System.nanoTime() + EXPIRE_OFFSET).getValue();
113115
assertNull(result, "result");
114116
}
115117

116118
@Test
117119
public void testExpirableGetValue2() throws Throwable {
118-
Object result = new TSpace.Expirable(new Object(), 100L).getValue();
120+
// using negative offset to ensure expiration (literally, object is born already expired)
121+
Object result = new TSpace.Expirable(new Object(), System.nanoTime() - EXPIRE_OFFSET).getValue();
119122
assertNull(result, "result");
120123
}
121124

122125
@Test
123126
public void testExpirableIsExpired() throws Throwable {
124-
boolean result = new TSpace.Expirable("", 9184833384926L).isExpired();
127+
boolean result = new TSpace.Expirable("", System.nanoTime() + EXPIRE_OFFSET).isExpired();
125128
assertFalse(result, "result");
126129
}
127130

128131
@Test
129132
public void testExpirableIsExpired1() throws Throwable {
130-
boolean result = new TSpace.Expirable(new Object(), 100L).isExpired();
133+
// using negative offset to ensure expiration (literally, object is born already expired)
134+
boolean result = new TSpace.Expirable(new Object(), System.nanoTime() - EXPIRE_OFFSET).isExpired();
131135
assertTrue(result, "result");
132136
}
133137

@@ -140,7 +144,7 @@ public void testExpirableToString() throws Throwable {
140144
@Test
141145
public void testExpirableToStringThrowsNullPointerException() throws Throwable {
142146
try {
143-
new TSpace.Expirable(null, 100L).toString();
147+
new TSpace.Expirable(null,100L).toString();
144148
fail("Expected NullPointerException to be thrown");
145149
} catch (NullPointerException ex) {
146150
if (isJavaVersionAtMost(JAVA_14)) {
@@ -153,7 +157,6 @@ public void testExpirableToStringThrowsNullPointerException() throws Throwable {
153157

154158
@Test
155159
public void testGc() throws Throwable {
156-
157160
TSpace tSpace = new TSpace();
158161
tSpace.gc();
159162
assertEquals(0, tSpace.entries.size(), "tSpace.entries.size()");
@@ -207,6 +210,7 @@ public void testInp1() throws Throwable {
207210
"tSpace.entries.get(\"\").get(0) had \"testString\" removed");
208211
assertEquals("testString", result, "result");
209212
assertEquals(3, tSpace.entries.size(), "tSpace.entries.size()");
213+
tSpace.dump(System.out, ">>");
210214
}
211215

212216
@Test
@@ -239,12 +243,10 @@ public void testNotifyReaders() {
239243
final Space sp = new TSpace();
240244
final AtomicInteger ai = new AtomicInteger(10);
241245
for (int i=0; i<10; i++) {
242-
new Thread() {
243-
public void run() {
244-
if (sp.rd("TEST", 5000L) != null)
245-
ai.decrementAndGet();
246-
}
247-
}.start();
246+
new Thread(()->{
247+
if (sp.rd("TEST", 5000L) != null)
248+
ai.decrementAndGet();
249+
}).start();
248250
}
249251
sp.out("TEST", Boolean.TRUE);
250252
ISOUtil.sleep(500L);

jpos/src/test/java/org/jpos/space/TSpaceTestCase.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.jpos.space;
2020

2121
import java.time.Duration;
22-
import java.time.Instant;
2322
import java.util.HashSet;
2423
import java.util.Set;
2524

@@ -289,28 +288,28 @@ public void run() {
289288
sp.out("KA", Boolean.TRUE);
290289
}
291290
}.start();
292-
Instant now = Instant.now();
291+
Duration now = Duration.ofNanos(System.nanoTime());
293292
assertTrue(sp.existAny(new String[] { "KA", "KB" }, 2000L), "existAnyWithTimeout ([KA,KB], delay)");
294-
long elapsed = Duration.between(now, Instant.now()).toMillis();
293+
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
295294
assertTrue(elapsed > 900L, "delay was > 1000");
296295
}
297296

298297
@Test
299298
public void testNRD() {
300-
Instant now = Instant.now();
299+
Duration now = Duration.ofNanos(System.nanoTime());
301300
sp.out("NRD", "NRDTEST", 1000L);
302301
sp.nrd("NRD");
303-
long elapsed = Duration.between(now, Instant.now()).toMillis();
302+
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
304303
assertTrue(elapsed >= 1000L, "Invalid elapsed time " + elapsed);
305304
}
306305
@Test
307306
public void testNRDWithDelay() {
308-
Instant now = Instant.now();
307+
Duration now = Duration.ofNanos(System.nanoTime());
309308
sp.out("NRD", "NRDTEST", 1000L);
310309
Object obj = sp.nrd("NRD", 500L);
311310
assertNotNull(obj, "Object should not be null");
312311
obj = sp.nrd("NRD", 5000L);
313-
long elapsed = Duration.between(now, Instant.now()).toMillis();
312+
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
314313
assertTrue(elapsed >= 1000L && elapsed <= 2000L, "Invalid elapsed time " + elapsed);
315314
assertNull(obj, "Object should be null");
316315
}

0 commit comments

Comments
 (0)