Skip to content

Commit 9c181d8

Browse files
feat: support watching config changes (#208)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent d5421c2 commit 9c181d8

File tree

8 files changed

+146
-22
lines changed

8 files changed

+146
-22
lines changed

runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/PrepareConfHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.apisix.plugin.runner.A6Conf;
2727
import org.apache.apisix.plugin.runner.A6ConfigRequest;
2828
import org.apache.apisix.plugin.runner.A6ConfigResponse;
29+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
2930
import org.apache.apisix.plugin.runner.A6Request;
3031
import org.apache.apisix.plugin.runner.A6Response;
3132
import org.apache.apisix.plugin.runner.constants.Constants;
@@ -48,6 +49,7 @@ public class PrepareConfHandler extends SimpleChannelInboundHandler<A6Request> {
4849

4950
private final Cache<Long, A6Conf> cache;
5051
private final Map<String, PluginFilter> filters;
52+
private final List<A6ConfigWatcher> watchers;
5153

5254
@Override
5355
protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
@@ -75,6 +77,9 @@ protected void channelRead0(ChannelHandlerContext ctx, A6Request request) {
7577
}
7678
A6Conf a6Conf = new A6Conf(config, chain);
7779
cache.put(token, a6Conf);
80+
for (A6ConfigWatcher watcher : watchers) {
81+
watcher.watch(token, a6Conf);
82+
}
7883
ctx.write(response);
7984
ctx.writeAndFlush(response);
8085
}

runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
import io.netty.channel.unix.DomainSocketChannel;
4848
import io.netty.handler.logging.LoggingHandler;
4949
import lombok.RequiredArgsConstructor;
50+
5051
import org.apache.apisix.plugin.runner.A6Conf;
52+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
5153
import org.apache.apisix.plugin.runner.filter.PluginFilter;
5254
import org.apache.apisix.plugin.runner.handler.PrepareConfHandler;
5355
import org.apache.apisix.plugin.runner.handler.RpcCallHandler;
@@ -67,21 +69,27 @@ public class ApplicationRunner implements CommandLineRunner {
6769

6870
private Cache<Long, A6Conf> cache;
6971

70-
private ObjectProvider<PluginFilter> beanProvider;
72+
private ObjectProvider<PluginFilter> filterProvider;
73+
private ObjectProvider<A6ConfigWatcher> watcherProvider;
7174

7275
@Autowired
73-
public ApplicationRunner(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
76+
public ApplicationRunner(Cache<Long, A6Conf> cache,
77+
ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
7478
this.cache = cache;
75-
this.beanProvider = beanProvider;
79+
this.filterProvider = filterProvider;
80+
this.watcherProvider = watcherProvider;
7681
}
7782

78-
public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache, ObjectProvider<PluginFilter> beanProvider) {
83+
public PrepareConfHandler createConfigReqHandler(Cache<Long, A6Conf> cache,
84+
ObjectProvider<PluginFilter> beanProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) {
7985
List<PluginFilter> pluginFilterList = beanProvider.orderedStream().collect(Collectors.toList());
8086
Map<String, PluginFilter> filterMap = new HashMap<>();
8187
for (PluginFilter filter : pluginFilterList) {
8288
filterMap.put(filter.name(), filter);
8389
}
84-
return new PrepareConfHandler(cache, filterMap);
90+
List<A6ConfigWatcher> configWatcherList = watcherProvider.orderedStream().collect(Collectors.toList());
91+
92+
return new PrepareConfHandler(cache, filterMap, configWatcherList);
8593
}
8694

8795
public RpcCallHandler createA6HttpHandler(Cache<Long, A6Conf> cache) {
@@ -123,7 +131,7 @@ protected void initChannel(DomainSocketChannel channel) {
123131
.addAfter("logger", "payloadEncoder", new PayloadEncoder())
124132
.addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
125133
.addLast("payloadDecoder", new PayloadDecoder())
126-
.addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
134+
.addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider))
127135
.addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
128136
.addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());
129137

runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6ConfigHandlerTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@
1717

1818
package org.apache.apisix.plugin.runner.handler;
1919

20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.TimeUnit;
25+
import org.junit.jupiter.api.Assertions;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.DisplayName;
28+
import org.junit.jupiter.api.Test;
29+
2030
import com.google.common.cache.Cache;
2131
import com.google.common.cache.CacheBuilder;
2232
import com.google.flatbuffers.FlatBufferBuilder;
@@ -27,19 +37,11 @@
2737
import org.apache.apisix.plugin.runner.A6Conf;
2838
import org.apache.apisix.plugin.runner.A6ConfigRequest;
2939
import org.apache.apisix.plugin.runner.A6ConfigResponse;
40+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
3041
import org.apache.apisix.plugin.runner.HttpRequest;
3142
import org.apache.apisix.plugin.runner.HttpResponse;
3243
import org.apache.apisix.plugin.runner.filter.PluginFilter;
3344
import org.apache.apisix.plugin.runner.filter.PluginFilterChain;
34-
import org.junit.jupiter.api.Assertions;
35-
import org.junit.jupiter.api.BeforeEach;
36-
import org.junit.jupiter.api.DisplayName;
37-
import org.junit.jupiter.api.Test;
38-
39-
import java.util.HashMap;
40-
import java.util.List;
41-
import java.util.Map;
42-
import java.util.concurrent.TimeUnit;
4345

4446
@DisplayName("test add filter")
4547
class A6ConfigHandlerTest {
@@ -48,8 +50,12 @@ class A6ConfigHandlerTest {
4850

4951
Map<String, PluginFilter> filters;
5052

53+
List<A6ConfigWatcher> watchers;
54+
5155
PrepareConfHandler prepareConfHandler;
5256

57+
TestWatcher tWatcher = new TestWatcher() ;
58+
5359
@BeforeEach
5460
void setUp() {
5561
filters = new HashMap<>();
@@ -96,8 +102,10 @@ public Boolean requiredBody() {
96102
return null;
97103
}
98104
});
105+
watchers = new ArrayList<>();
106+
watchers.add(tWatcher);
99107
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
100-
prepareConfHandler = new PrepareConfHandler(cache, filters);
108+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
101109
}
102110

103111
@Test
@@ -124,6 +132,8 @@ void testAddFilter1() {
124132
Assertions.assertEquals(config.getChain().getFilters().size(), 1);
125133
Assertions.assertEquals(config.getChain().getIndex(), 0);
126134
Assertions.assertEquals(config.get("FooFilter"), "Bar");
135+
Assertions.assertEquals(tWatcher.getConfig(), config.getConfig());
136+
Assertions.assertEquals(tWatcher.getToken(), response.getConfToken());
127137
}
128138

129139
@Test

runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.apisix.plugin.runner.A6Conf;
2929
import org.apache.apisix.plugin.runner.A6ConfigRequest;
3030
import org.apache.apisix.plugin.runner.A6ConfigResponse;
31+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
3132
import org.apache.apisix.plugin.runner.A6ErrResponse;
3233
import org.apache.apisix.plugin.runner.HttpRequest;
3334
import org.apache.apisix.plugin.runner.HttpResponse;
@@ -41,6 +42,7 @@
4142

4243
import java.io.ByteArrayOutputStream;
4344
import java.io.PrintStream;
45+
import java.util.ArrayList;
4446
import java.util.HashMap;
4547
import java.util.List;
4648
import java.util.Map;
@@ -58,6 +60,8 @@ class A6HttpCallHandlerTest {
5860

5961
Map<String, PluginFilter> filters;
6062

63+
List<A6ConfigWatcher> watchers;
64+
6165
EmbeddedChannel channel;
6266

6367
PrepareConfHandler prepareConfHandler;
@@ -149,6 +153,7 @@ public Boolean requiredBody() {
149153
return null;
150154
}
151155
});
156+
watchers = new ArrayList<>();
152157
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
153158
FlatBufferBuilder builder = new FlatBufferBuilder();
154159

@@ -167,14 +172,14 @@ public Boolean requiredBody() {
167172
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
168173

169174
A6ConfigRequest request = new A6ConfigRequest(req);
170-
prepareConfHandler = new PrepareConfHandler(cache, filters);
175+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
171176
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
172177
channel.writeInbound(request);
173178
channel.finish();
174179
A6ConfigResponse response = channel.readOutbound();
175180
confToken = response.getConfToken();
176181

177-
prepareConfHandler = new PrepareConfHandler(cache, filters);
182+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
178183
rpcCallHandler = new RpcCallHandler(cache);
179184
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
180185
}

runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/ExtraInfoTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.apisix.plugin.runner.A6Conf;
2727
import org.apache.apisix.plugin.runner.A6ConfigRequest;
2828
import org.apache.apisix.plugin.runner.A6ConfigResponse;
29+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
2930
import org.apache.apisix.plugin.runner.ExtraInfoRequest;
3031
import org.apache.apisix.plugin.runner.ExtraInfoResponse;
3132
import org.apache.apisix.plugin.runner.HttpRequest;
@@ -41,6 +42,7 @@
4142

4243
import java.io.ByteArrayOutputStream;
4344
import java.io.PrintStream;
45+
import java.util.ArrayList;
4446
import java.util.HashMap;
4547
import java.util.List;
4648
import java.util.Map;
@@ -61,6 +63,8 @@ class ExtraInfoTest {
6163

6264
Map<String, PluginFilter> filters;
6365

66+
List<A6ConfigWatcher> watchers;
67+
6468
EmbeddedChannel channel;
6569

6670
PrepareConfHandler prepareConfHandler;
@@ -127,6 +131,7 @@ public Boolean requiredBody() {
127131
return true;
128132
}
129133
});
134+
watchers = new ArrayList<>();
130135
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
131136
FlatBufferBuilder builder = new FlatBufferBuilder();
132137

