From bbe77153ffed17786627d8f8cdf192d21ec8b83e Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 3 Nov 2025 09:56:13 +0100 Subject: [PATCH 01/11] Create fetcher utility Signed-off-by: Assem Hafez --- .../get-workflow-history.types.ts | 8 + .../workflow-history-multi-page-fixture.ts | 42 +++ .../workflow-history-fetcher.test.tsx | 352 ++++++++++++++++++ .../helpers/workflow-history-fetcher.ts | 150 ++++++++ .../helpers/workflow-history-fetcher.types.ts | 24 ++ 5 files changed, 576 insertions(+) create mode 100644 src/views/workflow-history/__fixtures__/workflow-history-multi-page-fixture.ts create mode 100644 src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx create mode 100644 src/views/workflow-history/helpers/workflow-history-fetcher.ts create mode 100644 src/views/workflow-history/helpers/workflow-history-fetcher.types.ts diff --git a/src/route-handlers/get-workflow-history/get-workflow-history.types.ts b/src/route-handlers/get-workflow-history/get-workflow-history.types.ts index 811af96ea..fcc6f14d7 100644 --- a/src/route-handlers/get-workflow-history/get-workflow-history.types.ts +++ b/src/route-handlers/get-workflow-history/get-workflow-history.types.ts @@ -1,6 +1,10 @@ +import { type z } from 'zod'; + import { type GetWorkflowExecutionHistoryResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/GetWorkflowExecutionHistoryResponse'; import { type DefaultMiddlewaresContext } from '@/utils/route-handlers-middleware'; +import type getWorkflowHistoryQueryParamsSchema from './schemas/get-workflow-history-query-params-schema'; + export type RouteParams = { domain: string; cluster: string; @@ -12,6 +16,10 @@ export type RequestParams = { params: RouteParams; }; +export type WorkflowHistoryQueryParams = z.infer< + typeof getWorkflowHistoryQueryParamsSchema +>; + export type GetWorkflowHistoryResponse = GetWorkflowExecutionHistoryResponse; export type Context = DefaultMiddlewaresContext; diff --git a/src/views/workflow-history/__fixtures__/workflow-history-multi-page-fixture.ts b/src/views/workflow-history/__fixtures__/workflow-history-multi-page-fixture.ts new file mode 100644 index 000000000..132720796 --- /dev/null +++ b/src/views/workflow-history/__fixtures__/workflow-history-multi-page-fixture.ts @@ -0,0 +1,42 @@ +import { type GetWorkflowHistoryResponse } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; + +import { + scheduleActivityTaskEvent, + startActivityTaskEvent, + completeActivityTaskEvent, +} from './workflow-history-activity-events'; +import { + completeDecisionTaskEvent, + scheduleDecisionTaskEvent, + startDecisionTaskEvent, +} from './workflow-history-decision-events'; + +/** + * Multi-page workflow history fixture for testing pagination + * Contains 3 pages with various events + */ +const workflowHistoryMultiPageFixture: GetWorkflowHistoryResponse[] = [ + // Page 1: Activity task scheduled and started + { + history: { events: [scheduleActivityTaskEvent, startActivityTaskEvent] }, + rawHistory: [], + archived: false, + nextPageToken: 'page2', + }, + // Page 2: Activity completed and decision task scheduled + { + history: { events: [completeActivityTaskEvent, scheduleDecisionTaskEvent] }, + rawHistory: [], + archived: false, + nextPageToken: 'page3', + }, + // Page 3: Decision task started and completed (last page) + { + history: { events: [startDecisionTaskEvent, completeDecisionTaskEvent] }, + rawHistory: [], + archived: false, + nextPageToken: '', + }, +]; + +export default workflowHistoryMultiPageFixture; diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx new file mode 100644 index 000000000..200a9ebd3 --- /dev/null +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx @@ -0,0 +1,352 @@ +import { QueryClient } from '@tanstack/react-query'; +import { HttpResponse } from 'msw'; + +import { waitFor } from '@/test-utils/rtl'; + +import { type GetWorkflowHistoryResponse } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; +import mswMockEndpoints from '@/test-utils/msw-mock-handlers/helper/msw-mock-endpoints'; + +import workflowHistoryMultiPageFixture from '../../__fixtures__/workflow-history-multi-page-fixture'; +import WorkflowHistoryFetcher from '../workflow-history-fetcher'; + +describe(WorkflowHistoryFetcher.name, () => { + let queryClient: QueryClient; + + beforeEach(() => { + queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false, + staleTime: Infinity, + refetchOnWindowFocus: false, + }, + }, + }); + }); + + afterEach(() => { + queryClient.clear(); + }); + + it('should return the current query state from getCurrentState', async () => { + const { fetcher } = setup(queryClient); + + const initialState = fetcher.getCurrentState(); + expect(initialState.data).toBeUndefined(); + expect(initialState.status).toBe('pending'); + + fetcher.unmount(); + }); + + it('should call onChange callback on state changes', async () => { + const { fetcher } = setup(queryClient); + const callback = jest.fn(); + + fetcher.onChange(callback); + const initialCallCount = callback.mock.calls.length; + + fetcher.start((state) => !state?.data?.pages?.length); + + await waitFor(() => { + expect(callback.mock.calls.length).toBeGreaterThan(initialCallCount); + }); + fetcher.unmount(); + }); + + it('should return unsubscribe function', async () => { + const { fetcher } = setup(queryClient); + const callback1 = jest.fn(); + const callback2 = jest.fn(); + + const unsubscribe1 = fetcher.onChange(callback1); + fetcher.onChange(callback2); + + fetcher.start((state) => !state?.data?.pages?.length); + + await waitFor(() => { + expect(callback1.mock.calls.length).toEqual(callback2.mock.calls.length); + expect(callback1.mock.calls.length).toBeGreaterThan(1); + }); + + const countBeforeUnsubscribe = callback1.mock.calls.length; + unsubscribe1(); + + fetcher.fetchSingleNextPage(); + + await waitFor(() => { + expect(callback2.mock.calls.length).toBeGreaterThan( + countBeforeUnsubscribe + ); + }); + + fetcher.unmount(); + }); + + it('should respect shouldContinue callback', async () => { + const { fetcher } = setup(queryClient); + const shouldContinue = jest.fn(() => false); + + fetcher.start(shouldContinue); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + }); + + const state = fetcher.getCurrentState(); + expect(state.data?.pages || []).toHaveLength(0); + + fetcher.unmount(); + }); + + it('should stop after shouldContinue returns false', async () => { + const { fetcher } = setup(queryClient); + const shouldContinue = jest.fn((state) => { + return (state.data?.pages.length || 0) < 2; + }); + + fetcher.start(shouldContinue); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + expect(state.data?.pages).toHaveLength(2); + }); + + fetcher.unmount(); + }); + + it('should load all pages and auto-stop when there are no more pages', async () => { + const { fetcher } = setup(queryClient); + + fetcher.start(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.hasNextPage).toBe(false); + expect(state.data?.pages).toHaveLength(3); + }); + + fetcher.unmount(); + }); + + it('should auto-stop on error after initial success', async () => { + jest.useFakeTimers(); + + try { + const { fetcher } = setup(queryClient, { failOnPages: [2] }); + + fetcher.start(); + + // Wait for first page to load successfully + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.data?.pages).toHaveLength(1); + }); + + // Fast-forward through retry delays (3 retries * 3000ms each) + await jest.advanceTimersByTimeAsync(3 * 3000); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + expect(state.isError).toBe(true); + expect(state.data?.pages).toHaveLength(1); + }); + + fetcher.unmount(); + } finally { + jest.useRealTimers(); + } + }); + + it('should allow manual stop for loading all pages', async () => { + const { fetcher } = setup(queryClient); + + let stopped = false; + fetcher.onChange((state) => { + if (state.data?.pages.length === 1 && !stopped) { + stopped = true; + fetcher.stop(); + } + }); + + fetcher.start(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + expect(state.data?.pages).toHaveLength(1); + }); + + fetcher.unmount(); + }); + + it('should allow start again after stop', async () => { + const { fetcher } = setup(queryClient); + + let stopped = false; + fetcher.onChange((state) => { + if (state.data?.pages.length === 1 && !stopped) { + stopped = true; + fetcher.stop(); + } + }); + + fetcher.start(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + expect(state.data?.pages).toHaveLength(1); + }); + + fetcher.start(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.isFetching).toBe(false); + }); + + const finalState = fetcher.getCurrentState(); + expect(finalState.data?.pages).toHaveLength(3); + fetcher.unmount(); + }); + + it('should fetch next page when available', async () => { + const { fetcher } = setup(queryClient); + + fetcher.start((state) => !state?.data?.pages?.length); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.data?.pages).toHaveLength(1); + }); + fetcher.stop(); + + fetcher.fetchSingleNextPage(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.data?.pages).toHaveLength(2); + }); + + fetcher.unmount(); + }); + + it('should not fetch when already fetching', async () => { + const { fetcher } = setup(queryClient); + + fetcher.start((state) => !state?.data?.pages?.length); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.data?.pages).toHaveLength(1); + }); + fetcher.stop(); + + // fetching twice should not fetch again + fetcher.fetchSingleNextPage(); + fetcher.fetchSingleNextPage(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(!state.isFetchingNextPage).toBe(true); + }); + + const state = fetcher.getCurrentState(); + expect(state.data?.pages).toHaveLength(2); + + fetcher.unmount(); + }); + + it('should not fetch when no next page available', async () => { + const { fetcher } = setup(queryClient); + + fetcher.start(); + + await waitFor(() => { + const state = fetcher.getCurrentState(); + expect(state.hasNextPage).toBe(false); + }); + + const pageCountBefore = fetcher.getCurrentState().data?.pages.length; + fetcher.fetchSingleNextPage(); + + const state = fetcher.getCurrentState(); + expect(state.data?.pages.length).toBe(pageCountBefore); + fetcher.unmount(); + }); +}); + +function setup(client: QueryClient, options: { failOnPages?: number[] } = {}) { + const params = { + domain: 'test-domain', + cluster: 'test-cluster', + workflowId: 'test-workflow-id', + runId: 'test-run-id', + pageSize: 10, + }; + + mockHistoryEndpoint(workflowHistoryMultiPageFixture, options.failOnPages); + + const fetcher = new WorkflowHistoryFetcher(client, params); + + const waitForData = async () => { + let unsubscribe: (() => void) | undefined; + await new Promise((resolve) => { + unsubscribe = fetcher.onChange((state) => { + if (state.data !== undefined) { + resolve(); + } + }); + }); + unsubscribe?.(); + }; + return { + fetcher, + params, + waitForData, + }; +} + +function mockHistoryEndpoint( + responses: GetWorkflowHistoryResponse[], + failOnPages: number[] = [] +) { + mswMockEndpoints([ + { + path: '/api/domains/:domain/:cluster/workflows/:workflowId/:runId/history', + httpMethod: 'GET', + mockOnce: false, // Persist across multiple requests + httpResolver: async ({ request }) => { + const url = new URL(request.url); + const nextPage = url.searchParams.get('nextPage'); + + // Determine current page number based on nextPage param + let pageNumber = 1; + if (!nextPage || nextPage === 'null' || nextPage === 'undefined') { + pageNumber = 1; + } else if (nextPage === 'page2') { + pageNumber = 2; + } else if (nextPage === 'page3') { + pageNumber = 3; + } + + // Check if this page should fail + if (failOnPages.includes(pageNumber)) { + return HttpResponse.json( + { message: 'Request failed' }, + { status: 500 } + ); + } + + // Map page number to response index (0-indexed) + const responseIndex = pageNumber - 1; + const response = + responses[responseIndex] || responses[responses.length - 1]; + return HttpResponse.json(response); + }, + }, + ]); +} diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts new file mode 100644 index 000000000..cef2493cf --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -0,0 +1,150 @@ +import { InfiniteQueryObserver, type QueryClient } from '@tanstack/react-query'; +import queryString from 'query-string'; + +import { + type WorkflowHistoryQueryParams, + type GetWorkflowHistoryResponse, +} from '@/route-handlers/get-workflow-history/get-workflow-history.types'; +import request from '@/utils/request'; +import { type RequestError } from '@/utils/request/request-error'; + +import { + type WorkflowHistoryQueryResult, + type QueryResultOnChangeCallback, + type ShouldContinueCallback, + type WorkflowHistoryQueryKey, +} from './workflow-history-fetcher.types'; + +export default class WorkflowHistoryFetcher { + private observer: InfiniteQueryObserver< + GetWorkflowHistoryResponse, + RequestError + >; + + private unsubscribe: (() => void) | null = null; + private isStarted = false; + private shouldContinue: ShouldContinueCallback = () => true; + + constructor( + private readonly queryClient: QueryClient, + private readonly params: WorkflowHistoryQueryParams + ) { + this.observer = new InfiniteQueryObserver< + GetWorkflowHistoryResponse, + RequestError + >(this.queryClient, { + ...this.buildObserverOptions(this.params), + }); + } + + onChange(callback: QueryResultOnChangeCallback): () => void { + const current = this.getCurrentState(); + if (current) callback(current); + return this.observer.subscribe((res: any) => { + callback(res); + }); + } + + start(shouldContinue: ShouldContinueCallback = () => true): void { + if (shouldContinue) { + this.shouldContinue = shouldContinue; + } + // If already started, return + if (this.isStarted) return; + this.isStarted = true; + let emitCount = 0; + const currentState = this.observer.getCurrentResult(); + const fetchedFirstPage = currentState.status !== 'pending'; + const shouldEnableQuery = + (!fetchedFirstPage && shouldContinue(currentState)) || fetchedFirstPage; + + if (shouldEnableQuery) { + this.observer.setOptions({ + ...this.buildObserverOptions(this.params), + enabled: true, + }); + } + + const emit = (res: WorkflowHistoryQueryResult) => { + emitCount++; + + // Auto stop when there are no more pages (end of history) or when there is a fresh error happens after the start. + // isError is true when the request failes and retries are exhausted. + if (res.hasNextPage === false || (res.isError && emitCount > 1)) { + this.stop(); + return; + } + + // Drive pagination based on external predicate + if (this.shouldContinue(res) && !res.isFetchingNextPage) { + res.fetchNextPage(); + } + }; + + // only start emit (fetching next pages) after the initial fetch is complete + // first page is already fetched on the first subscription below + if (fetchedFirstPage) { + emit(currentState); + } + + if (this.unsubscribe) { + this.unsubscribe(); + } + this.unsubscribe = this.observer.subscribe((res) => emit(res)); + } + + stop(): void { + this.isStarted = false; + if (this.unsubscribe) { + this.unsubscribe(); + this.unsubscribe = null; + } + } + unmount(): void { + this.stop(); + this.observer.destroy(); + } + + fetchSingleNextPage(): void { + const state = this.getCurrentState(); + + if (state.status === 'pending') { + this.observer.setOptions({ + ...this.buildObserverOptions(this.params), + enabled: true, + }); + } else if (!state.isFetchingNextPage && state.hasNextPage) + state.fetchNextPage(); + } + + getCurrentState(): WorkflowHistoryQueryResult { + return this.observer.getCurrentResult(); + } + + private buildObserverOptions(params: WorkflowHistoryQueryParams) { + return { + queryKey: [ + 'workflow_history_paginated', + params, + ] satisfies WorkflowHistoryQueryKey, + queryFn: ({ queryKey: [_, qp], pageParam }: any) => + request( + queryString.stringifyUrl({ + url: `/api/domains/${qp.domain}/${qp.cluster}/workflows/${qp.workflowId}/${qp.runId}/history`, + query: { + nextPage: pageParam, + pageSize: qp.pageSize, + waitForNewEvent: qp.waitForNewEvent ?? false, + } satisfies WorkflowHistoryQueryParams, + }) + ).then((res) => res.json()), + initialPageParam: undefined, + getNextPageParam: (lastPage: GetWorkflowHistoryResponse) => { + return lastPage.nextPageToken ? lastPage.nextPageToken : undefined; + }, + retry: 3, + retryDelay: 3000, + enabled: false, + }; + } +} diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts new file mode 100644 index 000000000..457f8b37b --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts @@ -0,0 +1,24 @@ +import { + type InfiniteData, + type InfiniteQueryObserverResult, +} from '@tanstack/react-query'; + +import { + type WorkflowHistoryQueryParams, + type GetWorkflowHistoryResponse, +} from '@/route-handlers/get-workflow-history/get-workflow-history.types'; +import { type RequestError } from '@/utils/request/request-error'; + +export type WorkflowHistoryQueryKey = [string, WorkflowHistoryQueryParams]; + +export type WorkflowHistoryQueryResult = InfiniteQueryObserverResult< + InfiniteData, + RequestError +>; +export type QueryResultOnChangeCallback = ( + state: WorkflowHistoryQueryResult +) => void; + +export type ShouldContinueCallback = ( + state: WorkflowHistoryQueryResult +) => boolean; From 6b099458feb93feafa4cf6bdd10018946079591c Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 3 Nov 2025 10:26:30 +0100 Subject: [PATCH 02/11] rename query Signed-off-by: Assem Hafez --- .../workflow-history/helpers/workflow-history-fetcher.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts index cef2493cf..4b97cf89d 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -123,10 +123,7 @@ export default class WorkflowHistoryFetcher { private buildObserverOptions(params: WorkflowHistoryQueryParams) { return { - queryKey: [ - 'workflow_history_paginated', - params, - ] satisfies WorkflowHistoryQueryKey, + queryKey: ['workflow_history', params] satisfies WorkflowHistoryQueryKey, queryFn: ({ queryKey: [_, qp], pageParam }: any) => request( queryString.stringifyUrl({ From 1b5796af04c35734f5e69558438ddd1fbddf4e77 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 3 Nov 2025 11:41:28 +0100 Subject: [PATCH 03/11] Create hook for fetching history Signed-off-by: Assem Hafez --- .../use-workflow-history-fetcher.test.tsx | 196 ++++++++++++++++++ .../hooks/use-workflow-history-fetcher.ts | 87 ++++++++ 2 files changed, 283 insertions(+) create mode 100644 src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx create mode 100644 src/views/workflow-history/hooks/use-workflow-history-fetcher.ts diff --git a/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx new file mode 100644 index 000000000..9c5a83976 --- /dev/null +++ b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx @@ -0,0 +1,196 @@ +import { QueryClient } from '@tanstack/react-query'; + +import { act, renderHook, waitFor } from '@/test-utils/rtl'; + +import workflowHistoryMultiPageFixture from '../../__fixtures__/workflow-history-multi-page-fixture'; +import { workflowPageUrlParams } from '../../__fixtures__/workflow-page-url-params'; +import WorkflowHistoryFetcher from '../../helpers/workflow-history-fetcher'; +import useWorkflowHistoryFetcher from '../use-workflow-history-fetcher'; + +jest.mock('../../helpers/workflow-history-fetcher'); + +const mockParams = { + ...workflowPageUrlParams, + pageSize: 50, + waitForNewEvent: true, +}; +let mockFetcherInstance: jest.Mocked; +let mockOnChangeCallback: jest.Mock; +let mockUnsubscribe: jest.Mock; + +function setup() { + const hookResult = renderHook(() => useWorkflowHistoryFetcher(mockParams)); + + return { + ...hookResult, + mockFetcherInstance, + mockOnChangeCallback, + mockUnsubscribe, + }; +} + +describe(useWorkflowHistoryFetcher.name, () => { + beforeEach(() => { + jest.clearAllMocks(); + + mockOnChangeCallback = jest.fn(); + mockUnsubscribe = jest.fn(); + + mockFetcherInstance = { + start: jest.fn(), + stop: jest.fn(), + unmount: jest.fn(), + fetchSingleNextPage: jest.fn(), + onChange: jest.fn((callback) => { + mockOnChangeCallback.mockImplementation(callback); + return mockUnsubscribe; + }), + getCurrentState: jest.fn(() => ({ + data: undefined, + error: null, + isError: false, + isLoading: false, + isPending: true, + isFetchingNextPage: false, + hasNextPage: false, + status: 'pending' as const, + })), + } as unknown as jest.Mocked; + + ( + WorkflowHistoryFetcher as jest.MockedClass + ).mockImplementation(() => mockFetcherInstance); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should create a WorkflowHistoryFetcher instance with correct params', () => { + setup(); + + expect(WorkflowHistoryFetcher).toHaveBeenCalledWith( + expect.any(QueryClient), + mockParams + ); + expect(WorkflowHistoryFetcher).toHaveBeenCalledTimes(1); + }); + + it('should reuse the same fetcher instance on re-renders', () => { + const { rerender } = setup(); + + rerender(); + rerender(); + + expect(WorkflowHistoryFetcher).toHaveBeenCalledTimes(1); + }); + + it('should subscribe to fetcher state changes on mount', () => { + setup(); + + expect(mockFetcherInstance.onChange).toHaveBeenCalledTimes(1); + }); + + it('should start fetcher to load first page on mount', () => { + setup(); + + expect(mockFetcherInstance.start).toHaveBeenCalledWith( + expect.any(Function) + ); + expect(mockFetcherInstance.start).toHaveBeenCalledTimes(1); + }); + + it('should return initial history query state', () => { + const { result } = setup(); + + expect(result.current.historyQuery).toBeDefined(); + expect(result.current.historyQuery.isPending).toBe(true); + }); + + it('should update historyQuery when fetcher state changes', async () => { + const { result, mockOnChangeCallback } = setup(); + + const newState = { + data: { + pages: [workflowHistoryMultiPageFixture[0]], + pageParams: [], + }, + error: null, + isError: false, + isLoading: false, + isPending: false, + isFetchingNextPage: false, + hasNextPage: true, + status: 'success' as const, + }; + + act(() => { + mockOnChangeCallback(newState); + }); + + await waitFor(() => { + expect(result.current.historyQuery.status).toBe('success'); + }); + }); + + it('should call fetcher.start() with custom shouldContinue callback passed to startLoadingHistory', () => { + const { result, mockFetcherInstance } = setup(); + const customShouldContinue = jest.fn(() => false); + + act(() => { + result.current.startLoadingHistory(customShouldContinue); + }); + + expect(mockFetcherInstance.start).toHaveBeenCalledWith( + customShouldContinue + ); + }); + + it('should call fetcher.stop() within stopLoadingHistory', () => { + const { result, mockFetcherInstance } = setup(); + + act(() => { + result.current.stopLoadingHistory(); + }); + + expect(mockFetcherInstance.stop).toHaveBeenCalledTimes(1); + }); + + it('should call fetcher.fetchSingleNextPage() within fetchSingleNextPage', () => { + const { result, mockFetcherInstance } = setup(); + + act(() => { + result.current.fetchSingleNextPage(); + }); + + expect(mockFetcherInstance.fetchSingleNextPage).toHaveBeenCalledTimes(1); + }); + + it('should unsubscribe from onChange when unmounted', () => { + const { unmount, mockUnsubscribe } = setup(); + + unmount(); + + expect(mockUnsubscribe).toHaveBeenCalledTimes(1); + }); + + it('should call fetcher.unmount() when component unmounts', () => { + const { unmount, mockFetcherInstance } = setup(); + + unmount(); + + expect(mockFetcherInstance.unmount).toHaveBeenCalledTimes(1); + }); + + it('should return all expected methods and state', () => { + const { result } = setup(); + + expect(result.current).toHaveProperty('historyQuery'); + expect(result.current).toHaveProperty('startLoadingHistory'); + expect(result.current).toHaveProperty('stopLoadingHistory'); + expect(result.current).toHaveProperty('fetchSingleNextPage'); + expect(typeof result.current.startLoadingHistory).toBe('function'); + expect(typeof result.current.stopLoadingHistory).toBe('function'); + expect(typeof result.current.fetchSingleNextPage).toBe('function'); + }); +}); diff --git a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts new file mode 100644 index 000000000..e23d7c4e6 --- /dev/null +++ b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts @@ -0,0 +1,87 @@ +import { useCallback, useEffect, useRef } from 'react'; + +import { + type InfiniteData, + type InfiniteQueryObserverResult, + useQueryClient, +} from '@tanstack/react-query'; + +import useThrottledState from '@/hooks/use-throttled-state'; +import { + type WorkflowHistoryQueryParams, + type GetWorkflowHistoryResponse, + type RouteParams, +} from '@/route-handlers/get-workflow-history/get-workflow-history.types'; +import { type RequestError } from '@/utils/request/request-error'; + +import WorkflowHistoryFetcher from '../helpers/workflow-history-fetcher'; +import { type ShouldContinueCallback } from '../helpers/workflow-history-fetcher.types'; + +export default function useWorkflowHistoryFetcher( + params: WorkflowHistoryQueryParams & RouteParams +) { + const queryClient = useQueryClient(); + const fetcherRef = useRef(null); + + if (!fetcherRef.current) { + fetcherRef.current = new WorkflowHistoryFetcher(queryClient, params); + } + + const [historyQuery, setHistoryQuery] = useThrottledState< + InfiniteQueryObserverResult< + InfiniteData, + RequestError + > + >(fetcherRef.current.getCurrentState(), 2000, { + leading: true, + trailing: true, + }); + + useEffect(() => { + if (!fetcherRef.current) return; + + const unsubscribe = fetcherRef.current.onChange((state) => { + const pagesCount = state.data?.pages?.length || 0; + // immediately set if there is the first page without throttling other wise throttle + setHistoryQuery(() => state, pagesCount <= 1); + }); + + // Fetch first page + fetcherRef.current.start((state) => !state?.data?.pages?.length); + + return () => { + unsubscribe(); + }; + }, [setHistoryQuery]); + + useEffect(() => { + return () => { + fetcherRef.current?.unmount(); + }; + }, []); + + const startLoadingHistory = useCallback( + (shouldContinue: ShouldContinueCallback = () => true) => { + if (!fetcherRef.current) return; + fetcherRef.current.start(shouldContinue); + }, + [] + ); + + const stopLoadingHistory = useCallback(() => { + if (!fetcherRef.current) return; + fetcherRef.current.stop(); + }, []); + + const fetchSingleNextPage = useCallback(() => { + if (!fetcherRef.current) return; + fetcherRef.current.fetchSingleNextPage(); + }, []); + + return { + historyQuery, + startLoadingHistory, + stopLoadingHistory, + fetchSingleNextPage, + }; +} From ae114ecc3e754a8aa148c289ccdd956fb46e863c Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 3 Nov 2025 13:09:29 +0100 Subject: [PATCH 04/11] add configurable throttleMs to the hook Signed-off-by: Assem Hafez --- .../workflow-history/hooks/use-workflow-history-fetcher.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts index e23d7c4e6..cfb375f13 100644 --- a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts +++ b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts @@ -18,7 +18,8 @@ import WorkflowHistoryFetcher from '../helpers/workflow-history-fetcher'; import { type ShouldContinueCallback } from '../helpers/workflow-history-fetcher.types'; export default function useWorkflowHistoryFetcher( - params: WorkflowHistoryQueryParams & RouteParams + params: WorkflowHistoryQueryParams & RouteParams, + throttleMs: number = 2000 ) { const queryClient = useQueryClient(); const fetcherRef = useRef(null); @@ -32,7 +33,7 @@ export default function useWorkflowHistoryFetcher( InfiniteData, RequestError > - >(fetcherRef.current.getCurrentState(), 2000, { + >(fetcherRef.current.getCurrentState(), throttleMs, { leading: true, trailing: true, }); From b9e288b837de1282afe8715b84685c5dd1dfc106 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 13:57:50 +0100 Subject: [PATCH 05/11] update fetcher based on feedback Signed-off-by: Assem Hafez --- .../workflow-history-fetcher.test.tsx | 41 +++++---------- .../helpers/workflow-history-fetcher.ts | 52 +++++++++---------- .../helpers/workflow-history-fetcher.types.ts | 31 +++++++++-- 3 files changed, 63 insertions(+), 61 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx index 200a9ebd3..aa1f59ddc 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx @@ -9,9 +9,13 @@ import mswMockEndpoints from '@/test-utils/msw-mock-handlers/helper/msw-mock-end import workflowHistoryMultiPageFixture from '../../__fixtures__/workflow-history-multi-page-fixture'; import WorkflowHistoryFetcher from '../workflow-history-fetcher'; -describe(WorkflowHistoryFetcher.name, () => { - let queryClient: QueryClient; +const RETRY_DELAY = 3000; +const RETRY_COUNT = 3; + +let queryClient: QueryClient; +let hoistedFetcher: WorkflowHistoryFetcher; +describe(WorkflowHistoryFetcher.name, () => { beforeEach(() => { queryClient = new QueryClient({ defaultOptions: { @@ -26,6 +30,7 @@ describe(WorkflowHistoryFetcher.name, () => { afterEach(() => { queryClient.clear(); + hoistedFetcher?.unmount(); }); it('should return the current query state from getCurrentState', async () => { @@ -34,8 +39,6 @@ describe(WorkflowHistoryFetcher.name, () => { const initialState = fetcher.getCurrentState(); expect(initialState.data).toBeUndefined(); expect(initialState.status).toBe('pending'); - - fetcher.unmount(); }); it('should call onChange callback on state changes', async () => { @@ -50,7 +53,6 @@ describe(WorkflowHistoryFetcher.name, () => { await waitFor(() => { expect(callback.mock.calls.length).toBeGreaterThan(initialCallCount); }); - fetcher.unmount(); }); it('should return unsubscribe function', async () => { @@ -61,6 +63,7 @@ describe(WorkflowHistoryFetcher.name, () => { const unsubscribe1 = fetcher.onChange(callback1); fetcher.onChange(callback2); + // Fetch the first page fetcher.start((state) => !state?.data?.pages?.length); await waitFor(() => { @@ -78,11 +81,9 @@ describe(WorkflowHistoryFetcher.name, () => { countBeforeUnsubscribe ); }); - - fetcher.unmount(); }); - it('should respect shouldContinue callback', async () => { + it('should not fetch any pages if shouldContinue callback returns false', async () => { const { fetcher } = setup(queryClient); const shouldContinue = jest.fn(() => false); @@ -95,8 +96,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages || []).toHaveLength(0); - - fetcher.unmount(); }); it('should stop after shouldContinue returns false', async () => { @@ -112,8 +111,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isFetching).toBe(false); expect(state.data?.pages).toHaveLength(2); }); - - fetcher.unmount(); }); it('should load all pages and auto-stop when there are no more pages', async () => { @@ -126,8 +123,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.hasNextPage).toBe(false); expect(state.data?.pages).toHaveLength(3); }); - - fetcher.unmount(); }); it('should auto-stop on error after initial success', async () => { @@ -144,8 +139,8 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.data?.pages).toHaveLength(1); }); - // Fast-forward through retry delays (3 retries * 3000ms each) - await jest.advanceTimersByTimeAsync(3 * 3000); + // Fast-forward through retry delays + await jest.advanceTimersByTimeAsync(RETRY_COUNT * RETRY_DELAY); await waitFor(() => { const state = fetcher.getCurrentState(); @@ -153,8 +148,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isError).toBe(true); expect(state.data?.pages).toHaveLength(1); }); - - fetcher.unmount(); } finally { jest.useRealTimers(); } @@ -178,8 +171,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isFetching).toBe(false); expect(state.data?.pages).toHaveLength(1); }); - - fetcher.unmount(); }); it('should allow start again after stop', async () => { @@ -210,7 +201,6 @@ describe(WorkflowHistoryFetcher.name, () => { const finalState = fetcher.getCurrentState(); expect(finalState.data?.pages).toHaveLength(3); - fetcher.unmount(); }); it('should fetch next page when available', async () => { @@ -230,8 +220,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages).toHaveLength(2); }); - - fetcher.unmount(); }); it('should not fetch when already fetching', async () => { @@ -256,8 +244,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages).toHaveLength(2); - - fetcher.unmount(); }); it('should not fetch when no next page available', async () => { @@ -275,7 +261,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages.length).toBe(pageCountBefore); - fetcher.unmount(); }); }); @@ -289,8 +274,8 @@ function setup(client: QueryClient, options: { failOnPages?: number[] } = {}) { }; mockHistoryEndpoint(workflowHistoryMultiPageFixture, options.failOnPages); - const fetcher = new WorkflowHistoryFetcher(client, params); + hoistedFetcher = fetcher; const waitForData = async () => { let unsubscribe: (() => void) | undefined; @@ -325,7 +310,7 @@ function mockHistoryEndpoint( // Determine current page number based on nextPage param let pageNumber = 1; - if (!nextPage || nextPage === 'null' || nextPage === 'undefined') { + if (!nextPage) { pageNumber = 1; } else if (nextPage === 'page2') { pageNumber = 2; diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts index 4b97cf89d..c03a3f95b 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -6,20 +6,18 @@ import { type GetWorkflowHistoryResponse, } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; import request from '@/utils/request'; -import { type RequestError } from '@/utils/request/request-error'; import { type WorkflowHistoryQueryResult, type QueryResultOnChangeCallback, type ShouldContinueCallback, - type WorkflowHistoryQueryKey, + type WorkflowHistoryReactQueryParams, + type WorkflowHistoryInfiniteQueryOptions, + type WorkflowHistoryInfiniteQueryObserver, } from './workflow-history-fetcher.types'; export default class WorkflowHistoryFetcher { - private observer: InfiniteQueryObserver< - GetWorkflowHistoryResponse, - RequestError - >; + private observer: WorkflowHistoryInfiniteQueryObserver; private unsubscribe: (() => void) | null = null; private isStarted = false; @@ -27,12 +25,9 @@ export default class WorkflowHistoryFetcher { constructor( private readonly queryClient: QueryClient, - private readonly params: WorkflowHistoryQueryParams + private readonly params: WorkflowHistoryReactQueryParams ) { - this.observer = new InfiniteQueryObserver< - GetWorkflowHistoryResponse, - RequestError - >(this.queryClient, { + this.observer = new InfiniteQueryObserver(this.queryClient, { ...this.buildObserverOptions(this.params), }); } @@ -40,7 +35,7 @@ export default class WorkflowHistoryFetcher { onChange(callback: QueryResultOnChangeCallback): () => void { const current = this.getCurrentState(); if (current) callback(current); - return this.observer.subscribe((res: any) => { + return this.observer.subscribe((res) => { callback(res); }); } @@ -55,8 +50,7 @@ export default class WorkflowHistoryFetcher { let emitCount = 0; const currentState = this.observer.getCurrentResult(); const fetchedFirstPage = currentState.status !== 'pending'; - const shouldEnableQuery = - (!fetchedFirstPage && shouldContinue(currentState)) || fetchedFirstPage; + const shouldEnableQuery = !fetchedFirstPage && shouldContinue(currentState); if (shouldEnableQuery) { this.observer.setOptions({ @@ -68,7 +62,7 @@ export default class WorkflowHistoryFetcher { const emit = (res: WorkflowHistoryQueryResult) => { emitCount++; - // Auto stop when there are no more pages (end of history) or when there is a fresh error happens after the start. + // Auto stop when there are no more pages (end of history) or when there is an existing error from last start (emitCount === 1 means this is the first emit in the current start). // isError is true when the request failes and retries are exhausted. if (res.hasNextPage === false || (res.isError && emitCount > 1)) { this.stop(); @@ -81,15 +75,14 @@ export default class WorkflowHistoryFetcher { } }; - // only start emit (fetching next pages) after the initial fetch is complete - // first page is already fetched on the first subscription below + // Manual emit is needed to fetch the first next page after start is called. + // While this manual emit is not needed for on the first history page as enabling the query will fetch it automatically. if (fetchedFirstPage) { emit(currentState); } - if (this.unsubscribe) { - this.unsubscribe(); - } + // remove current listener (if exists) and add new one + this.unsubscribe?.(); this.unsubscribe = this.observer.subscribe((res) => emit(res)); } @@ -107,7 +100,8 @@ export default class WorkflowHistoryFetcher { fetchSingleNextPage(): void { const state = this.getCurrentState(); - + // If the query is still pending, enable it to fetch the first page. + // Otherwise, fetch the next page if it is not already fetching and there are more pages. if (state.status === 'pending') { this.observer.setOptions({ ...this.buildObserverOptions(this.params), @@ -117,21 +111,23 @@ export default class WorkflowHistoryFetcher { state.fetchNextPage(); } - getCurrentState(): WorkflowHistoryQueryResult { + getCurrentState() { return this.observer.getCurrentResult(); } - private buildObserverOptions(params: WorkflowHistoryQueryParams) { + private buildObserverOptions( + queryParams: WorkflowHistoryReactQueryParams + ): WorkflowHistoryInfiniteQueryOptions { return { - queryKey: ['workflow_history', params] satisfies WorkflowHistoryQueryKey, - queryFn: ({ queryKey: [_, qp], pageParam }: any) => + queryKey: ['workflow_history', queryParams], + queryFn: ({ queryKey: [_, params], pageParam }) => request( queryString.stringifyUrl({ - url: `/api/domains/${qp.domain}/${qp.cluster}/workflows/${qp.workflowId}/${qp.runId}/history`, + url: `/api/domains/${params.domain}/${params.cluster}/workflows/${params.workflowId}/${params.runId}/history`, query: { nextPage: pageParam, - pageSize: qp.pageSize, - waitForNewEvent: qp.waitForNewEvent ?? false, + pageSize: params.pageSize, + waitForNewEvent: params.waitForNewEvent ?? false, } satisfies WorkflowHistoryQueryParams, }) ).then((res) => res.json()), diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts index 457f8b37b..d622f8db1 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts @@ -1,20 +1,41 @@ import { + type InfiniteQueryObserver, type InfiniteData, - type InfiniteQueryObserverResult, + type UseInfiniteQueryOptions, } from '@tanstack/react-query'; import { type WorkflowHistoryQueryParams, type GetWorkflowHistoryResponse, + type RouteParams, } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; import { type RequestError } from '@/utils/request/request-error'; -export type WorkflowHistoryQueryKey = [string, WorkflowHistoryQueryParams]; +export type WorkflowHistoryReactQueryParams = RouteParams & + WorkflowHistoryQueryParams; -export type WorkflowHistoryQueryResult = InfiniteQueryObserverResult< - InfiniteData, - RequestError +export type WorkflowHistoryInfiniteQueryObserver = InfiniteQueryObserver< + GetWorkflowHistoryResponse, + RequestError, + InfiniteData, + GetWorkflowHistoryResponse, + WorkflowHistoryQueryKey, + string | undefined >; +export type WorkflowHistoryQueryKey = [string, WorkflowHistoryReactQueryParams]; + +export type WorkflowHistoryInfiniteQueryOptions = UseInfiniteQueryOptions< + GetWorkflowHistoryResponse, + RequestError, + InfiniteData, + GetWorkflowHistoryResponse, + WorkflowHistoryQueryKey, + string | undefined +>; +export type WorkflowHistoryQueryResult = ReturnType< + WorkflowHistoryInfiniteQueryObserver['getCurrentResult'] +>; + export type QueryResultOnChangeCallback = ( state: WorkflowHistoryQueryResult ) => void; From 492e54d97d8c643c72c64934f34c0ba5754eb025 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 16:24:05 +0100 Subject: [PATCH 06/11] rename unmout to destroy Signed-off-by: Assem Hafez --- .../helpers/__tests__/workflow-history-fetcher.test.tsx | 2 +- src/views/workflow-history/helpers/workflow-history-fetcher.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx index aa1f59ddc..f2d59e315 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx @@ -30,7 +30,7 @@ describe(WorkflowHistoryFetcher.name, () => { afterEach(() => { queryClient.clear(); - hoistedFetcher?.unmount(); + hoistedFetcher?.destroy(); }); it('should return the current query state from getCurrentState', async () => { diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts index c03a3f95b..a43dfaa31 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -93,7 +93,7 @@ export default class WorkflowHistoryFetcher { this.unsubscribe = null; } } - unmount(): void { + destroy(): void { this.stop(); this.observer.destroy(); } From 83e390ddee4407860e8c0a7dbcae60a0691f378c Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 13:57:50 +0100 Subject: [PATCH 07/11] update fetcher based on feedback Signed-off-by: Assem Hafez --- .../workflow-history-fetcher.test.tsx | 41 +++++---------- .../helpers/workflow-history-fetcher.ts | 52 +++++++++---------- .../helpers/workflow-history-fetcher.types.ts | 31 +++++++++-- 3 files changed, 63 insertions(+), 61 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx index 200a9ebd3..aa1f59ddc 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx @@ -9,9 +9,13 @@ import mswMockEndpoints from '@/test-utils/msw-mock-handlers/helper/msw-mock-end import workflowHistoryMultiPageFixture from '../../__fixtures__/workflow-history-multi-page-fixture'; import WorkflowHistoryFetcher from '../workflow-history-fetcher'; -describe(WorkflowHistoryFetcher.name, () => { - let queryClient: QueryClient; +const RETRY_DELAY = 3000; +const RETRY_COUNT = 3; + +let queryClient: QueryClient; +let hoistedFetcher: WorkflowHistoryFetcher; +describe(WorkflowHistoryFetcher.name, () => { beforeEach(() => { queryClient = new QueryClient({ defaultOptions: { @@ -26,6 +30,7 @@ describe(WorkflowHistoryFetcher.name, () => { afterEach(() => { queryClient.clear(); + hoistedFetcher?.unmount(); }); it('should return the current query state from getCurrentState', async () => { @@ -34,8 +39,6 @@ describe(WorkflowHistoryFetcher.name, () => { const initialState = fetcher.getCurrentState(); expect(initialState.data).toBeUndefined(); expect(initialState.status).toBe('pending'); - - fetcher.unmount(); }); it('should call onChange callback on state changes', async () => { @@ -50,7 +53,6 @@ describe(WorkflowHistoryFetcher.name, () => { await waitFor(() => { expect(callback.mock.calls.length).toBeGreaterThan(initialCallCount); }); - fetcher.unmount(); }); it('should return unsubscribe function', async () => { @@ -61,6 +63,7 @@ describe(WorkflowHistoryFetcher.name, () => { const unsubscribe1 = fetcher.onChange(callback1); fetcher.onChange(callback2); + // Fetch the first page fetcher.start((state) => !state?.data?.pages?.length); await waitFor(() => { @@ -78,11 +81,9 @@ describe(WorkflowHistoryFetcher.name, () => { countBeforeUnsubscribe ); }); - - fetcher.unmount(); }); - it('should respect shouldContinue callback', async () => { + it('should not fetch any pages if shouldContinue callback returns false', async () => { const { fetcher } = setup(queryClient); const shouldContinue = jest.fn(() => false); @@ -95,8 +96,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages || []).toHaveLength(0); - - fetcher.unmount(); }); it('should stop after shouldContinue returns false', async () => { @@ -112,8 +111,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isFetching).toBe(false); expect(state.data?.pages).toHaveLength(2); }); - - fetcher.unmount(); }); it('should load all pages and auto-stop when there are no more pages', async () => { @@ -126,8 +123,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.hasNextPage).toBe(false); expect(state.data?.pages).toHaveLength(3); }); - - fetcher.unmount(); }); it('should auto-stop on error after initial success', async () => { @@ -144,8 +139,8 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.data?.pages).toHaveLength(1); }); - // Fast-forward through retry delays (3 retries * 3000ms each) - await jest.advanceTimersByTimeAsync(3 * 3000); + // Fast-forward through retry delays + await jest.advanceTimersByTimeAsync(RETRY_COUNT * RETRY_DELAY); await waitFor(() => { const state = fetcher.getCurrentState(); @@ -153,8 +148,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isError).toBe(true); expect(state.data?.pages).toHaveLength(1); }); - - fetcher.unmount(); } finally { jest.useRealTimers(); } @@ -178,8 +171,6 @@ describe(WorkflowHistoryFetcher.name, () => { expect(state.isFetching).toBe(false); expect(state.data?.pages).toHaveLength(1); }); - - fetcher.unmount(); }); it('should allow start again after stop', async () => { @@ -210,7 +201,6 @@ describe(WorkflowHistoryFetcher.name, () => { const finalState = fetcher.getCurrentState(); expect(finalState.data?.pages).toHaveLength(3); - fetcher.unmount(); }); it('should fetch next page when available', async () => { @@ -230,8 +220,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages).toHaveLength(2); }); - - fetcher.unmount(); }); it('should not fetch when already fetching', async () => { @@ -256,8 +244,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages).toHaveLength(2); - - fetcher.unmount(); }); it('should not fetch when no next page available', async () => { @@ -275,7 +261,6 @@ describe(WorkflowHistoryFetcher.name, () => { const state = fetcher.getCurrentState(); expect(state.data?.pages.length).toBe(pageCountBefore); - fetcher.unmount(); }); }); @@ -289,8 +274,8 @@ function setup(client: QueryClient, options: { failOnPages?: number[] } = {}) { }; mockHistoryEndpoint(workflowHistoryMultiPageFixture, options.failOnPages); - const fetcher = new WorkflowHistoryFetcher(client, params); + hoistedFetcher = fetcher; const waitForData = async () => { let unsubscribe: (() => void) | undefined; @@ -325,7 +310,7 @@ function mockHistoryEndpoint( // Determine current page number based on nextPage param let pageNumber = 1; - if (!nextPage || nextPage === 'null' || nextPage === 'undefined') { + if (!nextPage) { pageNumber = 1; } else if (nextPage === 'page2') { pageNumber = 2; diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts index 4b97cf89d..c03a3f95b 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -6,20 +6,18 @@ import { type GetWorkflowHistoryResponse, } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; import request from '@/utils/request'; -import { type RequestError } from '@/utils/request/request-error'; import { type WorkflowHistoryQueryResult, type QueryResultOnChangeCallback, type ShouldContinueCallback, - type WorkflowHistoryQueryKey, + type WorkflowHistoryReactQueryParams, + type WorkflowHistoryInfiniteQueryOptions, + type WorkflowHistoryInfiniteQueryObserver, } from './workflow-history-fetcher.types'; export default class WorkflowHistoryFetcher { - private observer: InfiniteQueryObserver< - GetWorkflowHistoryResponse, - RequestError - >; + private observer: WorkflowHistoryInfiniteQueryObserver; private unsubscribe: (() => void) | null = null; private isStarted = false; @@ -27,12 +25,9 @@ export default class WorkflowHistoryFetcher { constructor( private readonly queryClient: QueryClient, - private readonly params: WorkflowHistoryQueryParams + private readonly params: WorkflowHistoryReactQueryParams ) { - this.observer = new InfiniteQueryObserver< - GetWorkflowHistoryResponse, - RequestError - >(this.queryClient, { + this.observer = new InfiniteQueryObserver(this.queryClient, { ...this.buildObserverOptions(this.params), }); } @@ -40,7 +35,7 @@ export default class WorkflowHistoryFetcher { onChange(callback: QueryResultOnChangeCallback): () => void { const current = this.getCurrentState(); if (current) callback(current); - return this.observer.subscribe((res: any) => { + return this.observer.subscribe((res) => { callback(res); }); } @@ -55,8 +50,7 @@ export default class WorkflowHistoryFetcher { let emitCount = 0; const currentState = this.observer.getCurrentResult(); const fetchedFirstPage = currentState.status !== 'pending'; - const shouldEnableQuery = - (!fetchedFirstPage && shouldContinue(currentState)) || fetchedFirstPage; + const shouldEnableQuery = !fetchedFirstPage && shouldContinue(currentState); if (shouldEnableQuery) { this.observer.setOptions({ @@ -68,7 +62,7 @@ export default class WorkflowHistoryFetcher { const emit = (res: WorkflowHistoryQueryResult) => { emitCount++; - // Auto stop when there are no more pages (end of history) or when there is a fresh error happens after the start. + // Auto stop when there are no more pages (end of history) or when there is an existing error from last start (emitCount === 1 means this is the first emit in the current start). // isError is true when the request failes and retries are exhausted. if (res.hasNextPage === false || (res.isError && emitCount > 1)) { this.stop(); @@ -81,15 +75,14 @@ export default class WorkflowHistoryFetcher { } }; - // only start emit (fetching next pages) after the initial fetch is complete - // first page is already fetched on the first subscription below + // Manual emit is needed to fetch the first next page after start is called. + // While this manual emit is not needed for on the first history page as enabling the query will fetch it automatically. if (fetchedFirstPage) { emit(currentState); } - if (this.unsubscribe) { - this.unsubscribe(); - } + // remove current listener (if exists) and add new one + this.unsubscribe?.(); this.unsubscribe = this.observer.subscribe((res) => emit(res)); } @@ -107,7 +100,8 @@ export default class WorkflowHistoryFetcher { fetchSingleNextPage(): void { const state = this.getCurrentState(); - + // If the query is still pending, enable it to fetch the first page. + // Otherwise, fetch the next page if it is not already fetching and there are more pages. if (state.status === 'pending') { this.observer.setOptions({ ...this.buildObserverOptions(this.params), @@ -117,21 +111,23 @@ export default class WorkflowHistoryFetcher { state.fetchNextPage(); } - getCurrentState(): WorkflowHistoryQueryResult { + getCurrentState() { return this.observer.getCurrentResult(); } - private buildObserverOptions(params: WorkflowHistoryQueryParams) { + private buildObserverOptions( + queryParams: WorkflowHistoryReactQueryParams + ): WorkflowHistoryInfiniteQueryOptions { return { - queryKey: ['workflow_history', params] satisfies WorkflowHistoryQueryKey, - queryFn: ({ queryKey: [_, qp], pageParam }: any) => + queryKey: ['workflow_history', queryParams], + queryFn: ({ queryKey: [_, params], pageParam }) => request( queryString.stringifyUrl({ - url: `/api/domains/${qp.domain}/${qp.cluster}/workflows/${qp.workflowId}/${qp.runId}/history`, + url: `/api/domains/${params.domain}/${params.cluster}/workflows/${params.workflowId}/${params.runId}/history`, query: { nextPage: pageParam, - pageSize: qp.pageSize, - waitForNewEvent: qp.waitForNewEvent ?? false, + pageSize: params.pageSize, + waitForNewEvent: params.waitForNewEvent ?? false, } satisfies WorkflowHistoryQueryParams, }) ).then((res) => res.json()), diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts index 457f8b37b..d622f8db1 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.types.ts @@ -1,20 +1,41 @@ import { + type InfiniteQueryObserver, type InfiniteData, - type InfiniteQueryObserverResult, + type UseInfiniteQueryOptions, } from '@tanstack/react-query'; import { type WorkflowHistoryQueryParams, type GetWorkflowHistoryResponse, + type RouteParams, } from '@/route-handlers/get-workflow-history/get-workflow-history.types'; import { type RequestError } from '@/utils/request/request-error'; -export type WorkflowHistoryQueryKey = [string, WorkflowHistoryQueryParams]; +export type WorkflowHistoryReactQueryParams = RouteParams & + WorkflowHistoryQueryParams; -export type WorkflowHistoryQueryResult = InfiniteQueryObserverResult< - InfiniteData, - RequestError +export type WorkflowHistoryInfiniteQueryObserver = InfiniteQueryObserver< + GetWorkflowHistoryResponse, + RequestError, + InfiniteData, + GetWorkflowHistoryResponse, + WorkflowHistoryQueryKey, + string | undefined >; +export type WorkflowHistoryQueryKey = [string, WorkflowHistoryReactQueryParams]; + +export type WorkflowHistoryInfiniteQueryOptions = UseInfiniteQueryOptions< + GetWorkflowHistoryResponse, + RequestError, + InfiniteData, + GetWorkflowHistoryResponse, + WorkflowHistoryQueryKey, + string | undefined +>; +export type WorkflowHistoryQueryResult = ReturnType< + WorkflowHistoryInfiniteQueryObserver['getCurrentResult'] +>; + export type QueryResultOnChangeCallback = ( state: WorkflowHistoryQueryResult ) => void; From 6d37203fb97ccb4ecaac6ccd505d5d71f8d1e097 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 16:24:05 +0100 Subject: [PATCH 08/11] rename unmout to destroy Signed-off-by: Assem Hafez --- .../helpers/__tests__/workflow-history-fetcher.test.tsx | 2 +- src/views/workflow-history/helpers/workflow-history-fetcher.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx index aa1f59ddc..f2d59e315 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-fetcher.test.tsx @@ -30,7 +30,7 @@ describe(WorkflowHistoryFetcher.name, () => { afterEach(() => { queryClient.clear(); - hoistedFetcher?.unmount(); + hoistedFetcher?.destroy(); }); it('should return the current query state from getCurrentState', async () => { diff --git a/src/views/workflow-history/helpers/workflow-history-fetcher.ts b/src/views/workflow-history/helpers/workflow-history-fetcher.ts index c03a3f95b..a43dfaa31 100644 --- a/src/views/workflow-history/helpers/workflow-history-fetcher.ts +++ b/src/views/workflow-history/helpers/workflow-history-fetcher.ts @@ -93,7 +93,7 @@ export default class WorkflowHistoryFetcher { this.unsubscribe = null; } } - unmount(): void { + destroy(): void { this.stop(); this.observer.destroy(); } From a08af6dc2db5bbb03411c13bee00e50a50c9e372 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 3 Nov 2025 11:41:28 +0100 Subject: [PATCH 09/11] Create hook for fetching history Signed-off-by: Assem Hafez --- .../hooks/__tests__/use-workflow-history-fetcher.test.tsx | 2 +- .../workflow-history/hooks/use-workflow-history-fetcher.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx index 9c5a83976..6ab4ab21d 100644 --- a/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx @@ -179,7 +179,7 @@ describe(useWorkflowHistoryFetcher.name, () => { unmount(); - expect(mockFetcherInstance.unmount).toHaveBeenCalledTimes(1); + expect(mockFetcherInstance.destroy).toHaveBeenCalledTimes(1); }); it('should return all expected methods and state', () => { diff --git a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts index cfb375f13..d14a1dad1 100644 --- a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts +++ b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts @@ -57,7 +57,7 @@ export default function useWorkflowHistoryFetcher( useEffect(() => { return () => { - fetcherRef.current?.unmount(); + fetcherRef.current?.destroy(); }; }, []); From 6e551d0f2b8bbb7aff71b21a7fef63306d3ed2b4 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 17:34:10 +0100 Subject: [PATCH 10/11] move condition into executeImmediately Signed-off-by: Assem Hafez --- .../workflow-history/hooks/use-workflow-history-fetcher.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts index d14a1dad1..c287edfd3 100644 --- a/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts +++ b/src/views/workflow-history/hooks/use-workflow-history-fetcher.ts @@ -44,7 +44,8 @@ export default function useWorkflowHistoryFetcher( const unsubscribe = fetcherRef.current.onChange((state) => { const pagesCount = state.data?.pages?.length || 0; // immediately set if there is the first page without throttling other wise throttle - setHistoryQuery(() => state, pagesCount <= 1); + const executeImmediately = pagesCount <= 1; + setHistoryQuery(() => state, executeImmediately); }); // Fetch first page From 7901d5ab49e55458194f0cc20211b1f8efce2002 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 17:49:27 +0100 Subject: [PATCH 11/11] update destroy in method Signed-off-by: Assem Hafez --- .../hooks/__tests__/use-workflow-history-fetcher.test.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx index 6ab4ab21d..8f67dcbb7 100644 --- a/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx +++ b/src/views/workflow-history/hooks/__tests__/use-workflow-history-fetcher.test.tsx @@ -39,7 +39,7 @@ describe(useWorkflowHistoryFetcher.name, () => { mockFetcherInstance = { start: jest.fn(), stop: jest.fn(), - unmount: jest.fn(), + destroy: jest.fn(), fetchSingleNextPage: jest.fn(), onChange: jest.fn((callback) => { mockOnChangeCallback.mockImplementation(callback);