|
30 | 30 | import java.util.List; |
31 | 31 | import java.util.Objects; |
32 | 32 | import java.util.Optional; |
| 33 | +import java.util.concurrent.ExecutionException; |
33 | 34 | import java.util.concurrent.Flow; |
| 35 | +import java.util.concurrent.Future; |
34 | 36 | import java.util.concurrent.ScheduledExecutorService; |
35 | 37 | import java.util.concurrent.ScheduledFuture; |
36 | 38 | import java.util.concurrent.TimeUnit; |
| 39 | +import java.util.concurrent.TimeoutException; |
37 | 40 | import java.util.concurrent.atomic.AtomicBoolean; |
38 | 41 | import java.util.concurrent.atomic.AtomicLong; |
39 | 42 | import java.util.concurrent.atomic.AtomicReference; |
@@ -683,30 +686,26 @@ public void sendRequest(@NonNull final PublishStreamRequest request) { |
683 | 686 | } |
684 | 687 | final long startMs = System.currentTimeMillis(); |
685 | 688 |
|
686 | | - // Schedule timeout task to detect unresponsive block nodes |
687 | | - final ScheduledFuture<?> timeoutTask = executorService.schedule( |
688 | | - () -> { |
689 | | - if (getConnectionState() == ConnectionState.ACTIVE) { |
690 | | - logWithContext( |
691 | | - logger, |
692 | | - DEBUG, |
693 | | - this, |
694 | | - "Pipeline onNext() timed out after {}ms", |
695 | | - pipelineOperationTimeout.toMillis()); |
696 | | - blockStreamMetrics.recordPipelineOperationTimeout(); |
697 | | - handleStreamFailure(); |
698 | | - } |
699 | | - }, |
700 | | - pipelineOperationTimeout.toMillis(), |
701 | | - TimeUnit.MILLISECONDS); |
702 | | - |
| 689 | + Future<?> future = executorService.submit(() -> pipeline.onNext(request)); |
703 | 690 | try { |
704 | | - pipeline.onNext(request); |
705 | | - } finally { |
706 | | - // Cancel timeout if operation completes (successfully or not) |
707 | | - if (!timeoutTask.isDone()) { |
708 | | - timeoutTask.cancel(false); |
| 691 | + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); |
| 692 | + } catch (TimeoutException e) { |
| 693 | + future.cancel(true); // Cancel the task if it times out |
| 694 | + if (getConnectionState() == ConnectionState.ACTIVE) { |
| 695 | + logWithContext( |
| 696 | + logger, |
| 697 | + DEBUG, |
| 698 | + this, |
| 699 | + "Pipeline onNext() timed out after {}ms", |
| 700 | + pipelineOperationTimeout.toMillis()); |
| 701 | + blockStreamMetrics.recordPipelineOperationTimeout(); |
| 702 | + handleStreamFailure(); |
709 | 703 | } |
| 704 | + } catch (InterruptedException e) { |
| 705 | + Thread.currentThread().interrupt(); // Restore interrupt status |
| 706 | + throw new RuntimeException("Interrupted while waiting for pipeline.onNext()", e); |
| 707 | + } catch (ExecutionException e) { |
| 708 | + throw new RuntimeException("Error executing pipeline.onNext()", e.getCause()); |
710 | 709 | } |
711 | 710 |
|
712 | 711 | final long durationMs = System.currentTimeMillis() - startMs; |
@@ -801,32 +800,26 @@ private void closePipeline(final boolean callOnComplete) { |
801 | 800 | try { |
802 | 801 | final ConnectionState state = getConnectionState(); |
803 | 802 | if (state == ConnectionState.CLOSING && callOnComplete) { |
804 | | - // Schedule timeout task to detect unresponsive block nodes during close |
805 | | - final ScheduledFuture<?> timeoutTask = executorService.schedule( |
806 | | - () -> { |
807 | | - logWithContext( |
808 | | - logger, |
809 | | - DEBUG, |
810 | | - this, |
811 | | - "Pipeline onComplete() timed out after {}ms", |
812 | | - pipelineOperationTimeout.toMillis()); |
813 | | - blockStreamMetrics.recordPipelineOperationTimeout(); |
814 | | - // Connection is already closing, just log the timeout |
815 | | - }, |
816 | | - pipelineOperationTimeout.toMillis(), |
817 | | - TimeUnit.MILLISECONDS); |
818 | | - |
| 803 | + Future<?> future = executorService.submit(pipeline::onComplete); |
819 | 804 | try { |
820 | | - pipeline.onComplete(); |
| 805 | + future.get(pipelineOperationTimeout.toMillis(), TimeUnit.MILLISECONDS); |
821 | 806 | logWithContext(logger, DEBUG, this, "Request pipeline successfully closed."); |
822 | | - } finally { |
823 | | - // Cancel timeout if operation completes (successfully or not) |
824 | | - if (!timeoutTask.isDone()) { |
825 | | - timeoutTask.cancel(false); |
826 | | - } |
| 807 | + } catch (TimeoutException e) { |
| 808 | + future.cancel(true); // Cancel the task if it times out |
| 809 | + logWithContext( |
| 810 | + logger, |
| 811 | + DEBUG, |
| 812 | + this, |
| 813 | + "Pipeline onComplete() timed out after {}ms", |
| 814 | + pipelineOperationTimeout.toMillis()); |
| 815 | + blockStreamMetrics.recordPipelineOperationTimeout(); |
| 816 | + // Connection is already closing, just log the timeout |
| 817 | + } catch (InterruptedException e) { |
| 818 | + Thread.currentThread().interrupt(); // Restore interrupt status |
| 819 | + logWithContext(logger, DEBUG, this, "Interrupted while waiting for pipeline.onComplete()"); |
| 820 | + } catch (ExecutionException e) { |
| 821 | + logWithContext(logger, DEBUG, this, "Error executing pipeline.onComplete()", e.getCause()); |
827 | 822 | } |
828 | | - |
829 | | - logWithContext(logger, DEBUG, this, "Request pipeline successfully closed."); |
830 | 823 | } |
831 | 824 | } catch (final Exception e) { |
832 | 825 | logger.warn(formatLogMessage("Error while completing request pipeline.", this), e); |
|
0 commit comments