Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { Observable, Subscription, SubscriptionLike } from 'rxjs';
import { GraphQLError } from 'graphql';
import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core';
import type { KeyValueStorageInterface } from '@aws-amplify/core';
import {
CustomUserAgentDetails,
DocumentType,
Expand All @@ -18,6 +19,7 @@ import {
ConnectionState,
PubSubContentObserver,
} from '../../types/PubSub';
import type { WebSocketHealthState } from '../../types';
import {
AMPLIFY_SYMBOL,
CONNECTION_INIT_TIMEOUT,
Expand Down Expand Up @@ -49,6 +51,35 @@ import {
} from './appsyncUrl';
import { awsRealTimeHeaderBasedAuth } from './authHeaders';

// Storage key for persistent keep-alive tracking
const KEEP_ALIVE_STORAGE_KEY = 'AWS_AMPLIFY_LAST_KEEP_ALIVE';

// Platform-safe storage implementation
let platformStorage: Pick<
KeyValueStorageInterface,
'setItem' | 'getItem'
> | null = null;

try {
// Try to import AsyncStorage for React Native (optional dependency)
const AsyncStorage =
// eslint-disable-next-line import/no-extraneous-dependencies
require('@react-native-async-storage/async-storage').default;
platformStorage = AsyncStorage;
} catch (e) {
// Fallback for web/other platforms - use localStorage if available
if (typeof localStorage !== 'undefined') {
platformStorage = {
setItem: (key: string, value: string) => {
localStorage.setItem(key, value);

return Promise.resolve();
},
getItem: (key: string) => Promise.resolve(localStorage.getItem(key)),
};
}
}

const dispatchApiEvent = (payload: HubPayload) => {
Hub.dispatch('api', payload, 'PubSub', AMPLIFY_SYMBOL);
};
Expand Down Expand Up @@ -106,6 +137,7 @@ export abstract class AWSWebSocketProvider {
/**
* Mark the socket closed and release all active listeners
*/

close() {
// Mark the socket closed both in status and the connection monitor
this.socketStatus = SOCKET_STATUS.CLOSED;
Expand Down Expand Up @@ -681,6 +713,15 @@ export abstract class AWSWebSocketProvider {
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
this.maintainKeepAlive();

// Persist keep-alive timestamp for cross-session tracking
if (platformStorage) {
platformStorage
.setItem(KEEP_ALIVE_STORAGE_KEY, `${Date.now()}`)
.catch(error => {
this.logger.warn('Failed to persist keep-alive timestamp:', error);
});
}

return;
}

Expand Down Expand Up @@ -1025,4 +1066,92 @@ export abstract class AWSWebSocketProvider {
}
}
};

// WebSocket Health & Control API

/**
* Get current WebSocket health state
*/
getConnectionHealth(): WebSocketHealthState {
const timeSinceLastKeepAlive = Date.now() - this.keepAliveTimestamp;

const isHealthy =
this.connectionState === ConnectionState.Connected &&
timeSinceLastKeepAlive < DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT;

return {
isHealthy,
connectionState: this.connectionState || ConnectionState.Disconnected,
lastKeepAliveTime: this.keepAliveTimestamp,
timeSinceLastKeepAlive,
};
}

/**
* Get persistent WebSocket health state (survives app restarts)
*/
async getPersistentConnectionHealth(): Promise<WebSocketHealthState> {
let persistentKeepAliveTime = 0;

// Try to get persistent keep-alive timestamp
if (platformStorage) {
try {
const persistentKeepAlive = await platformStorage.getItem(
KEEP_ALIVE_STORAGE_KEY,
);
if (persistentKeepAlive) {
persistentKeepAliveTime = Number(persistentKeepAlive) || 0;
}
} catch (error) {
this.logger.warn(
'Failed to retrieve persistent keep-alive timestamp:',
error,
);
}
}

// Use the more recent timestamp (in-memory vs persistent)
const lastKeepAliveTime = Math.max(
this.keepAliveTimestamp,
persistentKeepAliveTime,
);

const timeSinceLastKeepAlive =
lastKeepAliveTime > 0 ? Date.now() - lastKeepAliveTime : Infinity; // If no keep-alive has been received, treat as unhealthy

// Health check includes persistent data
const isHealthy =
this.connectionState === ConnectionState.Connected &&
timeSinceLastKeepAlive < DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT;

return {
isHealthy,
connectionState: this.connectionState || ConnectionState.Disconnected,
lastKeepAliveTime: lastKeepAliveTime > 0 ? lastKeepAliveTime : undefined,
timeSinceLastKeepAlive:
lastKeepAliveTime > 0 ? timeSinceLastKeepAlive : undefined,
};
}

/**
* Check if WebSocket is currently connected
*/
isConnected(): boolean {
return this.awsRealTimeSocket?.readyState === WebSocket.OPEN;
}

/**
* Manually reconnect WebSocket
*/
async reconnect(): Promise<void> {
this.logger.info('Manual WebSocket reconnection requested');

// Close existing connection if any
if (this.isConnected()) {
await this.close();
}

// Trigger reconnection through the reconnection monitor
this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
}
}
19 changes: 19 additions & 0 deletions packages/api-graphql/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,22 @@ export interface AuthModeParams extends Record<string, unknown> {
export type GenerateServerClientParams = {
config: ResourcesConfig;
} & CommonPublicClientOptions;

// WebSocket health and control types
export interface WebSocketHealthState {
isHealthy: boolean;
connectionState: import('./PubSub').ConnectionState;
lastKeepAliveTime?: number;
timeSinceLastKeepAlive?: number;
}

export interface WebSocketControl {
reconnect(): Promise<void>;
disconnect(): void;
isConnected(): boolean;
getConnectionHealth(): WebSocketHealthState;
getPersistentConnectionHealth(): Promise<WebSocketHealthState>;
onConnectionStateChange(
callback: (state: import('./PubSub').ConnectionState) => void,
): () => void;
}