@@ -145,14 +150,14 @@ public Boolean requiredBody() {
145150
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
146151

147152
A6ConfigRequest request = new A6ConfigRequest(req);
148-
prepareConfHandler = new PrepareConfHandler(cache, filters);
153+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
149154
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
150155
channel.writeInbound(request);
151156
channel.finish();
152157
A6ConfigResponse response = channel.readOutbound();
153158
confToken = response.getConfToken();
154159

155-
prepareConfHandler = new PrepareConfHandler(cache, filters);
160+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
156161
rpcCallHandler = new RpcCallHandler(cache);
157162
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
158163
}

runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/PostFilterTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.apisix.plugin.runner.A6Conf;
2626
import org.apache.apisix.plugin.runner.A6ConfigRequest;
2727
import org.apache.apisix.plugin.runner.A6ConfigResponse;
28+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
2829
import org.apache.apisix.plugin.runner.PostRequest;
2930
import org.apache.apisix.plugin.runner.PostResponse;
3031
import org.apache.apisix.plugin.runner.filter.PluginFilter;
@@ -37,7 +38,9 @@
3738

3839
import java.io.ByteArrayOutputStream;
3940
import java.io.PrintStream;
41+
import java.util.ArrayList;
4042
import java.util.HashMap;
43+
import java.util.List;
4144
import java.util.Map;
4245
import java.util.concurrent.TimeUnit;
4346

