|
22 | 22 | import java.util.Map;
|
23 | 23 | import java.util.concurrent.CompletableFuture;
|
24 | 24 | import java.util.concurrent.ScheduledFuture;
|
| 25 | +import java.util.concurrent.atomic.AtomicBoolean; |
25 | 26 | import java.util.concurrent.atomic.AtomicReference;
|
26 | 27 |
|
27 | 28 | import org.junit.jupiter.api.BeforeEach;
|
@@ -662,6 +663,75 @@ public void receiptNotReceived() {
|
662 | 663 | verifyNoMoreInteractions(future);
|
663 | 664 | }
|
664 | 665 |
|
| 666 | + @Test |
| 667 | + void unsubscribeWithReceipt() { |
| 668 | + this.session.afterConnected(this.connection); |
| 669 | + assertThat(this.session.isConnected()).isTrue(); |
| 670 | + Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
| 671 | + |
| 672 | + Receiptable receipt = subscription.unsubscribe(); |
| 673 | + assertThat(receipt).isNotNull(); |
| 674 | + assertThat(receipt.getReceiptId()).isNull(); |
| 675 | + |
| 676 | + Message<byte[]> message = this.messageCaptor.getValue(); |
| 677 | + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
| 678 | + assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE); |
| 679 | + |
| 680 | + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); |
| 681 | + assertThat(stompHeaders).hasSize(1); |
| 682 | + assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId()); |
| 683 | + } |
| 684 | + |
| 685 | + @Test |
| 686 | + void unsubscribeWithCustomHeaderAndReceipt() { |
| 687 | + this.session.afterConnected(this.connection); |
| 688 | + this.session.setTaskScheduler(mock()); |
| 689 | + this.session.setAutoReceipt(true); |
| 690 | + |
| 691 | + StompHeaders subHeaders = new StompHeaders(); |
| 692 | + subHeaders.setDestination("/topic/foo"); |
| 693 | + Subscription subscription = this.session.subscribe(subHeaders, mock()); |
| 694 | + |
| 695 | + StompHeaders custom = new StompHeaders(); |
| 696 | + custom.set("x-cust", "value"); |
| 697 | + |
| 698 | + Receiptable receipt = subscription.unsubscribe(custom); |
| 699 | + assertThat(receipt).isNotNull(); |
| 700 | + assertThat(receipt.getReceiptId()).isNotNull(); |
| 701 | + |
| 702 | + Message<byte[]> message = this.messageCaptor.getValue(); |
| 703 | + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
| 704 | + assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE); |
| 705 | + |
| 706 | + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); |
| 707 | + assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId()); |
| 708 | + assertThat(stompHeaders.get("x-cust")).containsExactly("value"); |
| 709 | + assertThat(stompHeaders.getReceipt()).isEqualTo(receipt.getReceiptId()); |
| 710 | + } |
| 711 | + |
| 712 | + @Test |
| 713 | + void receiptReceivedOnUnsubscribe() { |
| 714 | + this.session.afterConnected(this.connection); |
| 715 | + TaskScheduler scheduler = mock(); |
| 716 | + this.session.setTaskScheduler(scheduler); |
| 717 | + this.session.setAutoReceipt(true); |
| 718 | + |
| 719 | + Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
| 720 | + Receiptable receipt = subscription.unsubscribe(); |
| 721 | + |
| 722 | + StompHeaderAccessor ack = StompHeaderAccessor.create(StompCommand.RECEIPT); |
| 723 | + ack.setReceiptId(receipt.getReceiptId()); |
| 724 | + ack.setLeaveMutable(true); |
| 725 | + Message<byte[]> receiptMessage = MessageBuilder.createMessage(new byte[0], ack.getMessageHeaders()); |
| 726 | + |
| 727 | + AtomicBoolean called = new AtomicBoolean(false); |
| 728 | + receipt.addReceiptTask(() -> called.set(true)); |
| 729 | + |
| 730 | + this.session.handleMessage(receiptMessage); |
| 731 | + |
| 732 | + assertThat(called.get()).isTrue(); |
| 733 | + } |
| 734 | + |
665 | 735 | @Test
|
666 | 736 | void disconnect() {
|
667 | 737 | this.session.afterConnected(this.connection);
|
|
0 commit comments