Skip to content

Commit 221fb10

Browse files
committed
Support duplicate context duplication.
Motivation: Duplicating a duplicated context is supported but the duplication semantic is not defined. Changes: This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null.
1 parent d5f613c commit 221fb10

File tree

10 files changed

+108
-24
lines changed

10 files changed

+108
-24
lines changed

vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
3838

3939
static final boolean DISABLE_TIMINGS = SysProps.DISABLE_CONTEXT_TIMINGS.getBoolean();
4040

41-
private final VertxInternal owner;
41+
private final VertxImpl owner;
4242
private final JsonObject config;
4343
private final DeploymentContext deployment;
4444
private final CloseFuture closeFuture;
@@ -51,7 +51,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
5151
final WorkerPool workerPool;
5252
final WorkerTaskQueue executeBlockingTasks;
5353

54-
public ContextImpl(VertxInternal vertx,
54+
public ContextImpl(VertxImpl vertx,
5555
Object[] locals,
5656
EventLoopExecutor eventLoop,
5757
ThreadingModel threadingModel,
@@ -113,7 +113,7 @@ public EventLoop nettyEventLoop() {
113113
return eventLoop.eventLoop;
114114
}
115115

116-
public VertxInternal owner() {
116+
public VertxImpl owner() {
117117
return owner;
118118
}
119119

vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,27 @@
1212

1313
import io.vertx.core.spi.context.storage.ContextLocal;
1414

15+
import java.util.function.Function;
16+
1517
/**
1618
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
1719
*/
1820
public class ContextLocalImpl<T> implements ContextLocal<T> {
1921

22+
public static <T> ContextLocal<T> create(Class<T> type, Function<T, T> duplicator) {
23+
synchronized (LocalSeq.class) {
24+
int idx = LocalSeq.locals.size();
25+
ContextLocal<T> local = new ContextLocalImpl<>(idx, duplicator);
26+
LocalSeq.locals.add(local);
27+
return local;
28+
}
29+
}
30+
2031
final int index;
32+
final Function<T, T> duplicator;
2133

22-
public ContextLocalImpl(int index) {
34+
public ContextLocalImpl(int index, Function<T, T> duplicator) {
2335
this.index = index;
24-
}
25-
26-
public ContextLocalImpl() {
27-
this.index = LocalSeq.next();
36+
this.duplicator = duplicator;
2837
}
2938
}

vertx-core/src/main/java/io/vertx/core/impl/DuplicatedContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public boolean isWorkerContext() {
142142

143143
@Override
144144
public ContextInternal duplicate() {
145-
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
145+
DuplicatedContext duplicate = new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
146+
delegate.owner().duplicate(this, duplicate);
147+
return duplicate;
146148
}
147149

148150
@Override

vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
*/
1111
package io.vertx.core.impl;
1212

13+
import io.vertx.core.internal.ContextInternal;
14+
import io.vertx.core.spi.context.storage.ContextLocal;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
1318
import java.util.concurrent.atomic.AtomicInteger;
1419

1520
/**
@@ -18,20 +23,22 @@
1823
public class LocalSeq {
1924

2025
// 0 : reserved slot for local context map
21-
private static final AtomicInteger seq = new AtomicInteger(1);
26+
static final List<ContextLocal<?>> locals = new ArrayList<>();
27+
28+
static {
29+
reset();
30+
}
2231

2332
/**
2433
* Hook for testing purposes
2534
*/
26-
public static void reset() {
27-
seq.set((1));
28-
}
29-
30-
static int get() {
31-
return seq.get();
35+
public synchronized static void reset() {
36+
// 0 : reserved slot for local context map
37+
locals.clear();
38+
locals.add(ContextInternal.LOCAL_MAP);
3239
}
3340

34-
static int next() {
35-
return seq.getAndIncrement();
41+
synchronized static ContextLocal<?>[] get() {
42+
return locals.toArray(new ContextLocal[0]);
3643
}
3744
}

vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import io.vertx.core.net.impl.*;
4646
import io.vertx.core.impl.transports.NioTransport;
4747
import io.vertx.core.spi.context.executor.EventExecutorProvider;
48+
import io.vertx.core.spi.context.storage.AccessMode;
49+
import io.vertx.core.spi.context.storage.ContextLocal;
4850
import io.vertx.core.spi.file.FileResolver;
4951
import io.vertx.core.file.impl.FileSystemImpl;
5052
import io.vertx.core.file.impl.WindowsFileSystem;
@@ -143,7 +145,8 @@ private static ThreadFactory virtualThreadFactory() {
143145
private final FileResolver fileResolver;
144146
private final EventExecutorProvider eventExecutorProvider;
145147
private final Map<ServerID, NetServerInternal> sharedNetServers = new HashMap<>();
146-
private final int contextLocals;
148+
private final ContextLocal<?>[] contextLocals;
149+
private final List<ContextLocal<?>> contextLocalsList;
147150
final WorkerPool workerPool;
148151
final WorkerPool internalWorkerPool;
149152
final WorkerPool virtualThreaWorkerPool;
@@ -202,6 +205,7 @@ private static ThreadFactory virtualThreadFactory() {
202205
ThreadFactory virtualThreadFactory = virtualThreadFactory();
203206

204207
contextLocals = LocalSeq.get();
208+
contextLocalsList = Collections.unmodifiableList(Arrays.asList(contextLocals));
205209
closeFuture = new CloseFuture(log);
206210
maxEventLoopExecTime = maxEventLoopExecuteTime;
207211
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
@@ -563,10 +567,10 @@ public boolean cancelTimer(long id) {
563567
}
564568

565569
private Object[] createContextLocals() {
566-
if (contextLocals == 0) {
570+
if (contextLocals.length == 0) {
567571
return EMPTY_CONTEXT_LOCALS;
568572
} else {
569-
return new Object[contextLocals];
573+
return new Object[contextLocals.length];
570574
}
571575
}
572576

@@ -936,6 +940,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
936940
return hostnameResolver.nettyAddressResolverGroup();
937941
}
938942

943+
@Override
944+
public List<ContextLocal<?>> contextLocals() {
945+
return contextLocalsList;
946+
}
947+
939948
@Override
940949
public FileResolver fileResolver() {
941950
return fileResolver;
@@ -1317,6 +1326,17 @@ public <C> C createSharedResource(String resourceKey, String resourceName, Close
13171326
return SharedResourceHolder.createSharedResource(this, resourceKey, resourceName, closeFuture, supplier);
13181327
}
13191328

1329+
void duplicate(ContextBase src, ContextBase dst) {
1330+
for (int i = 0;i < contextLocals.length;i++) {
1331+
ContextLocalImpl<?> contextLocal = (ContextLocalImpl<?>) contextLocals[i];
1332+
Object local = AccessMode.CONCURRENT.get(src.locals, i);
1333+
if (local != null) {
1334+
local = ((Function)contextLocal.duplicator).apply(local);
1335+
}
1336+
AccessMode.CONCURRENT.put(dst.locals, i, local);
1337+
}
1338+
}
1339+
13201340
/**
13211341
* Reads the version from the {@code vertx-version.txt} file.
13221342
*

vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
*/
3737
public interface ContextInternal extends Context {
3838

39-
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);
39+
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0, ConcurrentHashMap::new);
4040

4141
/**
4242
* @return the current context

vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.vertx.core.net.NetServerOptions;
2424
import io.vertx.core.net.impl.NetServerInternal;
2525
import io.vertx.core.net.impl.ServerID;
26+
import io.vertx.core.spi.context.storage.ContextLocal;
2627
import io.vertx.core.spi.transport.Transport;
2728
import io.vertx.core.spi.cluster.ClusterManager;
2829
import io.vertx.core.spi.file.FileResolver;
@@ -33,6 +34,7 @@
3334
import java.lang.ref.Cleaner;
3435
import java.net.InetAddress;
3536
import java.net.InetSocketAddress;
37+
import java.util.List;
3638
import java.util.Map;
3739
import java.util.concurrent.Callable;
3840
import java.util.concurrent.ExecutorService;
@@ -305,6 +307,11 @@ default <T> Future<T> executeBlockingInternal(Callable<T> blockingCodeHandler) {
305307
*/
306308
AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup();
307309

310+
/**
311+
* @return an immutable list of this vertx instance context locals
312+
*/
313+
List<ContextLocal<?>> contextLocals();
314+
308315
BlockedThreadChecker blockedThreadChecker();
309316

310317
CloseFuture closeFuture();

vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.vertx.core.net.NetServerOptions;
3131
import io.vertx.core.net.impl.NetServerInternal;
3232
import io.vertx.core.net.impl.ServerID;
33+
import io.vertx.core.spi.context.storage.ContextLocal;
3334
import io.vertx.core.spi.transport.Transport;
3435
import io.vertx.core.shareddata.SharedData;
3536
import io.vertx.core.spi.VerticleFactory;
@@ -42,9 +43,9 @@
4243
import java.lang.ref.Cleaner;
4344
import java.net.InetAddress;
4445
import java.net.InetSocketAddress;
46+
import java.util.List;
4547
import java.util.Map;
4648
import java.util.Set;
47-
import java.util.concurrent.Callable;
4849
import java.util.concurrent.ExecutorService;
4950
import java.util.concurrent.TimeUnit;
5051
import java.util.function.Function;
@@ -385,6 +386,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
385386
return delegate.nettyAddressResolverGroup();
386387
}
387388

389+
@Override
390+
public List<ContextLocal<?>> contextLocals() {
391+
return delegate.contextLocals();
392+
}
393+
388394
@Override
389395
public BlockedThreadChecker blockedThreadChecker() {
390396
return delegate.blockedThreadChecker();

vertx-core/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.vertx.core.internal.ContextInternal;
1515
import io.vertx.core.impl.ContextLocalImpl;
1616

17+
import java.util.function.Function;
1718
import java.util.function.Supplier;
1819

1920
/**
@@ -35,7 +36,16 @@ public interface ContextLocal<T> {
3536
* @return the context local storage
3637
*/
3738
static <T> ContextLocal<T> registerLocal(Class<T> type) {
38-
return new ContextLocalImpl<>();
39+
return ContextLocalImpl.create(type, Function.identity());
40+
}
41+
42+
/**
43+
* Registers a context local storage.
44+
*
45+
* @return the context local storage
46+
*/
47+
static <T> ContextLocal<T> registerLocal(Class<T> type, Function<T, T> duplicator) {
48+
return ContextLocalImpl.create(type, duplicator);
3949
}
4050

4151
/**

vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,4 +1204,27 @@ public void testInterruptTask(ContextInternal context, Consumer<Runnable> actor)
12041204
assertTrue((System.currentTimeMillis() - now) < 2000);
12051205
assertTrue(interrupted.get());
12061206
}
1207+
1208+
@Test
1209+
public void testNestedDuplicate() {
1210+
ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
1211+
ctx.putLocal("foo", "bar");
1212+
Object expected = new Object();
1213+
ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected);
1214+
ContextInternal duplicate = ctx.duplicate();
1215+
assertEquals("bar", duplicate.getLocal("foo"));
1216+
assertEquals(expected, duplicate.getLocal(contextLocal));
1217+
ctx.removeLocal("foo");
1218+
ctx.removeLocal(contextLocal, AccessMode.CONCURRENT);
1219+
assertEquals("bar", duplicate.getLocal("foo"));
1220+
assertEquals(expected, duplicate.getLocal(contextLocal));
1221+
}
1222+
1223+
@Test
1224+
public void testContextLocals() {
1225+
List<ContextLocal<?>> locals = ((VertxInternal) vertx).contextLocals();
1226+
assertSame(ContextInternal.LOCAL_MAP, locals.get(0));
1227+
assertSame(contextLocal, locals.get(1));
1228+
assertSame(locals, ((VertxInternal) vertx).contextLocals());
1229+
}
12071230
}

0 commit comments

Comments
 (0)