@@ -52,6 +55,8 @@ public class PostFilterTest {
5255

5356
Map<String, PluginFilter> filters;
5457

58+
List<A6ConfigWatcher> watchers;
59+
5560
EmbeddedChannel channel;
5661

5762
PrepareConfHandler prepareConfHandler;
@@ -86,6 +91,7 @@ public void postFilter(PostRequest request, PostResponse response, PluginFilterC
8691
}
8792
}
8893
);
94+
watchers = new ArrayList<>();
8995
cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build();
9096
FlatBufferBuilder builder = new FlatBufferBuilder();
9197

@@ -100,14 +106,14 @@ public void postFilter(PostRequest request, PostResponse response, PluginFilterC
100106
io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer());
101107

102108
A6ConfigRequest request = new A6ConfigRequest(req);
103-
prepareConfHandler = new PrepareConfHandler(cache, filters);
109+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
104110
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler);
105111
channel.writeInbound(request);
106112
channel.finish();
107113
A6ConfigResponse response = channel.readOutbound();
108114
confToken = response.getConfToken();
109115

110-
prepareConfHandler = new PrepareConfHandler(cache, filters);
116+
prepareConfHandler = new PrepareConfHandler(cache, filters, watchers);
111117
rpcCallHandler = new RpcCallHandler(cache);
112118
channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler);
113119
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.apisix.plugin.runner.handler;
21+
22+
import java.util.Map;
23+
24+
import org.apache.apisix.plugin.runner.A6Conf;
25+
import org.apache.apisix.plugin.runner.A6ConfigWatcher;
26+
27+
class TestWatcher implements A6ConfigWatcher {
28+
private Map<String, String> config;
29+
private long token;
30+
31+
public Map<String, String> getConfig() {
32+
return config;
33+
}
34+
35+
public long getToken() {
36+
return token;
37+
}
38+
39+
@Override
40+
public String name() {
41+
return "test";
42+
}
43+
44+
@Override
45+
public void watch(long confToken, A6Conf a6Conf) {
46+
config = a6Conf.getConfig();
47+
token = confToken;
48+
}
49+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.apisix.plugin.runner;
21+
22+
public interface A6ConfigWatcher {
23+
/**
24+
* @return the name of config watcher
25+
*/
26+
String name();
27+
28+
/**
29+
* watch the change of the config
30+
*
31+
* @param confToken the config token
32+
* @param a6Conf the config
33+
*/
34+
default void watch(long confToken, A6Conf a6Conf) {
35+
}
36+
}

0 commit comments

Comments
 (0)