diff --git a/.changeset/legal-cooks-sink.md b/.changeset/legal-cooks-sink.md new file mode 100644 index 000000000..ddddba41d --- /dev/null +++ b/.changeset/legal-cooks-sink.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix bug with setWindow on ordered queries that have no limit. diff --git a/.changeset/open-cups-lose.md b/.changeset/open-cups-lose.md new file mode 100644 index 000000000..44eee1a35 --- /dev/null +++ b/.changeset/open-cups-lose.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add support for orderBy and limit in currentStateAsChanges function diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 3b75521e3..858503f6f 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -87,12 +87,25 @@ class TopKArray implements TopK { }): TopKMoveChanges { const oldOffset = this.#topKStart const oldLimit = this.#topKEnd - this.#topKStart - const oldRange: HRange = [this.#topKStart, this.#topKEnd] - this.#topKStart = offset ?? oldOffset - this.#topKEnd = this.#topKStart + (limit ?? oldLimit) + // `this.#topKEnd` can be `Infinity` if it has no limit + // but `diffHalfOpen` expects a finite range + // so we restrict it to the size of the topK if topKEnd is infinite + const oldRange: HRange = [ + this.#topKStart, + this.#topKEnd === Infinity ? this.#topKStart + this.size : this.#topKEnd, + ] - const newRange: HRange = [this.#topKStart, this.#topKEnd] + this.#topKStart = offset ?? oldOffset + this.#topKEnd = this.#topKStart + (limit ?? oldLimit) // can be `Infinity` if limit is `Infinity` + + // Also handle `Infinity` in the newRange + const newRange: HRange = [ + this.#topKStart, + this.#topKEnd === Infinity + ? Math.max(this.#topKStart + this.size, oldRange[1]) // since the new limit is Infinity we need to take everything (so we need to take the biggest (finite) topKEnd) + : this.#topKEnd, + ] const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange) const moveIns: Array> = [] diff --git a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts index 5739cfb36..149fffb93 100644 --- a/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/db-ivm/tests/operators/topKWithFractionalIndex.test.ts @@ -28,8 +28,8 @@ function checkLexicographicOrder(results: Array) { // Check that indices are in the same order as the sorted values for (let i = 0; i < sortedByValue.length - 1; i++) { - const currentIndex = sortedByValue[i].index - const nextIndex = sortedByValue[i + 1].index + const currentIndex = sortedByValue[i]!.index + const nextIndex = sortedByValue[i + 1]!.index // Indices should be in lexicographic order if (!(currentIndex < nextIndex)) { @@ -1120,5 +1120,581 @@ describe(`Operators`, () => { ) expect(moveSortedValues4).toEqual([`a`, `b`]) }) + + it(`should handle moving window from infinite limit to finite limit with same offset`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have all 6 elements (no limit) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(6) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`, `d`, `e`, `f`]) + + // Verify windowFn was set + expect(windowFn).toBeDefined() + + // Move to finite limit of 3 (should show a, b, c) + windowFn!({ offset: 0, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show only first 3 elements + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`a`, `b`, `c`]) + + // Check that we have changes (elements d, e, f should be removed) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from infinite limit to finite limit while moving offset forward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have all 6 elements (no limit) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(6) + + // Move to offset 2, limit 3 (should show c, d, e) + windowFn!({ offset: 2, limit: 3 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements c, d, e + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`, `d`, `e`]) + + // Check that we have changes + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from infinite limit to finite limit while moving offset backward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) and offset 3 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + offset: 3, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have elements d, e, f (no limit, offset 3) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Move to finite limit of 2, moving offset backward to 1 (should show b, c) + windowFn!({ offset: 1, limit: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements b, c (offset 1, limit 2) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`]) + + // Check that we have changes (elements d, e, f should be removed, b, c should be added) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from infinite limit to infinite limit with same offset (no-op)`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) and offset 2 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + offset: 2, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have elements c, d, e, f (no limit, offset 2) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(4) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`c`, `d`, `e`, `f`]) + + // Move to same offset, still no limit (should show same elements c, d, e, f) + windowFn!({ offset: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should show same elements c, d, e, f (offset 2, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`, `d`, `e`, `f`]) + + // Check that we have no more changes (this should be a no-op) + expect(moveResult.messageCount).toBe(initialResult.messageCount) + }) + + it(`should handle moving window from infinite limit to infinite limit while moving offset forward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) and offset 0 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + offset: 0, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have all 6 elements (no limit, offset 0) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(6) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`a`, `b`, `c`, `d`, `e`, `f`]) + + // Move to offset 2, still no limit (should show c, d, e, f) + windowFn!({ offset: 2 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements c, d, e, f (offset 2, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`, `d`, `e`, `f`]) + + // Check that we have changes (elements a, b should be removed) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from infinite limit to infinite limit while moving offset backward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with no limit (infinite limit) and offset 3 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + offset: 3, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have elements d, e, f (no limit, offset 3) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(3) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`, `f`]) + + // Move to offset 1, still no limit (should show b, c, d, e, f) + windowFn!({ offset: 1 }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements b, c, d, e, f (offset 1, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`, `e`, `f`]) + + // Check that we have changes (elements b, c should be added) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from finite limit to infinite limit with same offset`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with finite limit of 2 and offset 2 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 2, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 2 (c, d) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`c`, `d`]) + + // Move to infinite limit, keeping offset 2 (should show c, d, e, f) + windowFn!({ offset: 2, limit: Infinity }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements c, d, e, f (offset 2, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`c`, `d`, `e`, `f`]) + + // Check that we have changes (elements e, f should be added) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from finite limit to infinite limit while moving offset forward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with finite limit of 2 and offset 1 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 1, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 1 (b, c) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`b`, `c`]) + + // Move to infinite limit, moving offset forward to 3 (should show d, e, f) + windowFn!({ offset: 3, limit: Infinity }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements d, e, f (offset 3, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`d`, `e`, `f`]) + + // Check that we have changes (elements b, c should be removed, d, e, f should be added) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) + + it(`should handle moving window from finite limit to infinite limit while moving offset backward`, () => { + const graph = new D2() + const input = graph.newInput<[number, { id: number; value: string }]>() + const tracker = new MessageTracker< + [number, [{ id: number; value: string }, string]] + >() + + let windowFn: + | ((options: { offset?: number; limit?: number }) => void) + | null = null + + // Start with finite limit of 2 and offset 3 + input.pipe( + topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + limit: 2, + offset: 3, + setWindowFn: (fn) => { + windowFn = fn + }, + }), + output((message) => { + tracker.addMessage(message) + }) + ) + + graph.finalize() + + // Initial data - a, b, c, d, e, f + input.sendData( + new MultiSet([ + [[1, { id: 1, value: `a` }], 1], + [[2, { id: 2, value: `b` }], 1], + [[3, { id: 3, value: `c` }], 1], + [[4, { id: 4, value: `d` }], 1], + [[5, { id: 5, value: `e` }], 1], + [[6, { id: 6, value: `f` }], 1], + ]) + ) + graph.run() + + // Initial result should have 2 elements starting from offset 3 (d, e) + const initialResult = tracker.getResult(compareFractionalIndex) + expect(initialResult.sortedResults.length).toBe(2) + + const initialSortedValues = initialResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(initialSortedValues).toEqual([`d`, `e`]) + + // Move to infinite limit, moving offset backward to 1 (should show b, c, d, e, f) + windowFn!({ offset: 1, limit: Infinity }) + graph.run() + + const moveResult = tracker.getResult(compareFractionalIndex) + + // Should now show elements b, c, d, e, f (offset 1, no limit) + const moveSortedValues = moveResult.sortedResults.map( + ([_key, [value, _index]]) => value.value + ) + expect(moveSortedValues).toEqual([`b`, `c`, `d`, `e`, `f`]) + + // Check that we have changes (elements b, c, f should be added, d, e should remain) + expect(moveResult.messageCount).toBeGreaterThan(0) + }) }) }) diff --git a/packages/db/src/collection/change-events.ts b/packages/db/src/collection/change-events.ts index 7a0e41acf..10c05ae71 100644 --- a/packages/db/src/collection/change-events.ts +++ b/packages/db/src/collection/change-events.ts @@ -3,15 +3,20 @@ import { toExpression, } from "../query/builder/ref-proxy" import { compileSingleRowExpression } from "../query/compiler/evaluators.js" -import { optimizeExpressionWithIndexes } from "../utils/index-optimization.js" +import { + findIndexForField, + optimizeExpressionWithIndexes, +} from "../utils/index-optimization.js" +import { ensureIndexForField } from "../indexes/auto-index.js" +import { makeComparator } from "../utils/comparison.js" import type { ChangeMessage, CurrentStateAsChangesOptions, SubscribeChangesOptions, } from "../types" -import type { Collection } from "./index.js" +import type { Collection, CollectionImpl } from "./index.js" import type { SingleRowRefProxy } from "../query/builder/ref-proxy" -import type { BasicExpression } from "../query/ir.js" +import type { BasicExpression, OrderBy } from "../query/ir.js" /** * Interface for a collection-like object that provides the necessary methods @@ -28,7 +33,7 @@ export interface CollectionLike< /** * Returns the current state of the collection as an array of changes * @param collection - The collection to get changes from - * @param options - Options including optional where filter + * @param options - Options including optional where filter, orderBy, and limit * @returns An array of changes * @example * // Get all items as changes @@ -41,7 +46,19 @@ export interface CollectionLike< * * // Get only items using a pre-compiled expression * const activeChanges = currentStateAsChanges(collection, { - * whereExpression: eq(row.status, 'active') + * where: eq(row.status, 'active') + * }) + * + * // Get items ordered by name with limit + * const topUsers = currentStateAsChanges(collection, { + * orderBy: [{ expression: row.name, compareOptions: { direction: 'asc' } }], + * limit: 10 + * }) + * + * // Get active users ordered by score (highest score first) + * const topActiveUsers = currentStateAsChanges(collection, { + * where: eq(row.status, 'active'), + * orderBy: [{ expression: row.score, compareOptions: { direction: 'desc' } }], * }) */ export function currentStateAsChanges< @@ -69,9 +86,48 @@ export function currentStateAsChanges< return result } - // TODO: handle orderBy and limit options - // by calling optimizeOrderedLimit + // Validate that limit without orderBy doesn't happen + if (options.limit !== undefined && !options.orderBy) { + throw new Error(`limit cannot be used without orderBy`) + } + + // First check if orderBy is present (optionally with limit) + if (options.orderBy) { + // Create where filter function if present + const whereFilter = options.where + ? createFilterFunctionFromExpression(options.where) + : undefined + + // Get ordered keys using index optimization when possible + const orderedKeys = getOrderedKeys( + collection, + options.orderBy, + options.limit, + whereFilter, + options.optimizedOnly + ) + + if (orderedKeys === undefined) { + // `getOrderedKeys` returned undefined because we asked for `optimizedOnly` and there was no index to use + return + } + + // Convert keys to change messages + const result: Array> = [] + for (const key of orderedKeys) { + const value = collection.get(key) + if (value !== undefined) { + result.push({ + type: `insert`, + key, + value, + }) + } + } + return result + } + // If no orderBy OR orderBy optimization failed, use where clause optimization if (!options.where) { // No filtering, return all items return collectFilteredResults() @@ -246,3 +302,121 @@ export function createFilteredCallback( } } } + +/** + * Gets ordered keys from a collection using index optimization when possible + * @param collection - The collection to get keys from + * @param orderBy - The order by clause + * @param limit - Optional limit on number of keys to return + * @param whereFilter - Optional filter function to apply while traversing + * @returns Array of keys in sorted order + */ +function getOrderedKeys( + collection: CollectionLike, + orderBy: OrderBy, + limit?: number, + whereFilter?: (item: T) => boolean, + optimizedOnly?: boolean +): Array | undefined { + // For single-column orderBy on a ref expression, try index optimization + if (orderBy.length === 1) { + const clause = orderBy[0]! + const orderByExpression = clause.expression + + if (orderByExpression.type === `ref`) { + const propRef = orderByExpression + const fieldPath = propRef.path + + // Ensure index exists for this field + ensureIndexForField( + fieldPath[0]!, + fieldPath, + collection as CollectionImpl, + clause.compareOptions + ) + + // Find the index + const index = findIndexForField( + collection.indexes, + fieldPath, + clause.compareOptions + ) + + if (index && index.supports(`gt`)) { + // Use index optimization + const filterFn = (key: TKey): boolean => { + const value = collection.get(key) + if (value === undefined) { + return false + } + return whereFilter?.(value) ?? true + } + + // Take the keys that match the filter and limit + // if no limit is provided `index.keyCount` is used, + // i.e. we will take all keys that match the filter + return index.take(limit ?? index.keyCount, undefined, filterFn) + } + } + } + + if (optimizedOnly) { + return + } + + // Fallback: collect all items and sort in memory + const allItems: Array<{ key: TKey; value: T }> = [] + for (const [key, value] of collection.entries()) { + if (whereFilter?.(value) ?? true) { + allItems.push({ key, value }) + } + } + + // Sort using makeComparator + const compare = (a: { key: TKey; value: T }, b: { key: TKey; value: T }) => { + for (const clause of orderBy) { + const compareFn = makeComparator(clause.compareOptions) + + // Extract values for comparison + const aValue = extractValueFromItem(a.value, clause.expression) + const bValue = extractValueFromItem(b.value, clause.expression) + + const result = compareFn(aValue, bValue) + if (result !== 0) { + return result + } + } + return 0 + } + + allItems.sort(compare) + const sortedKeys = allItems.map((item) => item.key) + + // Apply limit if provided + if (limit !== undefined) { + return sortedKeys.slice(0, limit) + } + + // if no limit is provided, we will return all keys + return sortedKeys +} + +/** + * Helper function to extract a value from an item based on an expression + */ +function extractValueFromItem(item: any, expression: BasicExpression): any { + if (expression.type === `ref`) { + const propRef = expression + let value = item + for (const pathPart of propRef.path) { + value = value?.[pathPart] + } + return value + } else if (expression.type === `val`) { + return expression.value + } else { + // It must be a function + const evaluator = compileSingleRowExpression(expression) + return evaluator(item as Record) + } +} diff --git a/packages/db/tests/collection-change-events.test.ts b/packages/db/tests/collection-change-events.test.ts new file mode 100644 index 000000000..25b7dbb1f --- /dev/null +++ b/packages/db/tests/collection-change-events.test.ts @@ -0,0 +1,439 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "../src/collection/index.js" +import { currentStateAsChanges } from "../src/collection/change-events.js" +import { Func, PropRef, Value } from "../src/query/ir.js" +import { DEFAULT_COMPARE_OPTIONS } from "../src/utils.js" + +interface TestUser { + id: string + name: string + age: number + score: number + status: `active` | `inactive` +} + +describe(`currentStateAsChanges`, () => { + let mockSync: ReturnType + + beforeEach(() => { + mockSync = vi.fn() + }) + + afterEach(() => { + vi.clearAllMocks() + }) + + const users: Array = [ + { id: `1`, name: `Alice`, age: 25, score: 100, status: `active` }, + { id: `2`, name: `Bob`, age: 30, score: 80, status: `inactive` }, + { id: `3`, name: `Charlie`, age: 35, score: 90, status: `active` }, + { id: `4`, name: `David`, age: 20, score: 70, status: `active` }, + { id: `5`, name: `Eve`, age: 28, score: 95, status: `inactive` }, + ] + + // Helper function to create and populate collection with test data + async function createAndPopulateCollection( + autoIndex: `eager` | `off` = `eager` + ) { + const collection = createCollection({ + id: `test-collection-${autoIndex}`, + getKey: (user) => user.id, + autoIndex, + sync: { + sync: mockSync, + }, + }) + + // Insert users via sync + mockSync.mockImplementation(({ begin, write, commit }) => { + begin() + users.forEach((user) => { + write({ + type: `insert`, + value: user, + }) + }) + commit() + }) + + collection.startSyncImmediate() + await collection.stateWhenReady() + + return collection + } + + describe.each([ + [`with auto-indexing`, `eager`], + [`without auto-indexing`, `off`], + ])(`%s`, (testName, autoIndex) => { + describe(`where clause without orderBy or limit`, () => { + it(`should return all items when no where clause is provided`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection) + + expect(result).toHaveLength(5) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + `David`, + `Eve`, + ]) + }) + + it(`should filter items based on where clause`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + where: new Func(`eq`, [new PropRef([`status`]), new Value(`active`)]), + }) + + expect(result).toHaveLength(3) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, + `Charlie`, + `David`, + ]) + }) + + it(`should filter items based on numeric where clause`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + where: new Func(`gt`, [new PropRef([`age`]), new Value(25)]), + }) + + expect(result).toHaveLength(3) + expect(result?.map((change) => change.value.name)).toEqual([ + `Bob`, + `Charlie`, + `Eve`, + ]) + }) + }) + + describe(`orderBy without limit and no where clause`, () => { + it(`should return all items ordered by name ascending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`name`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + }) + + expect(result).toHaveLength(5) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + `David`, + `Eve`, + ]) + }) + + it(`should return all items ordered by score descending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `desc` }, + }, + ], + }) + + expect(result).toHaveLength(5) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, // score: 100 + `Eve`, // score: 95 + `Charlie`, // score: 90 + `Bob`, // score: 80 + `David`, // score: 70 + ]) + }) + + it(`should return all items ordered by age ascending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`age`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + }) + + expect(result).toHaveLength(5) + expect(result?.map((change) => change.value.name)).toEqual([ + `David`, // age: 20 + `Alice`, // age: 25 + `Eve`, // age: 28 + `Bob`, // age: 30 + `Charlie`, // age: 35 + ]) + }) + }) + + describe(`orderBy with limit and no where clause`, () => { + it(`should return top 3 items ordered by score descending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `desc` }, + }, + ], + limit: 3, + }) + + expect(result).toHaveLength(3) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, // score: 100 + `Eve`, // score: 95 + `Charlie`, // score: 90 + ]) + }) + }) + + describe(`orderBy with limit and where clause`, () => { + it(`should return top active users ordered by score descending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + where: new Func(`eq`, [new PropRef([`status`]), new Value(`active`)]), + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `desc` }, + }, + ], + limit: 2, + }) + + expect(result).toHaveLength(2) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, // score: 100, status: active + `Charlie`, // score: 90, status: active + ]) + }) + + it(`should return top users over 25 ordered by age ascending`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + where: new Func(`gt`, [new PropRef([`age`]), new Value(25)]), + orderBy: [ + { + expression: new PropRef([`age`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + limit: 2, + }) + + expect(result).toHaveLength(2) + expect(result?.map((change) => change.value.name)).toEqual([ + `Eve`, // age: 28 + `Bob`, // age: 30 + ]) + }) + + it(`should handle multi-column orderBy with where clause`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + where: new Func(`eq`, [new PropRef([`status`]), new Value(`active`)]), + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `desc` }, + }, + { + expression: new PropRef([`age`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + limit: 2, + }) + + expect(result).toHaveLength(2) + // Should be ordered by score desc, then age asc for ties + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, // score: 100, age: 25 + `Charlie`, // score: 90, age: 35 + ]) + }) + }) + + describe(`error cases`, () => { + it(`should throw error when limit is provided without orderBy`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + expect(() => { + currentStateAsChanges(collection, { + limit: 5, + }) + }).toThrow(`limit cannot be used without orderBy`) + }) + + it(`should throw error when limit is provided without orderBy even with where clause`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + expect(() => { + currentStateAsChanges(collection, { + where: new Func(`eq`, [ + new PropRef([`status`]), + new Value(`active`), + ]), + limit: 3, + }) + }).toThrow(`limit cannot be used without orderBy`) + }) + }) + + describe(`optimizedOnly option`, () => { + it(`should return undefined when optimizedOnly is true and no index is available`, async () => { + // Only test this with auto-indexing disabled + if (autoIndex === `off`) { + const collection = await createAndPopulateCollection(`off`) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { + ...DEFAULT_COMPARE_OPTIONS, + direction: `desc`, + }, + }, + ], + limit: 1, + optimizedOnly: true, + }) + + expect(result).toBeUndefined() + } + }) + + it(`should return results when optimizedOnly is true and index is available`, async () => { + // Only test this with auto-indexing enabled + if (autoIndex === `eager`) { + const collection = await createAndPopulateCollection(`eager`) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`score`]), + compareOptions: { + ...DEFAULT_COMPARE_OPTIONS, + direction: `desc`, + }, + }, + ], + limit: 1, + optimizedOnly: true, + }) + + expect(result).toHaveLength(1) + expect(result?.[0]?.value.name).toBe(`Alice`) + } + }) + }) + + describe(`edge cases`, () => { + it(`should handle empty collection`, () => { + const collection = createCollection({ + id: `test-collection-empty-${autoIndex}`, + getKey: (user) => user.id, + autoIndex: autoIndex as `eager` | `off`, + sync: { + sync: mockSync, + }, + }) + + // Don't populate the collection + collection.startSyncImmediate() + + const result = currentStateAsChanges(collection) + + expect(result).toHaveLength(0) + }) + + it(`should handle limit larger than collection size`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`name`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + limit: 10, // More than the 5 items in collection + }) + + expect(result).toHaveLength(5) + expect(result?.map((change) => change.value.name)).toEqual([ + `Alice`, + `Bob`, + `Charlie`, + `David`, + `Eve`, + ]) + }) + + it(`should handle limit of 0`, async () => { + const collection = await createAndPopulateCollection( + autoIndex as `eager` | `off` + ) + + const result = currentStateAsChanges(collection, { + orderBy: [ + { + expression: new PropRef([`name`]), + compareOptions: { ...DEFAULT_COMPARE_OPTIONS, direction: `asc` }, + }, + ], + limit: 0, + }) + + expect(result).toHaveLength(0) + }) + }) + }) +})