From 42f371a37056223983f7798d9e0c7d8d1f3ffff0 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 20 Oct 2025 15:38:28 -0600 Subject: [PATCH 01/24] feat: add useSerializedMutations hook with timing strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a new hook for managing optimistic mutations with pluggable timing strategies (debounce, queue, throttle) using TanStack Pacer. Key features: - Auto-merge mutations and serialize persistence according to strategy - Track and rollback superseded pending transactions to prevent memory leaks - Proper cleanup of pending/executing transactions on unmount - Queue strategy uses AsyncQueuer for true sequential processing Breaking changes from initial design: - Renamed from useSerializedTransaction to useSerializedMutations (more accurate name) - Each mutate() call creates mutations that are auto-merged, not separate transactions Addresses feedback: - HIGH: Rollback superseded transactions to prevent orphaned isPersisted promises - HIGH: cleanup() now properly rolls back all pending/executing transactions - HIGH: Queue strategy properly serializes commits using AsyncQueuer with concurrency: 1 Example usage: ```tsx const mutate = useSerializedMutations({ mutationFn: async ({ transaction }) => { await api.save(transaction.mutations) }, strategy: debounceStrategy({ wait: 500 }) }) ``` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- SERIALIZED_TRANSACTION_PLAN.md | 351 ++++++++++++++++++ .../react/todo/src/components/TodoApp.tsx | 66 +++- examples/react/todo/src/routes/electric.tsx | 37 ++ examples/react/todo/src/routes/query.tsx | 32 ++ examples/react/todo/src/routes/trailbase.tsx | 7 + packages/db/package.json | 3 +- packages/db/src/index.ts | 2 + packages/db/src/serialized-mutations.ts | 163 ++++++++ .../db/src/strategies/debounceStrategy.ts | 45 +++ packages/db/src/strategies/index.ts | 17 + packages/db/src/strategies/queueStrategy.ts | 74 ++++ .../db/src/strategies/throttleStrategy.ts | 62 ++++ packages/db/src/strategies/types.ts | 130 +++++++ packages/react-db/src/index.ts | 1 + .../react-db/src/useSerializedMutations.ts | 116 ++++++ pnpm-lock.yaml | 9 + 16 files changed, 1099 insertions(+), 16 deletions(-) create mode 100644 SERIALIZED_TRANSACTION_PLAN.md create mode 100644 packages/db/src/serialized-mutations.ts create mode 100644 packages/db/src/strategies/debounceStrategy.ts create mode 100644 packages/db/src/strategies/index.ts create mode 100644 packages/db/src/strategies/queueStrategy.ts create mode 100644 packages/db/src/strategies/throttleStrategy.ts create mode 100644 packages/db/src/strategies/types.ts create mode 100644 packages/react-db/src/useSerializedMutations.ts diff --git a/SERIALIZED_TRANSACTION_PLAN.md b/SERIALIZED_TRANSACTION_PLAN.md new file mode 100644 index 000000000..daa34c28b --- /dev/null +++ b/SERIALIZED_TRANSACTION_PLAN.md @@ -0,0 +1,351 @@ +# Implementation Plan for `useSerializedTransaction` with TanStack Pacer + +Based on [GitHub issue #35](https://github.com/TanStack/db/issues/35), using @tanstack/pacer for strategy implementation across all 5 framework integrations. + +## Overview + +Create a framework-agnostic core in `@tanstack/db` that manages optimistic transactions with pluggable queuing strategies powered by TanStack Pacer. Each framework package wraps the core with framework-specific reactive primitives. + +## Architecture Pattern + +The core transaction logic stays in one place (`@tanstack/db`) while each framework provides its own wrapper using framework-specific reactive primitives. + +```typescript +// Core in @tanstack/db (framework-agnostic) +createSerializedTransaction(config) // Returns { mutate, cleanup } + +// React wrapper +useSerializedTransaction(config) // Uses React hooks, returns mutate function + +// Solid wrapper +useSerializedTransaction(config) // Uses Solid signals, matches useLiveQuery pattern + +// Svelte/Vue wrappers +useSerializedTransaction(config) // Framework-specific implementations + +// Angular wrapper +injectSerializedTransaction(config) // Uses Angular DI, follows injectLiveQuery pattern +``` + +## Available Strategies (Based on Pacer Utilities) + +### 1. **debounceStrategy({ wait, leading?, trailing? })** +- Uses Pacer's `Debouncer` class +- Waits for pause in activity before committing +- **Best for:** Search inputs, auto-save fields + +### 2. **queueStrategy({ wait?, maxSize?, addItemsTo?, getItemsFrom? })** +- Uses Pacer's `Queuer` class +- Processes all transactions in order (FIFO/LIFO) +- FIFO: `{ addItemsTo: 'back', getItemsFrom: 'front' }` +- LIFO: `{ addItemsTo: 'back', getItemsFrom: 'back' }` +- **Best for:** Sequential operations that must all complete + +### 3. **throttleStrategy({ wait, leading?, trailing? })** +- Uses Pacer's `Throttler` class +- Evenly spaces transaction executions over time +- **Best for:** Sliders, scroll handlers, progress bars + +### 4. **batchStrategy({ maxSize?, wait?, getShouldExecute? })** +- Uses Pacer's `Batcher` class +- Groups multiple mutations into batches +- Triggers on size or time threshold +- **Best for:** Bulk operations, reducing network calls + +## File Structure + +``` +packages/db/src/ + ├── serialized-transaction.ts # Core framework-agnostic logic + └── strategies/ + ├── index.ts # Export all strategies + ├── debounceStrategy.ts # Wraps Pacer Debouncer + ├── queueStrategy.ts # Wraps Pacer Queuer + ├── throttleStrategy.ts # Wraps Pacer Throttler + ├── batchStrategy.ts # Wraps Pacer Batcher + └── types.ts # Strategy type definitions + +packages/db/package.json # Add @tanstack/pacer dependency + +packages/react-db/src/ + └── useSerializedTransaction.ts # React hook wrapper + +packages/solid-db/src/ + └── useSerializedTransaction.ts # Solid wrapper (matches useLiveQuery pattern) + +packages/svelte-db/src/ + └── useSerializedTransaction.svelte.ts # Svelte wrapper + +packages/vue-db/src/ + └── useSerializedTransaction.ts # Vue wrapper + +packages/angular-db/src/ + └── injectSerializedTransaction.ts # Angular wrapper (DI pattern) + +packages/*/tests/ + └── serialized-transaction.test.ts # Tests per package +``` + +## Core API Design + +```typescript +// Framework-agnostic core (packages/db) +import { debounceStrategy } from '@tanstack/db' + +const { mutate, cleanup } = createSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.save(transaction.mutations) + }, + strategy: debounceStrategy({ wait: 500 }), + metadata?: Record, +}) + +// mutate() executes mutations according to strategy and returns Transaction +const transaction = mutate(() => { + collection.update(id, draft => { draft.value = newValue }) +}) + +// Await persistence and handle errors +try { + await transaction.isPersisted.promise + console.log('Transaction committed successfully') +} catch (error) { + console.error('Transaction failed:', error) +} + +// cleanup() when done (frameworks handle this automatically) +cleanup() +``` + +## React Hook Wrapper + +```typescript +// packages/react-db +import { debounceStrategy } from '@tanstack/react-db' + +const mutate = useSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.save(transaction.mutations) + }, + strategy: debounceStrategy({ wait: 1000 }), +}) + +// Usage in component +const handleChange = async (value) => { + const tx = mutate(() => { + collection.update(id, draft => { draft.value = value }) + }) + + // Optional: await persistence or handle errors + try { + await tx.isPersisted.promise + } catch (error) { + console.error('Update failed:', error) + } +} +``` + +## Example: Slider with Different Strategies + +```typescript +// Debounce - wait for user to stop moving slider +const mutate = useSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.updateVolume(transaction.mutations) + }, + strategy: debounceStrategy({ wait: 500 }), +}) + +// Throttle - update every 200ms while sliding +const mutate = useSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.updateVolume(transaction.mutations) + }, + strategy: throttleStrategy({ wait: 200 }), +}) + +// Debounce with leading/trailing - save first + final value only +const mutate = useSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.updateVolume(transaction.mutations) + }, + strategy: debounceStrategy({ wait: 0, leading: true, trailing: true }), +}) + +// Queue - save every change in order (FIFO) +const mutate = useSerializedTransaction({ + mutationFn: async ({ transaction }) => { + await api.updateVolume(transaction.mutations) + }, + strategy: queueStrategy({ + wait: 200, + addItemsTo: 'back', + getItemsFrom: 'front' + }), +}) +``` + +## Implementation Steps + +### Phase 1: Core Package (@tanstack/db) +1. Add `@tanstack/pacer` dependency to packages/db/package.json +2. Create strategy type definitions in strategies/types.ts +3. Implement strategy factories: + - `debounceStrategy.ts` - wraps Pacer Debouncer + - `queueStrategy.ts` - wraps Pacer Queuer + - `throttleStrategy.ts` - wraps Pacer Throttler + - `batchStrategy.ts` - wraps Pacer Batcher +4. Create core `createSerializedTransaction()` function +5. Export strategies + core function from packages/db/src/index.ts + +### Phase 2: Framework Wrappers +6. **React** - Create `useSerializedTransaction` using useRef/useEffect/useCallback +7. **Solid** - Create `useSerializedTransaction` using createSignal/onCleanup (matches `useLiveQuery` pattern) +8. **Svelte** - Create `useSerializedTransaction` using Svelte stores +9. **Vue** - Create `useSerializedTransaction` using ref/onUnmounted +10. **Angular** - Create `injectSerializedTransaction` using inject/DestroyRef (matches `injectLiveQuery` pattern) + +### Phase 3: Testing & Documentation +11. Write tests for core logic in packages/db +12. Write tests for each framework wrapper +13. Update README with examples +14. Add TypeScript examples to docs + +## Strategy Type System + +```typescript +export type Strategy = + | DebounceStrategy + | QueueStrategy + | ThrottleStrategy + | BatchStrategy + +interface BaseStrategy { + _type: TName // Discriminator for type narrowing + execute: (fn: () => void) => void | Promise + cleanup: () => void +} + +export function debounceStrategy(opts: { + wait: number + leading?: boolean + trailing?: boolean +}): DebounceStrategy + +export function queueStrategy(opts?: { + wait?: number + maxSize?: number + addItemsTo?: 'front' | 'back' + getItemsFrom?: 'front' | 'back' +}): QueueStrategy + +export function throttleStrategy(opts: { + wait: number + leading?: boolean + trailing?: boolean +}): ThrottleStrategy + +export function batchStrategy(opts?: { + maxSize?: number + wait?: number + getShouldExecute?: (items: any[]) => boolean +}): BatchStrategy +``` + +## Technical Implementation Details + +### Core createSerializedTransaction + +The core function will: +1. Accept a strategy and mutationFn +2. Create a wrapper around `createTransaction` from existing code +3. Use the strategy's `execute()` method to control when transactions are committed +4. Return `{ mutate, cleanup }` where: + - `mutate(callback): Transaction` - executes mutations according to strategy and returns the Transaction object + - `cleanup()` - cleans up strategy resources + +**Important:** The `mutate()` function returns a `Transaction` object so callers can: +- Await `transaction.isPersisted.promise` to know when persistence completes +- Handle errors via try/catch or `.catch()` +- Access transaction state and metadata + +### Strategy Factories + +Each strategy factory returns an object with: +- `execute(fn)` - wraps the function with Pacer's utility +- `cleanup()` - cleans up the Pacer instance + +Example for debounceStrategy: +```typescript +// NOTE: Import path needs validation - Pacer may export from main entry point +// Likely: import { Debouncer } from '@tanstack/pacer' or similar +import { Debouncer } from '@tanstack/pacer' // TODO: Validate actual export path + +export function debounceStrategy(opts: { + wait: number + leading?: boolean + trailing?: boolean +}) { + const debouncer = new Debouncer(opts) + + return { + _type: 'debounce' as const, + execute: (fn: () => void) => { + debouncer.execute(fn) + }, + cleanup: () => { + debouncer.cancel() + } + } +} +``` + +### React Hook Implementation + +```typescript +export function useSerializedTransaction(config) { + // Include strategy in dependencies to handle strategy changes + const { mutate, cleanup } = useMemo(() => { + return createSerializedTransaction(config) + }, [config.mutationFn, config.metadata, config.strategy]) + + // Cleanup on unmount or when dependencies change + useEffect(() => { + return () => cleanup() + }, [cleanup]) + + // Use useCallback to provide stable reference + const stableMutate = useCallback(mutate, [mutate]) + + return stableMutate +} +``` + +**Key fixes:** +- Include `config.strategy` in `useMemo` dependencies to handle strategy changes +- Properly cleanup when strategy changes (via useEffect cleanup) +- Return stable callback reference via `useCallback` + +## Benefits + +- ✅ Leverages battle-tested TanStack Pacer utilities +- ✅ Reduces backend write contention +- ✅ Framework-agnostic core promotes consistency +- ✅ Type-safe, composable API +- ✅ Aligns with TanStack ecosystem patterns +- ✅ Supports all 5 framework integrations +- ✅ Simple, declarative API for users +- ✅ Easy to add custom strategies + +## Open Questions + +1. Should we support custom strategies? (i.e., users passing their own strategy objects) +2. Do we need lifecycle callbacks like `onSuccess`, `onError` for each mutate call? +3. Should batching strategy automatically merge mutations or keep them separate? +4. Rate limiting strategy - useful or skip for now? + +## Notes + +- ❌ Dropped merge strategy for now (more complex to design, less clear use case) +- The pattern follows existing TanStack patterns where core is framework-agnostic +- Similar to how `useLiveQuery` wraps core query logic per framework diff --git a/examples/react/todo/src/components/TodoApp.tsx b/examples/react/todo/src/components/TodoApp.tsx index 1a1e152d8..356b1a9a5 100644 --- a/examples/react/todo/src/components/TodoApp.tsx +++ b/examples/react/todo/src/components/TodoApp.tsx @@ -1,7 +1,8 @@ import React, { useState } from "react" import { Link } from "@tanstack/react-router" +import { debounceStrategy, useSerializedMutations } from "@tanstack/react-db" import type { FormEvent } from "react" -import type { Collection } from "@tanstack/react-db" +import type { Collection, Transaction } from "@tanstack/react-db" import type { SelectConfig, SelectTodo } from "@/db/validation" import { getComplementaryColor } from "@/lib/color" @@ -12,6 +13,7 @@ interface TodoAppProps { todoCollection: Collection configCollection: Collection title: string + configMutationFn?: (params: { transaction: Transaction }) => Promise } export function TodoApp({ @@ -20,9 +22,19 @@ export function TodoApp({ todoCollection, configCollection, title, + configMutationFn, }: TodoAppProps) { const [newTodo, setNewTodo] = useState(``) + // Use serialized mutations with debounce strategy for color picker if mutationFn provided + // Waits for 2500ms of inactivity before persisting - only the final value is saved + const mutateConfig = configMutationFn + ? useSerializedMutations({ + mutationFn: configMutationFn, + strategy: debounceStrategy({ wait: 2500 }), + }) + : undefined + // Define a type-safe helper function to get config values const getConfigValue = (key: string): string | undefined => { for (const config of configData) { @@ -35,23 +47,47 @@ export function TodoApp({ // Define a helper function to update config values const setConfigValue = (key: string, value: string): void => { - for (const config of configData) { - if (config.key === key) { - configCollection.update(config.id, (draft) => { - draft.value = value + if (mutateConfig) { + // Use serialized mutations for updates (optimistic + batched persistence) + mutateConfig(() => { + for (const config of configData) { + if (config.key === key) { + configCollection.update(config.id, (draft) => { + draft.value = value + }) + return + } + } + + // If the config doesn't exist yet, create it + configCollection.insert({ + id: Math.round(Math.random() * 1000000), + key, + value, + created_at: new Date(), + updated_at: new Date(), }) - return + }) + } else { + // Use naked collection calls (collection handlers will be invoked) + for (const config of configData) { + if (config.key === key) { + configCollection.update(config.id, (draft) => { + draft.value = value + }) + return + } } - } - // If the config doesn't exist yet, create it - configCollection.insert({ - id: Math.round(Math.random() * 1000000), - key, - value, - created_at: new Date(), - updated_at: new Date(), - }) + // If the config doesn't exist yet, create it + configCollection.insert({ + id: Math.round(Math.random() * 1000000), + key, + value, + created_at: new Date(), + updated_at: new Date(), + }) + } } const backgroundColor = getConfigValue(`backgroundColor`) diff --git a/examples/react/todo/src/routes/electric.tsx b/examples/react/todo/src/routes/electric.tsx index 8d4f068f6..c361d1686 100644 --- a/examples/react/todo/src/routes/electric.tsx +++ b/examples/react/todo/src/routes/electric.tsx @@ -5,6 +5,8 @@ import { electricTodoCollection, } from "../lib/collections" import { TodoApp } from "../components/TodoApp" +import { api } from "../lib/api" +import type { Transaction } from "@tanstack/react-db" export const Route = createFileRoute(`/electric`)({ component: ElectricPage, @@ -31,6 +33,40 @@ function ElectricPage() { q.from({ config: electricConfigCollection }) ) + // Electric collections use txid to track sync + const configMutationFn = async ({ + transaction, + }: { + transaction: Transaction + }) => { + const txids: Array = [] + + // Handle inserts + const inserts = transaction.mutations.filter((m) => m.type === `insert`) + for (const mutation of inserts) { + const response = await api.config.create(mutation.modified) + txids.push(response.txid) + } + + // Handle updates + const updates = transaction.mutations.filter((m) => m.type === `update`) + for (const mutation of updates) { + if (!(`id` in mutation.original)) { + throw new Error(`Original config not found for update`) + } + const response = await api.config.update( + mutation.original.id, + mutation.changes + ) + txids.push(response.txid) + } + + // Wait for all txids to sync back to the collection + await Promise.all( + txids.map((txid) => electricConfigCollection.utils.awaitTxid(txid)) + ) + } + return ( ) } diff --git a/examples/react/todo/src/routes/query.tsx b/examples/react/todo/src/routes/query.tsx index 129ab0cd4..99aa80197 100644 --- a/examples/react/todo/src/routes/query.tsx +++ b/examples/react/todo/src/routes/query.tsx @@ -2,6 +2,8 @@ import { createFileRoute } from "@tanstack/react-router" import { useLiveQuery } from "@tanstack/react-db" import { queryConfigCollection, queryTodoCollection } from "../lib/collections" import { TodoApp } from "../components/TodoApp" +import { api } from "../lib/api" +import type { Transaction } from "@tanstack/react-db" export const Route = createFileRoute(`/query`)({ component: QueryPage, @@ -28,6 +30,35 @@ function QueryPage() { q.from({ config: queryConfigCollection }) ) + // Query collections automatically refetch after handler completes + const configMutationFn = async ({ + transaction, + }: { + transaction: Transaction + }) => { + // Handle inserts + const inserts = transaction.mutations.filter((m) => m.type === `insert`) + await Promise.all( + inserts.map(async (mutation) => { + await api.config.create(mutation.modified) + }) + ) + + // Handle updates + const updates = transaction.mutations.filter((m) => m.type === `update`) + await Promise.all( + updates.map(async (mutation) => { + if (!(`id` in mutation.original)) { + throw new Error(`Original config not found for update`) + } + await api.config.update(mutation.original.id, mutation.changes) + }) + ) + + // Trigger refetch to get confirmed server state + await queryConfigCollection.utils.refetch() + } + return ( ) } diff --git a/examples/react/todo/src/routes/trailbase.tsx b/examples/react/todo/src/routes/trailbase.tsx index 4ed7458f7..4ef397e66 100644 --- a/examples/react/todo/src/routes/trailbase.tsx +++ b/examples/react/todo/src/routes/trailbase.tsx @@ -31,6 +31,13 @@ function TrailBasePage() { q.from({ config: trailBaseConfigCollection }) ) + // Note: TrailBase collections use recordApi internally, which is not exposed + // as a collection utility. For this example, we're not using serialized + // transactions with TrailBase - the color picker will use naked collection + // calls which invoke the collection's built-in handlers automatically. + // In a real app, you could expose the recordApi via utils to enable + // serialized transactions with TrailBase collections. + return ( , +> extends Omit, `autoCommit`> { + /** + * Strategy for controlling mutation execution timing + * Examples: debounceStrategy, queueStrategy, throttleStrategy, batchStrategy + */ + strategy: Strategy +} + +/** + * Creates a serialized mutations manager with pluggable timing strategies. + * + * This function provides a way to control when and how optimistic mutations + * are persisted to the backend, using strategies like debouncing, queuing, + * or throttling. Each call to `mutate` creates mutations that are auto-merged + * and persisted according to the strategy. + * + * The returned `mutate` function returns a Transaction object that can be + * awaited to know when persistence completes or to handle errors. + * + * @param config - Configuration including mutationFn and strategy + * @returns Object with mutate function and cleanup + * + * @example + * ```ts + * // Debounced mutations for auto-save + * const { mutate, cleanup } = createSerializedMutations({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: debounceStrategy({ wait: 500 }) + * }) + * + * // Each mutate call returns a transaction + * const tx = mutate(() => { + * collection.update(id, draft => { draft.value = newValue }) + * }) + * + * // Await persistence or handle errors + * await tx.isPersisted.promise + * + * // Cleanup when done + * cleanup() + * ``` + * + * @example + * ```ts + * // Queue strategy for sequential processing + * const { mutate } = createSerializedMutations({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: queueStrategy({ + * wait: 200, + * addItemsTo: 'back', + * getItemsFrom: 'front' + * }) + * }) + * ``` + */ +export function createSerializedMutations< + T extends object = Record, +>( + config: SerializedMutationsConfig +): { + mutate: (callback: () => void) => Transaction + cleanup: () => void +} { + const { strategy, ...transactionConfig } = config + + // Track pending transactions that haven't been committed yet + const pendingTransactions = new Set>() + // Track the currently executing transaction (being committed) + let executingTransaction: Transaction | null = null + + /** + * Executes a mutation callback and returns the transaction. + * The strategy controls when the transaction is actually committed. + */ + function mutate(callback: () => void): Transaction { + // Create transaction with autoCommit disabled + // The strategy will control when commit() is called + const transaction = createTransaction({ + ...transactionConfig, + autoCommit: false, + }) + + // Execute the mutation callback to populate the transaction + transaction.mutate(callback) + + // Add to pending set + pendingTransactions.add(transaction) + + // Use the strategy to control when to commit + strategy.execute(() => { + // Rollback all other pending transactions that were superseded + for (const pendingTx of pendingTransactions) { + if (pendingTx !== transaction) { + pendingTx.rollback() + pendingTransactions.delete(pendingTx) + } + } + + // Remove from pending and mark as executing + pendingTransactions.delete(transaction) + executingTransaction = transaction + + // Commit the transaction according to the strategy's timing + transaction + .commit() + .then(() => { + if (executingTransaction === transaction) { + executingTransaction = null + } + }) + .catch(() => { + // Errors are handled via transaction.isPersisted.promise + // This catch prevents unhandled promise rejections + if (executingTransaction === transaction) { + executingTransaction = null + } + }) + + return transaction + }) + + return transaction + } + + /** + * Cleanup strategy resources and rollback any pending transactions + * Should be called when the serialized mutations manager is no longer needed + */ + function cleanup() { + // Cancel the strategy timer/queue + strategy.cleanup() + + // Rollback all pending transactions + for (const tx of pendingTransactions) { + tx.rollback() + } + pendingTransactions.clear() + + // Rollback executing transaction if any + if (executingTransaction) { + executingTransaction.rollback() + executingTransaction = null + } + } + + return { + mutate, + cleanup, + } +} diff --git a/packages/db/src/strategies/debounceStrategy.ts b/packages/db/src/strategies/debounceStrategy.ts new file mode 100644 index 000000000..57edc323c --- /dev/null +++ b/packages/db/src/strategies/debounceStrategy.ts @@ -0,0 +1,45 @@ +import { Debouncer } from "@tanstack/pacer/debouncer" +import type { DebounceStrategy, DebounceStrategyOptions } from "./types" +import type { Transaction } from "../transactions" + +/** + * Creates a debounce strategy that delays transaction execution until after + * a period of inactivity. + * + * Ideal for scenarios like search inputs or auto-save fields where you want + * to wait for the user to stop typing before persisting changes. + * + * @param options - Configuration for the debounce behavior + * @returns A debounce strategy instance + * + * @example + * ```ts + * const mutate = useSerializedTransaction({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: debounceStrategy({ wait: 500 }) + * }) + * ``` + */ +export function debounceStrategy( + options: DebounceStrategyOptions +): DebounceStrategy { + const debouncer = new Debouncer( + (callback: () => Transaction) => callback(), + options + ) + + return { + _type: `debounce`, + options, + execute: >( + fn: () => Transaction + ) => { + debouncer.maybeExecute(fn as () => Transaction) + }, + cleanup: () => { + debouncer.cancel() + }, + } +} diff --git a/packages/db/src/strategies/index.ts b/packages/db/src/strategies/index.ts new file mode 100644 index 000000000..734842617 --- /dev/null +++ b/packages/db/src/strategies/index.ts @@ -0,0 +1,17 @@ +// Export all strategy factories +export { debounceStrategy } from "./debounceStrategy" +export { queueStrategy } from "./queueStrategy" +export { throttleStrategy } from "./throttleStrategy" + +// Export strategy types +export type { + Strategy, + BaseStrategy, + DebounceStrategy, + DebounceStrategyOptions, + QueueStrategy, + QueueStrategyOptions, + ThrottleStrategy, + ThrottleStrategyOptions, + StrategyOptions, +} from "./types" diff --git a/packages/db/src/strategies/queueStrategy.ts b/packages/db/src/strategies/queueStrategy.ts new file mode 100644 index 000000000..9695df9b2 --- /dev/null +++ b/packages/db/src/strategies/queueStrategy.ts @@ -0,0 +1,74 @@ +import { AsyncQueuer } from "@tanstack/pacer/async-queuer" +import type { QueueStrategy, QueueStrategyOptions } from "./types" +import type { Transaction } from "../transactions" + +/** + * Creates a queue strategy that processes all mutations in order with proper serialization. + * + * Unlike other strategies that may drop executions, queue ensures every + * mutation is processed sequentially. Each transaction commit completes before + * the next one starts. Useful when data consistency is critical and + * every operation must complete in order. + * + * @param options - Configuration for queue behavior (FIFO/LIFO, timing, size limits) + * @returns A queue strategy instance + * + * @example + * ```ts + * // FIFO queue - process in order received + * const mutate = useSerializedMutations({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: queueStrategy({ + * wait: 200, + * addItemsTo: 'back', + * getItemsFrom: 'front' + * }) + * }) + * ``` + * + * @example + * ```ts + * // LIFO queue - process most recent first + * const mutate = useSerializedMutations({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: queueStrategy({ + * wait: 200, + * addItemsTo: 'back', + * getItemsFrom: 'back' + * }) + * }) + * ``` + */ +export function queueStrategy(options?: QueueStrategyOptions): QueueStrategy { + const queuer = new AsyncQueuer({ + concurrency: 1, // Process one at a time to ensure serialization + wait: options?.wait, + maxSize: options?.maxSize, + addItemsTo: options?.addItemsTo, + getItemsFrom: options?.getItemsFrom, + started: true, // Start processing immediately + }) + + return { + _type: `queue`, + options, + execute: >( + fn: () => Transaction + ) => { + // Wrap the callback in an async function that waits for commit + queuer.addItem(async () => { + const transaction = fn() + // Wait for the transaction to be persisted before processing next item + await transaction.commit() + }) + }, + cleanup: () => { + queuer.stop() + queuer.clear() + }, + } +} diff --git a/packages/db/src/strategies/throttleStrategy.ts b/packages/db/src/strategies/throttleStrategy.ts new file mode 100644 index 000000000..83cdc7f7a --- /dev/null +++ b/packages/db/src/strategies/throttleStrategy.ts @@ -0,0 +1,62 @@ +import { Throttler } from "@tanstack/pacer/throttler" +import type { ThrottleStrategy, ThrottleStrategyOptions } from "./types" +import type { Transaction } from "../transactions" + +/** + * Creates a throttle strategy that ensures transactions are evenly spaced + * over time. + * + * Provides smooth, controlled execution patterns ideal for UI updates like + * sliders, progress bars, or scroll handlers where you want consistent + * execution timing. + * + * @param options - Configuration for throttle behavior + * @returns A throttle strategy instance + * + * @example + * ```ts + * // Throttle slider updates to every 200ms + * const mutate = useSerializedTransaction({ + * mutationFn: async ({ transaction }) => { + * await api.updateVolume(transaction.mutations) + * }, + * strategy: throttleStrategy({ wait: 200 }) + * }) + * ``` + * + * @example + * ```ts + * // Throttle with leading and trailing execution + * const mutate = useSerializedTransaction({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: throttleStrategy({ + * wait: 500, + * leading: true, + * trailing: true + * }) + * }) + * ``` + */ +export function throttleStrategy( + options: ThrottleStrategyOptions +): ThrottleStrategy { + const throttler = new Throttler( + (callback: () => Transaction) => callback(), + options + ) + + return { + _type: `throttle`, + options, + execute: >( + fn: () => Transaction + ) => { + throttler.maybeExecute(fn as () => Transaction) + }, + cleanup: () => { + throttler.cancel() + }, + } +} diff --git a/packages/db/src/strategies/types.ts b/packages/db/src/strategies/types.ts new file mode 100644 index 000000000..3514fea61 --- /dev/null +++ b/packages/db/src/strategies/types.ts @@ -0,0 +1,130 @@ +import type { Transaction } from "../transactions" + +/** + * Base strategy interface that all strategy implementations must conform to + */ +export interface BaseStrategy { + /** Type discriminator for strategy identification */ + _type: TName + + /** + * Execute a function according to the strategy's timing rules + * @param fn - The function to execute + * @returns The result of the function execution (if applicable) + */ + execute: >( + fn: () => Transaction + ) => void | Promise + + /** + * Clean up any resources held by the strategy + * Should be called when the strategy is no longer needed + */ + cleanup: () => void +} + +/** + * Options for debounce strategy + * Delays execution until after a period of inactivity + */ +export interface DebounceStrategyOptions { + /** Wait time in milliseconds before execution */ + wait: number + /** Execute immediately on the first call */ + leading?: boolean + /** Execute after the wait period on the last call */ + trailing?: boolean +} + +/** + * Debounce strategy that delays execution until activity stops + */ +export interface DebounceStrategy extends BaseStrategy<`debounce`> { + options: DebounceStrategyOptions +} + +/** + * Options for queue strategy + * Processes all executions in order (FIFO/LIFO) + */ +export interface QueueStrategyOptions { + /** Wait time between processing queue items (milliseconds) */ + wait?: number + /** Maximum queue size (items are dropped if exceeded) */ + maxSize?: number + /** Where to add new items in the queue */ + addItemsTo?: `front` | `back` + /** Where to get items from when processing */ + getItemsFrom?: `front` | `back` +} + +/** + * Queue strategy that processes all executions in order + * FIFO: { addItemsTo: 'back', getItemsFrom: 'front' } + * LIFO: { addItemsTo: 'back', getItemsFrom: 'back' } + */ +export interface QueueStrategy extends BaseStrategy<`queue`> { + options?: QueueStrategyOptions +} + +/** + * Options for throttle strategy + * Ensures executions are evenly spaced over time + */ +export interface ThrottleStrategyOptions { + /** Minimum wait time between executions (milliseconds) */ + wait: number + /** Execute immediately on the first call */ + leading?: boolean + /** Execute on the last call after wait period */ + trailing?: boolean +} + +/** + * Throttle strategy that spaces executions evenly over time + */ +export interface ThrottleStrategy extends BaseStrategy<`throttle`> { + options: ThrottleStrategyOptions +} + +/** + * Options for batch strategy + * Groups multiple executions together + */ +export interface BatchStrategyOptions { + /** Maximum items per batch */ + maxSize?: number + /** Maximum wait time before processing batch (milliseconds) */ + wait?: number + /** Custom logic to determine when to execute batch */ + getShouldExecute?: (items: Array) => boolean +} + +/** + * Batch strategy that groups multiple executions together + */ +export interface BatchStrategy extends BaseStrategy<`batch`> { + options?: BatchStrategyOptions +} + +/** + * Union type of all available strategies + */ +export type Strategy = + | DebounceStrategy + | QueueStrategy + | ThrottleStrategy + | BatchStrategy + +/** + * Extract the options type from a strategy + */ +export type StrategyOptions = T extends DebounceStrategy + ? DebounceStrategyOptions + : T extends QueueStrategy + ? QueueStrategyOptions + : T extends ThrottleStrategy + ? ThrottleStrategyOptions + : T extends BatchStrategy + ? BatchStrategyOptions + : never diff --git a/packages/react-db/src/index.ts b/packages/react-db/src/index.ts index bd98349f0..c63e4dfa5 100644 --- a/packages/react-db/src/index.ts +++ b/packages/react-db/src/index.ts @@ -1,5 +1,6 @@ // Re-export all public APIs export * from "./useLiveQuery" +export * from "./useSerializedMutations" // Re-export everything from @tanstack/db export * from "@tanstack/db" diff --git a/packages/react-db/src/useSerializedMutations.ts b/packages/react-db/src/useSerializedMutations.ts new file mode 100644 index 000000000..8df20976d --- /dev/null +++ b/packages/react-db/src/useSerializedMutations.ts @@ -0,0 +1,116 @@ +import { useCallback, useEffect, useMemo } from "react" +import { createSerializedMutations } from "@tanstack/db" +import type { SerializedMutationsConfig, Transaction } from "@tanstack/db" + +/** + * React hook for managing serialized mutations with timing strategies. + * + * Provides optimistic mutations with pluggable strategies like debouncing, + * queuing, or throttling. Each call to `mutate` creates mutations that are + * auto-merged and persisted according to the strategy. + * + * @param config - Configuration including mutationFn and strategy + * @returns A mutate function that executes mutations and returns Transaction objects + * + * @example + * ```tsx + * // Debounced auto-save + * function AutoSaveForm() { + * const mutate = useSerializedMutations({ + * mutationFn: async ({ transaction }) => { + * await api.save(transaction.mutations) + * }, + * strategy: debounceStrategy({ wait: 500 }) + * }) + * + * const handleChange = async (value: string) => { + * const tx = mutate(() => { + * formCollection.update(formId, draft => { + * draft.content = value + * }) + * }) + * + * // Optional: await persistence or handle errors + * try { + * await tx.isPersisted.promise + * console.log('Saved!') + * } catch (error) { + * console.error('Save failed:', error) + * } + * } + * + * return