Skip to content

Commit 38e57d4

Browse files
authored
Finish the span when subscription is canceled (#134)
* Finish the span when subscription is canceled * Fix licensing headers
1 parent 0fbe461 commit 38e57d4

File tree

2 files changed

+190
-3
lines changed

2 files changed

+190
-3
lines changed

opentracing-spring-web/src/main/java/io/opentracing/contrib/spring/web/webfilter/TracingSubscriber.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors. Copyright 2019 The OpenTracing Authors.
2+
* Copyright 2013-2020 the original author or authors. Copyright 2020 The OpenTracing Authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,8 +55,20 @@ class TracingSubscriber implements CoreSubscriber<Void> {
5555

5656
@Override
5757
public void onSubscribe(final Subscription subscription) {
58-
spanDecorators.forEach(spanDecorator -> safelyCall(() -> spanDecorator.onRequest(exchange, span)));
59-
subscriber.onSubscribe(subscription);
58+
subscriber.onSubscribe(new Subscription() {
59+
@Override
60+
public void request(long n) {
61+
spanDecorators.forEach(spanDecorator -> safelyCall(() -> spanDecorator.onRequest(exchange, span)));
62+
subscription.request(n);
63+
}
64+
65+
@Override
66+
public void cancel() {
67+
span.finish();
68+
exchange.getAttributes().remove(TracingWebFilter.SERVER_SPAN_CONTEXT);
69+
subscription.cancel();
70+
}
71+
});
6072
}
6173

6274
@Override
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Copyright 2016-2020 The OpenTracing Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
package io.opentracing.contrib.spring.web.webfilter;
15+
16+
import io.opentracing.Scope;
17+
import io.opentracing.Span;
18+
import io.opentracing.mock.MockTracer;
19+
import io.opentracing.util.ThreadLocalScopeManager;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
import org.junit.runner.RunWith;
23+
import org.mockito.Mock;
24+
import org.mockito.junit.MockitoJUnitRunner;
25+
import org.springframework.http.HttpMethod;
26+
import org.springframework.http.server.reactive.ServerHttpRequest;
27+
import org.springframework.web.server.ServerWebExchange;
28+
import reactor.core.Disposable;
29+
import reactor.core.publisher.Mono;
30+
import reactor.core.publisher.SignalType;
31+
import reactor.core.scheduler.Schedulers;
32+
33+
import java.util.Arrays;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
37+
import static org.junit.Assert.assertEquals;
38+
import static org.mockito.ArgumentMatchers.any;
39+
import static org.mockito.ArgumentMatchers.eq;
40+
import static org.mockito.BDDMockito.given;
41+
import static org.mockito.Mockito.never;
42+
import static org.mockito.Mockito.verify;
43+
44+
@RunWith(MockitoJUnitRunner.class)
45+
public class TracingSubscriberTest {
46+
47+
private final MockTracer tracer = new MockTracer(new ThreadLocalScopeManager());
48+
49+
@Mock
50+
private ServerWebExchange exchange;
51+
52+
@Mock
53+
private ServerHttpRequest request;
54+
55+
@Mock
56+
private WebFluxSpanDecorator spanDecorator;
57+
58+
@Before
59+
public void resetTracer() {
60+
tracer.reset();
61+
}
62+
63+
@Before
64+
public void mockExchange() {
65+
given(exchange.getRequest()).willReturn(request);
66+
given(request.getMethodValue()).willReturn(HttpMethod.GET.name());
67+
}
68+
69+
@Test
70+
public void testSpanIsFinishedWhenMonoHasCompleted() throws InterruptedException {
71+
Span span = tracer.buildSpan("a span").start();
72+
73+
AtomicReference<SignalType> finalSignalType = new AtomicReference<>();
74+
CountDownLatch finalSignalCountDownLatch = new CountDownLatch(1);
75+
76+
Mono<Void> source = Mono.just("5")
77+
.then()
78+
.doFinally(signalType -> {
79+
finalSignalType.set(signalType);
80+
finalSignalCountDownLatch.countDown();
81+
})
82+
.subscribeOn(Schedulers.single());
83+
84+
try (Scope scope = tracer.activateSpan(span)) {
85+
new TracingOperator(source, exchange, tracer, Arrays.asList(spanDecorator))
86+
.subscribe();
87+
finalSignalCountDownLatch.await();
88+
}
89+
90+
assertEquals(1, tracer.finishedSpans().size());
91+
assertEquals(SignalType.ON_COMPLETE, finalSignalType.get());
92+
93+
Span finishedSpan = tracer.finishedSpans().get(0);
94+
verify(spanDecorator).onRequest(exchange, finishedSpan);
95+
verify(spanDecorator).onResponse(exchange, finishedSpan);
96+
verify(spanDecorator, never()).onError(any(ServerWebExchange.class), any(Throwable.class), any(Span.class));
97+
}
98+
99+
@Test
100+
public void testSpanIsFinishedWhenMonoHasError() throws InterruptedException {
101+
Span span = tracer.buildSpan("a span").start();
102+
103+
AtomicReference<SignalType> finalSignalType = new AtomicReference<>();
104+
CountDownLatch finalSignalCountDownLatch = new CountDownLatch(2);
105+
106+
Mono<Void> source = Mono.error(new Exception("An error"))
107+
.then()
108+
.doOnError(error -> finalSignalCountDownLatch.countDown())
109+
.doFinally(signalType -> {
110+
finalSignalType.set(signalType);
111+
finalSignalCountDownLatch.countDown();
112+
})
113+
.subscribeOn(Schedulers.single());
114+
115+
try (Scope scope = tracer.activateSpan(span)) {
116+
new TracingOperator(source, exchange, tracer, Arrays.asList(spanDecorator))
117+
.subscribe();
118+
finalSignalCountDownLatch.await();
119+
}
120+
121+
assertEquals(1, tracer.finishedSpans().size());
122+
assertEquals(SignalType.ON_ERROR, finalSignalType.get());
123+
124+
Span finishedSpan = tracer.finishedSpans().get(0);
125+
verify(spanDecorator).onRequest(exchange, finishedSpan);
126+
verify(spanDecorator).onError(eq(exchange), any(Throwable.class), eq(finishedSpan));
127+
verify(spanDecorator, never()).onResponse(any(ServerWebExchange.class), any(Span.class));
128+
}
129+
130+
@Test
131+
public void testSpanIsFinishedWhenMonoHasCanceled() throws InterruptedException {
132+
Span span = tracer.buildSpan("a span").start();
133+
134+
AtomicReference<SignalType> finalSignalType = new AtomicReference<>();
135+
CountDownLatch finalSignalCountDownLatch = new CountDownLatch(1);
136+
137+
CountDownLatch canBeDisposedCountDownLatch = new CountDownLatch(1);
138+
CountDownLatch disposedCountDownLatch = new CountDownLatch(1);
139+
140+
Mono<Void> source = Mono.just("5")
141+
.doOnNext(str -> {
142+
try {
143+
canBeDisposedCountDownLatch.countDown();
144+
disposedCountDownLatch.await();
145+
} catch (InterruptedException ignored) {
146+
}
147+
})
148+
.then()
149+
.doFinally(signalType -> {
150+
finalSignalType.set(signalType);
151+
finalSignalCountDownLatch.countDown();
152+
})
153+
.subscribeOn(Schedulers.single());
154+
155+
156+
try (Scope scope = tracer.activateSpan(span)) {
157+
Disposable disposable = new TracingOperator(source, exchange, tracer, Arrays.asList(spanDecorator))
158+
.subscribe();
159+
160+
canBeDisposedCountDownLatch.await();
161+
disposable.dispose();
162+
disposedCountDownLatch.countDown();
163+
finalSignalCountDownLatch.await();
164+
}
165+
166+
167+
assertEquals(1, tracer.finishedSpans().size());
168+
assertEquals(SignalType.CANCEL, finalSignalType.get());
169+
170+
Span finishedSpan = tracer.finishedSpans().get(0);
171+
verify(spanDecorator).onRequest(exchange, finishedSpan);
172+
verify(spanDecorator, never()).onError(any(ServerWebExchange.class), any(Throwable.class), any(Span.class));
173+
verify(spanDecorator, never()).onResponse(any(ServerWebExchange.class), any(Span.class));
174+
}
175+
}

0 commit comments

Comments
 (0)