44} from "rxjs/operators" ;
55import {
66 HubConnection as SignalRHubConnection ,
7- HubConnectionBuilder as SignalRHubConnectionBuilder
7+ HubConnectionBuilder as SignalRHubConnectionBuilder ,
88} from "@microsoft/signalr" ;
9- import { from as fromPromise , BehaviorSubject , Observable , Observer , timer , throwError , Subscription , merge } from "rxjs" ;
9+ import { from , BehaviorSubject , Observable , Observer , timer , throwError , Subject } from "rxjs" ;
1010
1111import {
1212 ConnectionState , ConnectionStatus , HubConnectionOptions ,
@@ -20,6 +20,7 @@ import { emptyNext } from "./utils/rxjs";
2020const errorReasonName = "error" ;
2121const disconnectedState = Object . freeze < ConnectionState > ( { status : ConnectionStatus . disconnected } ) ;
2222const connectedState = Object . freeze < ConnectionState > ( { status : ConnectionStatus . connected } ) ;
23+ const connectingState = Object . freeze < ConnectionState > ( { status : ConnectionStatus . connecting } ) ;
2324
2425// todo: rename HubClient?
2526export class HubConnection < THub > {
@@ -36,7 +37,7 @@ export class HubConnection<THub> {
3637 private desiredState$ = new BehaviorSubject < DesiredConnectionStatus > ( DesiredConnectionStatus . disconnected ) ;
3738 private internalConnStatus$ = new BehaviorSubject < InternalConnectionStatus > ( InternalConnectionStatus . disconnected ) ;
3839 private connectionBuilder = new SignalRHubConnectionBuilder ( ) ;
39- private effects$$ = Subscription . EMPTY ;
40+ private readonly _destroy$ = new Subject < void > ( ) ;
4041
4142 private waitUntilConnect$ = this . connectionState$ . pipe (
4243 distinctUntilChanged ( ( x , y ) => x . status === y . status ) ,
@@ -62,7 +63,9 @@ export class HubConnection<THub> {
6263 if ( connectionOpts . protocol ) {
6364 this . connectionBuilder = this . connectionBuilder . withHubProtocol ( connectionOpts . protocol ) ;
6465 }
66+
6567 this . hubConnection = this . connectionBuilder . build ( ) ;
68+ connectionOpts . configureSignalRHubConnection ?.( this . hubConnection ) ;
6669 this . hubConnection . onclose ( err => {
6770 this . internalConnStatus$ . next ( InternalConnectionStatus . disconnected ) ;
6871 if ( err ) {
@@ -81,42 +84,40 @@ export class HubConnection<THub> {
8184 tap ( ( ) => this . internalConnStatus$ . next ( InternalConnectionStatus . ready ) ) ,
8285 filter ( ( ) => prevConnectionStatus === InternalConnectionStatus . connected ) ,
8386 switchMap ( ( ) => this . openConnection ( ) )
84- ) )
87+ ) ) ,
88+ takeUntil ( this . _destroy$ ) ,
8589 ) ;
8690 const desiredDisconnected$ = this . desiredState$ . pipe (
8791 filter ( status => status === DesiredConnectionStatus . disconnected ) ,
88- // tap(status => console.warn(">>>> disconnected$ ", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })),
92+ // tap(status => console.warn(">>>> [desiredDisconnected$] desired disconnected ", { internalConnStatus$: this.internalConnStatus$.value, desiredStatus: status })),
8993 tap ( ( ) => {
90- switch ( this . internalConnStatus$ . value ) {
91- case InternalConnectionStatus . connected :
92- this . _disconnect ( ) ;
93- break ;
94- case InternalConnectionStatus . ready :
95- this . _connectionState$ . next ( disconnectedState ) ;
96- break ;
97- // default:
98- // console.error("desiredDisconnected$ - State unhandled", this.internalConnStatus$.value);
99- // break;
94+ if ( this . _connectionState$ . value . status !== ConnectionStatus . disconnected ) {
95+ // console.warn(">>>> [desiredDisconnected$] _disconnect");
96+ // note: ideally delayWhen disconnect first, though for some reason obs not bubbling
97+ this . _disconnect ( )
10098 }
101- } )
99+ } ) ,
100+ tap ( ( ) => this . _connectionState$ . next ( disconnectedState ) ) ,
101+ takeUntil ( this . _destroy$ ) ,
102102 ) ;
103103
104- const reconnectOnDisconnect = this . _connectionState$ . pipe (
104+ const reconnectOnDisconnect$ = this . _connectionState$ . pipe (
105105 // tap(x => console.warn(">>>> _connectionState$ state changed", x)),
106106 filter ( x => x . status === ConnectionStatus . disconnected && x . reason === errorReasonName ) ,
107107 // tap(x => console.warn(">>>> reconnecting...", x)),
108- switchMap ( ( ) => this . connect ( ) )
108+ switchMap ( ( ) => this . connect ( ) ) ,
109+ takeUntil ( this . _destroy$ ) ,
109110 ) ;
110111
111- this . effects$$ = merge (
112+ [
112113 desiredDisconnected$ ,
113- reconnectOnDisconnect ,
114+ reconnectOnDisconnect$ ,
114115 connection$
115- ) . subscribe ( ) ;
116+ ] . forEach ( ( x : Observable < unknown > ) => x . subscribe ( ) ) ;
116117 }
117118
118119 connect ( data ?: ( ) => Dictionary < string > ) : Observable < void > {
119- // console.info("triggered connect", data);
120+ // console.warn("[ connect] init ", data);
120121 this . desiredState$ . next ( DesiredConnectionStatus . connected ) ;
121122 if ( this . internalConnStatus$ . value === InternalConnectionStatus . connected ) {
122123 console . warn ( `${ this . source } session already connected` ) ;
@@ -142,7 +143,7 @@ export class HubConnection<THub> {
142143 }
143144
144145 disconnect ( ) : Observable < void > {
145- // console.info("triggered disconnect");
146+ // console.warn("[ disconnect] init ");
146147 this . desiredState$ . next ( DesiredConnectionStatus . disconnected ) ;
147148 return this . untilDisconnects$ ( ) ;
148149 }
@@ -188,25 +189,26 @@ export class HubConnection<THub> {
188189 }
189190
190191 send ( methodName : keyof THub | "StreamUnsubscribe" , ...args : unknown [ ] ) : Observable < void > {
191- return fromPromise ( this . hubConnection . send ( methodName . toString ( ) , ...args ) ) ;
192+ return from ( this . hubConnection . send ( methodName . toString ( ) , ...args ) ) ;
192193 }
193194
194195 invoke < TResult > ( methodName : keyof THub , ...args : unknown [ ] ) : Observable < TResult > {
195- return fromPromise < Promise < TResult > > ( this . hubConnection . invoke ( methodName . toString ( ) , ...args ) ) ;
196+ return from < Promise < TResult > > ( this . hubConnection . invoke ( methodName . toString ( ) , ...args ) ) ;
196197 }
197198
198199 dispose ( ) : void {
199200 this . disconnect ( ) ;
200201 this . desiredState$ . complete ( ) ;
201202 this . _connectionState$ . complete ( ) ;
202203 this . internalConnStatus$ . complete ( ) ;
203- this . effects$$ . unsubscribe ( ) ;
204+ this . _destroy$ . next ( ) ;
205+ this . _destroy$ . complete ( ) ;
204206 }
205207
206208 private _disconnect ( ) : Observable < void > {
207- // console.info("triggered _disconnect", this.internalConnStatus$.value);
208- return this . internalConnStatus $. value === InternalConnectionStatus . connected
209- ? fromPromise ( this . hubConnection . stop ( ) )
209+ // console.warn("[ _disconnect] init ", this.internalConnStatus$.value, this._connectionState $.value);
210+ return this . _connectionState $. value . status !== ConnectionStatus . disconnected
211+ ? from ( this . hubConnection . stop ( ) )
210212 : emptyNext ( ) ;
211213 }
212214
@@ -225,11 +227,12 @@ export class HubConnection<THub> {
225227 }
226228
227229 private openConnection ( ) {
228- // console.info("triggered openConnection");
230+ // console.warn("[ openConnection] ");
229231 return emptyNext ( ) . pipe (
230232 // tap(x => console.warn(">>>> openConnection - attempting to connect", x)),
231- switchMap ( ( ) => fromPromise ( this . hubConnection . start ( ) ) ) ,
232- // tap(x => console.warn(">>>> openConnection - connection established", x)),
233+ tap ( ( ) => this . _connectionState$ . next ( connectingState ) ) ,
234+ switchMap ( ( ) => from ( this . hubConnection . start ( ) ) ) ,
235+ // tap(x => console.warn(">>>> [openConnection] - connection established", x)),
233236 retryWhen ( errors => errors . pipe (
234237 scan ( ( errorCount : number ) => ++ errorCount , 0 ) ,
235238 delayWhen ( ( retryCount : number ) => {
0 commit comments