diff --git a/bun.lock b/bun.lock index 8d5c3a2..8b5d489 100644 --- a/bun.lock +++ b/bun.lock @@ -7,8 +7,12 @@ "@types/benchmark": "^2.1.5", "@vitest/coverage-v8": "^3.2.4", "benchmark": "^2.1.4", + "typescript": "^5", "vitest": "^3.2.4", }, + "peerDependencies": { + "typescript": "^5", + }, }, }, "packages": { @@ -310,6 +314,8 @@ "tinyspy": ["tinyspy@4.0.4", "", {}, "sha512-azl+t0z7pw/z958Gy9svOTuzqIk6xq+NSheJzn5MMWtWTFywIacg2wUlzKFGtt3cthx0r2SxMK0yzJOR0IES7Q=="], + "typescript": ["typescript@5.9.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw=="], + "vite": ["vite@7.1.9", "", { "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", "picomatch": "^4.0.3", "postcss": "^8.5.6", "rollup": "^4.43.0", "tinyglobby": "^0.2.15" }, "optionalDependencies": { "fsevents": "~2.3.3" }, "peerDependencies": { "@types/node": "^20.19.0 || >=22.12.0", "jiti": ">=1.21.0", "less": "^4.0.0", "lightningcss": "^1.21.0", "sass": "^1.70.0", "sass-embedded": "^1.70.0", "stylus": ">=0.54.8", "sugarss": "^5.0.0", "terser": "^5.16.0", "tsx": "^4.8.1", "yaml": "^2.4.2" }, "optionalPeers": ["@types/node", "jiti", "less", "lightningcss", "sass", "sass-embedded", "stylus", "sugarss", "terser", "tsx", "yaml"], "bin": { "vite": "bin/vite.js" } }, "sha512-4nVGliEpxmhCL8DslSAUdxlB6+SMrhB0a1v5ijlh1xB1nEPuy1mxaHxysVucLHuWryAxLWg6a5ei+U4TLn/rFg=="], "vite-node": ["vite-node@3.2.4", "", { "dependencies": { "cac": "^6.7.14", "debug": "^4.4.1", "es-module-lexer": "^1.7.0", "pathe": "^2.0.3", "vite": "^5.0.0 || ^6.0.0 || ^7.0.0-0" }, "bin": { "vite-node": "vite-node.mjs" } }, "sha512-EbKSKh+bh1E1IFxeO0pg1n4dvoOTt0UDiXMd/qn++r98+jPO1xtJilvXldeuQ8giIB5IkpjCgMleHMNEsGH6pg=="], diff --git a/index.d.ts b/index.d.ts index 8193086..e221d5a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -19,11 +19,24 @@ declare module 'fluent-iter' { select(map: (item: TValue) => TOutput): FluentIterable; /** - * Flat Iterable of collections + * Flat Iterable of collections up to N levels, default is 1 level. + * @param depth + */ + flat(depth?: number): FlatFluentIterable; + + /** + * Flat Iterable of collections, alias of flat * @param innerSelector Function which returns an inner collection */ selectMany(innerSelector: (item: TValue) => TInner[]): FluentIterable; + /** + * Flat iterable of collection one level and maps inner elements. + * @param innerSelector Function which returns an inner collection + * @param mapper Function thish converts inner element to result. + */ + flatMap(mapper: (value: TValue) => TResult | ReadonlyArray): FluentIterable; + /** * Flat iterable of collection * @param innerSelector Function which returns an inner collection @@ -447,13 +460,92 @@ declare module 'fluent-iter' { */ take(count: number): FluentIterableAsync; + /** + * Return items while condition return true + * @param condition + */ + takeWhile(condition: (item: TValue, index: number) => boolean): FluentIterableAsync; + + /** + * Skip first N items from iterable + * @param count + */ + skip(count: number): FluentIterableAsync; + + /** + * Skip items while condition return true, get the rest + * @param condition + */ + skipWhile(condition: (item: TValue, index: number) => boolean): FluentIterableAsync; + + /** + * Return distinct items. Can specify optional item comparer + * @param keySelector function to get key for comparison. + */ + distinct(keySelector?: (item: TValue) => TKey): FluentIterableAsync; + + /** + * Group items + * @param keySelector group key selector + */ + groupBy(keySelector: (item: TValue, index: number) => TKey): + [TKey, TValue] extends ['fulfilled' | 'rejected', PromiseResult] + ? FluentIterableAsync< IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>> + : FluentIterableAsync>; + groupBy(keySelector: (item: TValue, index: number) => TKey, elementSelector: (item: TValue, index: number) => TElement): FluentIterableAsync>; + groupBy(keySelector: (item: TValue, index: number) => TKey, elementSelector: (item: TValue, index: number) => TElement, resultCreator: (key: TKey, items: FluentIterable) => TResult): FluentIterableAsync; + /** * Return a promise to an array. */ toArray(): Promise; toArray(map: (item: TValue) => TResult): Promise; + + /** + * Create a map object from sequence + * @param keySelector - key selector - keys should be unique, otherwise last keys will override first. + */ + toMap(keySelector: (item: TValue) => TKey): + [TKey, TValue] extends ['fulfilled' | 'rejected', IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>] + ? Promise> + : Promise>; + + /** + * Create a map object from sequence + * @param keySelector key selector - keys should be unique, otherwise last keys will override first. + * @param elementSelector element selector + */ + toMap(keySelector: (item: TValue) => TKey, elementSelector: (item: TValue) => TElement): Promise>; } + export interface FluentIterableAsyncPromise extends FluentIterableAsync> { + groupByStatus(): FluentIterableAsync< IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>>; + toStatusMap(): Promise>; + } + + type PromiseResult = FulfilledPromiseResult | RejectedPromiseResult; + + interface FulfilledPromiseResult { + index: number; + status: 'fulfilled'; + value: T; + } + + interface RejectedPromiseResult { + index: number; + status: 'rejected'; + reason: any; + } + + interface PromiseMap extends Map<'fulfilled'|'rejected', FluentIterable>> { + get(key: 'fulfilled'): FluentIterable> | undefined; + get(key: 'rejected'): FluentIterable | undefined; + } + + type FlatFluentIterable = Value extends ReadonlyArray + ? FlatFluentIterable + : FluentIterable; + export function from(iterable: Iterable | ArrayLike): FluentIterable; export function from(value: TValue): FluentIterable<{ key: string, value: TValue[TKey] }>; @@ -468,4 +560,7 @@ declare module 'fluent-iter' { export function fromEvent(target: TTarget, event: TEvent): FluentIterableAsync; export function fromTimer(interval: number, delay?: number): FluentIterableAsync; + export function fromPromises(...promises: Promise[]): FluentIterableAsyncPromise; + export function isFulfilled(result: PromiseResult): result is FulfilledPromiseResult; + export function isRejected(result: PromiseResult): result is RejectedPromiseResult; } \ No newline at end of file diff --git a/index.ts b/index.ts index be40e62..5cef85e 100644 --- a/index.ts +++ b/index.ts @@ -9,4 +9,7 @@ export { // async fromEvent, fromTimer, + fromPromises, + isFulfilled, + isRejected, } from "./src/index.ts"; diff --git a/package.json b/package.json index eb1a8a8..719cb79 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "type": "module", "types": "index.d.ts", "scripts": { - "build": "vite build", + "check": "tsc --noEmit", "test": "vitest --run", "test:watch": "vitest", "test:coverage": "vitest run --coverage", @@ -45,7 +45,8 @@ "@types/benchmark": "^2.1.5", "@vitest/coverage-v8": "^3.2.4", "benchmark": "^2.1.4", - "vitest": "^3.2.4" + "vitest": "^3.2.4", + "typescript": "^5" }, "peerDependencies": { "typescript": "^5" diff --git a/src/creation-async.ts b/src/creation-async.ts index 09dbe85..0bfa366 100644 --- a/src/creation-async.ts +++ b/src/creation-async.ts @@ -1,7 +1,8 @@ -import type { FluentIterableAsync } from 'fluent-iter'; -import FluentAsync from "./fluent-async.js"; +import type { FluentIterableAsync, FluentIterableAsyncPromise } from 'fluent-iter'; +import FluentAsync, {FluentAsyncPromise} from "./fluent-async.js"; import fromEventAsync from "./generators/from-event.ts"; import fromTimerAsync from "./generators/from-timer.js"; +import {fromPromisesIterable} from "./generators/promises.ts"; export function fromEvent(target: TTarget, event: TEvent): FluentIterableAsync { return new FluentAsync(fromEventAsync(target, event)); @@ -10,3 +11,9 @@ export function fromEvent { return new FluentAsync(fromTimerAsync(interval, delay)); } + +export function fromPromises( + ...promises: Promise[] +): FluentIterableAsyncPromise { + return new FluentAsyncPromise(fromPromisesIterable(...promises)); +} diff --git a/src/creation.ts b/src/creation.ts index f5c3c77..141aae8 100644 --- a/src/creation.ts +++ b/src/creation.ts @@ -1,14 +1,19 @@ -import type { FluentIterable } from 'fluent-iter'; +import type { FluentIterable, FluentIterableAsync } from 'fluent-iter'; import Fluent from "./fluent.ts"; -import arrayLikeIterator from "./iterables/initial/array-like.ts"; -import objectIterator from "./iterables/initial/object.ts"; +import arrayLikeIterator from "./generators/array-like.ts"; +import objectIterator from "./generators/object.ts"; import rangeIterable from "./generators/range.ts"; import repeatIterable from "./generators/repeat.ts"; +import FluentAsync from "./fluent-async.ts"; export function fromIterable(iterable: Iterable): FluentIterable { return new Fluent(iterable); } +export function fromAsyncIterable(iterable: AsyncIterable): FluentIterableAsync { + return new FluentAsync(iterable); +} + export function fromObject(value: TValue): FluentIterable<{ key: string, value: TValue[TKey] }>; export function fromObject(value: TValue, resultCreator: (key: TKey, value: TValue[TKey]) => TResult): FluentIterable; export function fromObject(value: TValue, resultCreator?: (key: TKey, value: TValue[TKey]) => TResult): FluentIterable<{ key: string, value: TValue[TKey] } | TResult> { @@ -31,25 +36,32 @@ export function repeat(value: TValue, times: number): FluentIterable(iterable: AsyncIterable): FluentIterableAsync; export function from(iterable: Iterable | ArrayLike): FluentIterable; export function from(value: TValue): FluentIterable<{ key: string, value: TValue[TKey] }>; -export function from(source: Iterable | ArrayLike | TValue) { +export function from(source: Iterable | ArrayLike | TValue | AsyncIterable) { if (isIterable(source)) { return fromIterable(source); } + if (isAsyncIterable(source)) { + return fromAsyncIterable(source); + } if (isArrayLike(source)) { return fromArrayLike(source); } return fromObject(source as object); } - - function isIterable(o: any): o is Iterable { const iterator = o[Symbol.iterator]; return typeof iterator === 'function'; } +function isAsyncIterable(o: any): o is AsyncIterable { + const iterator = o[Symbol.asyncIterator]; + return typeof iterator === 'function'; +} + function isArrayLike(o: any): o is ArrayLike { return 'length' in o; } diff --git a/src/finalizers/to-array.ts b/src/finalizers/to-array.ts index 0a82e5d..bf43394 100644 --- a/src/finalizers/to-array.ts +++ b/src/finalizers/to-array.ts @@ -19,3 +19,14 @@ export async function toArrayAsyncCollector(source: AsyncIterable, map? } return result; } + +export async function toMapAsyncCollector( + source: AsyncIterable, + keySelector: (item: TValue) => TKey, + elementSelector?: (item: TValue) => TElement): Promise> { + const map = new Map(); + for await (const item of source) { + map.set(keySelector(item), elementSelector ? elementSelector(item) : item); + } + return map; +} diff --git a/src/fluent-async-subject.ts b/src/fluent-async-subject.ts new file mode 100644 index 0000000..1b85292 --- /dev/null +++ b/src/fluent-async-subject.ts @@ -0,0 +1,128 @@ +import {doneValue, iteratorResultCreator} from "./utils.ts"; +import FluentAsync from "./fluent-async.ts"; + +export interface SubjectIterator extends AsyncIterator { + emit(value: T): void; + complete(): void; + fail(err: any): void; +} + +export class ReplaySubjectAsyncIterator implements SubjectIterator { + private queue: T[] = []; + private waiters: { + resolve: (result: IteratorResult) => void; + reject: (err: any) => void; + }[] = []; + + private closed = false; + private error: any = null; + + public next(): Promise> { + if (this.error) { + return Promise.reject(this.error); + } + + if (this.queue.length > 0) { + return Promise.resolve(iteratorResultCreator(this.queue.shift()!)); + } + + if (this.closed) { + return Promise.resolve(doneValue()); + } + + return new Promise((resolve, reject) => { + this.waiters.push({ resolve, reject }); + }); + } + + public emit(value: T) { + if (this.closed || this.error) { + return; + } + + if (this.waiters.length) { + this.waiters.shift()!.resolve(iteratorResultCreator(value)); + } else { + this.queue.push(value); + } + } + + public complete() { + if (this.closed || this.error) { + return; + } + this.closed = true; + + while (this.waiters.length) { + this.waiters.shift()!.resolve(doneValue()); + } + } + + public fail(err: any) { + if (this.closed || this.error) { + return; + } + this.error = err; + this.closed = true; + this.queue = []; + + while (this.waiters.length) { + this.waiters.shift()!.reject(err); + } + } +} + +export class SubjectAsyncIterator extends ReplaySubjectAsyncIterator { + private started = false; + + public next(): Promise> { + this.started = true; + return super.next(); + } + + public emit(value: T) { + if (!this.started) { + return; + } + super.emit(value); + } +} + +export class FluentAsyncSubject extends FluentAsync implements AsyncDisposable, Disposable { + private subject: SubjectIterator; + + constructor(iterator?: SubjectIterator) { + const iter = iterator ?? new SubjectAsyncIterator(); + super({ + [Symbol.asyncIterator]: () => iter, + }); + this.subject = iter; + } + + [Symbol.asyncDispose](): PromiseLike { + this.complete(); + return Promise.resolve(); + } + + [Symbol.dispose](): void { + this.complete(); + } + + emit(value: T) { + this.subject.emit(value); + } + + complete() { + this.subject.complete(); + } + + fail(err: any) { + this.subject.fail(err); + } +} + +export class FluentAsyncReplaySubject extends FluentAsyncSubject { + constructor() { + super(new ReplaySubjectAsyncIterator()); + } +} diff --git a/src/fluent-async.ts b/src/fluent-async.ts index 5e45c47..6869c3a 100644 --- a/src/fluent-async.ts +++ b/src/fluent-async.ts @@ -1,9 +1,23 @@ -import type { FluentIterableAsync } from 'fluent-iter'; +import type { + FluentIterable, + FluentIterableAsync, + FluentIterableAsyncPromise, + FulfilledPromiseResult, + IGrouping, + PromiseMap, + PromiseResult, + RejectedPromiseResult +} from 'fluent-iter'; import type {Mapper, Predicate} from "./interfaces.js"; import {whereAsyncIterator} from "./iterables/where.js"; import {selectIteratorAsync} from "./iterables/select.js"; -import takeIteratorAsync, {takeIterator} from "./iterables/take.js"; -import {toArrayAsyncCollector} from "./finalizers/to-array.js"; +import takeIteratorAsync from "./iterables/take.js"; +import {toArrayAsyncCollector, toMapAsyncCollector} from "./finalizers/to-array.js"; +import {groupByAsyncIterator} from "./iterables/group.ts"; +import {takeWhileIteratorAsync} from "./iterables/take-while.ts"; +import {skipIteratorAsync} from "./iterables/skip.ts"; +import {skipWhileIteratorAsync} from "./iterables/skip-while.ts"; +import {distinctIteratorAsync} from "./iterables/set-iterators.ts"; export default class FluentAsync implements FluentIterableAsync { readonly #source: AsyncIterable; @@ -23,12 +37,55 @@ export default class FluentAsync implements FluentIterableAsync take(count: number): FluentIterableAsync { return new FluentAsync(takeIteratorAsync(this, count)); } + takeWhile(condition: (item: TValue, index: number) => boolean): FluentIterableAsync { + return new FluentAsync(takeWhileIteratorAsync(this, condition)); + } + skip(count: number): FluentIterableAsync { + return new FluentAsync(skipIteratorAsync(this, count)); + } + skipWhile(condition: (item: TValue, index: number) => boolean): FluentIterableAsync { + return new FluentAsync(skipWhileIteratorAsync(this, condition)); + } + distinct(keySelector?: (item: TValue) => TKey): FluentIterableAsync { + return new FluentAsync(distinctIteratorAsync(this, keySelector)); + } + groupBy(keySelector: (item: TValue, index: number) => TKey): + [TKey, TValue] extends ['fulfilled' | 'rejected', PromiseResult] ? + FluentIterableAsync< IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>> + : FluentIterableAsync>; + groupBy(keySelector: (item: TValue, index: number) => TKey, elementSelector: (item: TValue, index: number) => TElement): FluentIterableAsync>; + groupBy(keySelector: (item: TValue, index: number) => TKey, elementSelector: (item: TValue, index: number) => TElement, resultCreator: (key: TKey, items: FluentIterable) => TResult): FluentIterableAsync; + groupBy(keySelector: (item: TValue, index: number) => TKey, + elementSelector?: (item: TValue, index: number) => TElement, + resultCreator?: (key: TKey, items: FluentIterable) => TResult): FluentIterableAsync | IGrouping | TResult> { + return new FluentAsync(groupByAsyncIterator(this, keySelector, elementSelector, resultCreator)); + } toArray(): Promise; toArray(map: Mapper): Promise; toArray(map?: Mapper): Promise<(TValue|TResult)[]> { return toArrayAsyncCollector(this, map); } + + toMap(keySelector: (item: TValue) => TKey): + [TKey, TValue] extends ['fulfilled' | 'rejected', IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>] + ? Promise> + : Promise>; + toMap(keySelector: (item: TValue) => TKey, elementSelector: (item: TValue) => TElement): Promise>; + toMap(keySelector: (item: TValue) => TKey, elementSelector?: (item: TValue) => TElement): Promise> { + return toMapAsyncCollector(this, keySelector, elementSelector); + } + [Symbol.asyncIterator](): AsyncIterator { return this.#source[Symbol.asyncIterator](); } } + +export class FluentAsyncPromise extends FluentAsync> implements FluentIterableAsyncPromise { + groupByStatus(): FluentIterableAsync< IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>> { + return super.groupBy(x => x.status) as FluentIterableAsync< IGrouping<'fulfilled', FulfilledPromiseResult> | IGrouping<'rejected', RejectedPromiseResult>>; + } + + toStatusMap(): Promise> { + return toMapAsyncCollector(this, p => p.status) as Promise>; + } +} diff --git a/src/fluent-iterable.ts b/src/fluent-iterable.ts deleted file mode 100644 index e69de29..0000000 diff --git a/src/fluent.ts b/src/fluent.ts index 0f7e232..5919515 100644 --- a/src/fluent.ts +++ b/src/fluent.ts @@ -1,11 +1,11 @@ import { whereIterator } from "./iterables/where.ts"; import { selectIterator } from "./iterables/select.ts"; -import selectManyIterator from "./iterables/select-many.ts"; +import { selectManyIterator, flatIterator, flatMapIterator } from "./iterables/select-many.ts"; import { takeIterator } from "./iterables/take.ts"; -import skipIterator from "./iterables/skip.ts"; +import { skipIterator } from "./iterables/skip.ts"; import { toArrayCollector } from "./finalizers/to-array.ts"; -import takeWhileIterator from "./iterables/take-while.ts"; -import skipWhileIterator from "./iterables/skip-while.ts"; +import { takeWhileIterator } from "./iterables/take-while.ts"; +import { skipWhileIterator } from "./iterables/skip-while.ts"; import takeLastIterator from "./iterables/take-last.ts"; import skipLastIterator from "./iterables/skip-last.ts"; import {allAndEveryCollector, allCollector} from "./finalizers/all.ts"; @@ -36,7 +36,7 @@ import groupJoinIterator from "./iterables/group-join.ts"; import zipIterable from "./iterables/zip.js"; import type {Action, Comparer, Mapper, Predicate} from "./interfaces.ts"; -import type { FluentIterable, IGrouping } from 'fluent-iter'; +import type {FlatFluentIterable, FluentIterable, IGrouping } from 'fluent-iter'; export default class Fluent implements FluentIterable { readonly #source: Iterable; @@ -53,8 +53,17 @@ export default class Fluent implements FluentIterable { select(map: Mapper): FluentIterable { return new Fluent(selectIterator(this, map)); } + flat(depth: number = 1): FlatFluentIterable { + return new Fluent(flatIterator(this, depth)) as any; + } + flatMap(mapper: (value: TValue) => TResult | ReadonlyArray): FluentIterable { + return new Fluent(flatMapIterator(this, mapper)); + } selectMany(innerSelector: (item: TValue) => TInner[], resultCreator?: (outer: TValue, inner: TInner) => TResult): FluentIterable { - return new Fluent(selectManyIterator(this, innerSelector, resultCreator)); + if (typeof resultCreator !== 'undefined') { + return new Fluent(selectManyIterator(this, innerSelector, resultCreator)); + } + return new Fluent(selectManyIterator(this, innerSelector)); } take(count: number): FluentIterable { return new Fluent(takeIterator(this, count)); diff --git a/src/iterables/initial/array-like.ts b/src/generators/array-like.ts similarity index 100% rename from src/iterables/initial/array-like.ts rename to src/generators/array-like.ts diff --git a/src/iterables/initial/object.ts b/src/generators/object.ts similarity index 100% rename from src/iterables/initial/object.ts rename to src/generators/object.ts diff --git a/src/generators/promises.ts b/src/generators/promises.ts new file mode 100644 index 0000000..db94cb4 --- /dev/null +++ b/src/generators/promises.ts @@ -0,0 +1,36 @@ +import {FulfilledPromiseResult, PromiseResult, RejectedPromiseResult } from "fluent-iter"; +import {createAsyncIterable} from "../utils.ts"; + +export function fromPromisesIterable( + ...promises: Promise[] +): AsyncIterable> { + return createAsyncIterable(() => fromPromisesGenerator(...promises)); +} + +async function* fromPromisesGenerator( + ...promises: Promise[] +): AsyncGenerator> { + const pending = new Map>>( + promises.map((p, index) => [ + index, + p.then( + (value) => ({ index, status: 'fulfilled', value }), + (reason) => ({ index, status: 'rejected', reason }) + ), + ]) + ); + + while (pending.size > 0) { + const result = await Promise.race(pending.values()); + pending.delete(result.index); + yield result; + } +} + +export function isFulfilled(result: PromiseResult): result is FulfilledPromiseResult { + return result.status === 'fulfilled'; +} + +export function isRejected(result: PromiseResult): result is RejectedPromiseResult { + return result.status === 'rejected'; +} diff --git a/src/index.ts b/src/index.ts index 094071e..829a371 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export { fromIterable, fromObject, fromArrayLike, range, from, repeat } from './creation.ts'; -export { fromEvent, fromTimer } from './creation-async.ts'; \ No newline at end of file +export { fromEvent, fromTimer, fromPromises } from './creation-async.ts'; +export { isFulfilled, isRejected } from './generators/promises.ts'; diff --git a/src/iterables/group.ts b/src/iterables/group.ts index 463d2fc..88455c5 100644 --- a/src/iterables/group.ts +++ b/src/iterables/group.ts @@ -1,5 +1,5 @@ import type {FluentIterable, IGrouping} from "fluent-iter"; -import {createIterable, group} from "../utils.ts"; +import {createAsyncIterable, createIterable, group, groupAsync} from "../utils.ts"; import {Grouping} from "../fluent.js"; export function groupByIterator( @@ -24,3 +24,26 @@ function* groupByGenerator( yield resultCreate(key, items); } } + +export function groupByAsyncIterator( + source: AsyncIterable, + keySelector: (item: TValue, index: number) => TKey, + elementSelector?: (item: TValue, index: number) => TElement, + resultCreator?: (key: TKey, items: FluentIterable) => TResult): AsyncIterable | IGrouping | TResult> { + return createAsyncIterable(() => groupByAsyncGenerator(source, keySelector, elementSelector, resultCreator)); +} + +async function* groupByAsyncGenerator( + source: AsyncIterable, + keySelector: (item: TValue, index: number) => TKey, + elementSelector?: (item: TValue, index: number) => TElement, + resultCreator?: (key: TKey, items: FluentIterable) => TResult): AsyncGenerator | IGrouping | TResult> { + const elementSelect: (item: TValue, index: number) => TElement = elementSelector ?? ((item) => item as unknown as TElement); + const resultCreate: (key: TKey, items: FluentIterable) => TResult = resultCreator ?? + ((key, items) => new Grouping(key, items) as unknown as TResult); + + const groups = await groupAsync(source, keySelector, elementSelect); + for (const [key, items] of groups.entries()) { + yield resultCreate(key, items); + } +} diff --git a/src/iterables/select-many.ts b/src/iterables/select-many.ts index 8ed75bb..d58dd2d 100644 --- a/src/iterables/select-many.ts +++ b/src/iterables/select-many.ts @@ -1,15 +1,57 @@ import {createIterable} from "../utils.ts"; +import {Mapper} from "../interfaces.ts"; /** * Return flatten mapped array [[1, 2], [3, 4]].selectMany(x => x) === [1, 2, 3, 4, 5] */ -export default function selectManyIterator( +export function selectManyIterator( + input: Iterable, + innerSelector: (item: TValue) => TInner[]): Iterable; +export function selectManyIterator( + input: Iterable, + innerSelector: (item: TValue) => TInner[], + resultCreator: (outer: TValue, inner: TInner) => TResult): Iterable; +export function selectManyIterator( input: Iterable, innerSelector: (item: TValue) => TInner[], resultCreator?: (outer: TValue, inner: TInner) => TResult): Iterable { return createIterable(() => selectManyGenerator(input, innerSelector, resultCreator)); } +export function flatIterator(input: Iterable, depth: number): Iterable { + return createIterable(() => flatGenerator(input, depth, 0)); +} + +export function flatMapIterator(input: Iterable, mapper: Mapper): Iterable { + return createIterable(() => flatMapGenerator(input, mapper)); +} + +export function* flatGenerator(input: Iterable, depth: number, level: number): Generator { + for (const item of input) { + if (Array.isArray(item)) { + if (level >= depth) { + yield item; + } else { + level++; + yield* flatGenerator(item, depth, level); + level--; + } + } else { + yield item; + } + } +} + +export function* flatMapGenerator(input: Iterable, mapper: Mapper): Generator { + for (const item of input) { + if (Array.isArray(item)) { + yield* item.map(mapper) as TResult[]; + } else { + yield mapper(item) as TResult; + } + } +} + export function* selectManyGenerator( input: Iterable, innerSelector: (item: TValue) => TInner[], diff --git a/src/iterables/set-iterators.ts b/src/iterables/set-iterators.ts index e1e1b17..17057c0 100644 --- a/src/iterables/set-iterators.ts +++ b/src/iterables/set-iterators.ts @@ -16,6 +16,21 @@ export function distinctIterator(source: Iterable, }; } +export function distinctIteratorAsync(source: AsyncIterable, keySelector?: (item: TValue) => TKey): AsyncIterable { + const keySelectorFunc = keySelector ?? defaultKeySelector; + return { + [Symbol.asyncIterator]: async function* (){ + const set = new SetCheck(); + for await (const item of source) { + const key = keySelectorFunc(item); + if (set.tryAdd(key)) { + yield item; + } + } + } + }; +} + export function unionIterator(first: Iterable, second: Iterable, keySelector?: (item: TValue) => TKey): Iterable { const keySelectorFunc = keySelector ?? defaultKeySelector; const set = new SetCheck(); diff --git a/src/iterables/skip-while.ts b/src/iterables/skip-while.ts index 403232a..9f52624 100644 --- a/src/iterables/skip-while.ts +++ b/src/iterables/skip-while.ts @@ -3,7 +3,7 @@ import {createIterable} from "../utils.ts"; /** * Return skip first elements until condition got falsy and return rest */ -export default function skipWhileIterator(input: Iterable, condition: (item: TValue, index: number) => boolean): Iterable { +export function skipWhileIterator(input: Iterable, condition: (item: TValue, index: number) => boolean): Iterable { return createIterable(() => skipWhileGenerator(input, condition)); } @@ -11,7 +11,7 @@ function* skipWhileGenerator(input: Iterable, condition: (item: let flag = false; let index = 0; for (const item of input) { - if (condition(item, index++) && !flag) { + if (!flag && condition(item, index++)) { continue; } else { flag = true; @@ -19,3 +19,20 @@ function* skipWhileGenerator(input: Iterable, condition: (item: yield item; } } + +export function skipWhileIteratorAsync(input: AsyncIterable, condition: (item: TValue, index: number) => boolean): AsyncIterable { + return { + [Symbol.asyncIterator]: async function* (){ + let flag = false; + let index = 0; + for await (const item of input) { + if (!flag && condition(item, index++)) { + continue; + } else { + flag = true; + } + yield item; + } + } + }; +} diff --git a/src/iterables/skip.ts b/src/iterables/skip.ts index b09710d..3fa5df7 100644 --- a/src/iterables/skip.ts +++ b/src/iterables/skip.ts @@ -3,7 +3,7 @@ import {createIterable} from "../utils.ts"; /** * Skip first N numbers of source and return the rest */ -export default function skipIterator(input: Iterable, count: number): Iterable { +export function skipIterator(input: Iterable, count: number): Iterable { return createIterable(() => skipGenerator(input, count)); } @@ -17,3 +17,18 @@ function* skipGenerator(input: Iterable, count: number): Generat yield item; } } + +export function skipIteratorAsync(input: AsyncIterable, count: number): AsyncIterable { + return { + [Symbol.asyncIterator]: async function* (){ + let skipped = 0; + for await (const item of input) { + if (skipped < count) { + skipped++; + continue; + } + yield item; + } + } + } +} diff --git a/src/iterables/take-while.ts b/src/iterables/take-while.ts index d3490ef..55b6778 100644 --- a/src/iterables/take-while.ts +++ b/src/iterables/take-while.ts @@ -3,7 +3,7 @@ import {createIterable} from "../utils.ts"; /** * Return items until certain condition got falsy */ -export default function takeWhileIterator(input: Iterable, condition: (item: TValue, index: number) => boolean): Iterable { +export function takeWhileIterator(input: Iterable, condition: (item: TValue, index: number) => boolean): Iterable { return createIterable(() => takeWhileGenerator(input, condition)); } @@ -17,3 +17,18 @@ function* takeWhileGenerator(input: Iterable, condition: (item: } } } + +export function takeWhileIteratorAsync(input: AsyncIterable, condition: (item: TValue, index: number) => boolean): AsyncIterable { + return { + [Symbol.asyncIterator]: async function* (){ + let index = 0; + for await (const item of input) { + if (condition(item, index++)) { + yield item; + } else { + break; + } + } + } + }; +} diff --git a/src/utils.ts b/src/utils.ts index 47321de..c0e353c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,6 +1,6 @@ import {Comparer} from "./interfaces.ts"; import {from, fromIterable} from "./creation.js"; -import { FluentIterable } from "fluent-iter"; +import { FluentIterable, FluentIterableAsync} from "fluent-iter"; /** * Helper function to be use to access Symbol.iterator of iterable @@ -179,12 +179,42 @@ export function group( return fromIterable(map).toMap(([key, _]) => key as TKey, ([_, value]) => from(value)); } +export async function groupAsync( + iterable: AsyncIterable, + keySelector: (item: TValue, index: number) => TKey, + elementSelector: (item: TValue, index: number) => TElement): Promise>> { + const map = new Map(); + let i = 0; + for await (const item of iterable) { + const key = keySelector(item, i); + if ((key !== null && typeof key === 'object') || typeof key === "function") { + throw new TypeError('groupBy method does not support keys to be objects or functions'); + } + const element = elementSelector(item, i); + const value = map.get(key) || []; + value.push(element); + map.set(key, value); + i++; + } + const newMap = new Map>(); + for (const [key, value] of map) { + newMap.set(key, from(value)); + } + return newMap; +} + export function createIterable(generator: () => Generator): Iterable { return { [Symbol.iterator]: generator, } } +export function createAsyncIterable(generator: () => AsyncGenerator): AsyncIterable { + return { + [Symbol.asyncIterator]: generator, + } +} + export function delay(ms: number): Promise { return new Promise((resolve) => { setTimeout(resolve, ms); diff --git a/test/test-utils.ts b/test/test-utils.ts new file mode 100644 index 0000000..14c1849 --- /dev/null +++ b/test/test-utils.ts @@ -0,0 +1,23 @@ +import {doneValue} from "../src/utils.ts"; + +export function wait(ms: number, result: T): Promise { + return new Promise((resolve) => { + setTimeout(() => { + resolve(result); + }, ms); + }); +} + +export function waitAndReject(ms: number, reason: any): Promise { + return new Promise((_, reject) => { + setTimeout(() => { + reject(reason); + }, ms); + }); +} + +export const emptyAsyncIterable: AsyncIterable = ({ + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve(doneValue()) + }), +}); diff --git a/test/unit/distinct.spec.ts b/test/unit/distinct.spec.ts index 3bbfcb5..7d68f1a 100644 --- a/test/unit/distinct.spec.ts +++ b/test/unit/distinct.spec.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; -import { from, fromIterable } from "../../src/index.ts"; +import {from, fromIterable, fromPromises, isFulfilled} from "../../src/index.ts"; import { Person } from "./models.ts"; +import {wait} from "../test-utils.ts"; describe('distinct tests', () => { [ @@ -56,3 +57,29 @@ describe('distinct tests', () => { ]); }); }); + +describe('distinct async tests', () => { + it('should take distinct values', async () => { + const res = await fromPromises( + wait(1, 1), + wait(2, 2), + wait(3, 3), + wait(4, 2), + wait(5, 1) + ).where(isFulfilled).select(x => x.value).distinct().toArray(); + + expect(res).toStrictEqual([1, 2, 3]); + }); + + it('should take distinct values by key', async () => { + const res = await fromPromises( + wait(1, 2), + wait(2, 2), + wait(3, 3), + wait(4, 3), + wait(5, 1) + ).where(isFulfilled).distinct(x => x.value).select(x => x.value).toArray(); + + expect(res).toStrictEqual([2, 3, 1]); + }); +}); diff --git a/test/unit/fluent-async-subject.spec.ts b/test/unit/fluent-async-subject.spec.ts new file mode 100644 index 0000000..c19f2a5 --- /dev/null +++ b/test/unit/fluent-async-subject.spec.ts @@ -0,0 +1,144 @@ +import {expect, describe, it} from "vitest"; +import {FluentAsyncReplaySubject, FluentAsyncSubject} from "../../src/fluent-async-subject.ts"; + +describe('FluentAsyncReplaySubject', () => { + it('should emit values to array', async () => { + const subject = new FluentAsyncReplaySubject(); + subject.emit(1); + subject.emit(2); + subject.complete(); + const values = await subject.toArray(); + expect(values).toStrictEqual([1, 2]); + }); + + it('should emit values', async () => { + const subject = new FluentAsyncReplaySubject(); + const values: number[] = []; + let i = 1; + subject.emit(i); + for await (const value of subject) { + values.push(value); + i++; + if (i === 5) { + subject.complete(); + } + subject.emit(i); + } + expect(values).toStrictEqual([1, 2, 3, 4]); + }); + + it('should process values emitted before complete and ignore the rest.', async () => { + const subject = new FluentAsyncReplaySubject(); + const values: number[] = []; + let i = 1; + setTimeout(() => { + subject.emit(i); + }, 1); + for await (const value of subject) { + values.push(value); + i++; + if (i === 5) { + subject.complete(); + } + subject.emit(i); + subject.emit(i); + } + expect(values).toStrictEqual([1, 2, 2, 3, 3, 4, 4]); + }); + + it('should be fluent iterable.', async () => { + const subject = new FluentAsyncReplaySubject(); + const values: number[] = []; + let i = 1; + subject.emit(i); + for await (const value of subject.select(x => x * 2).take(3)) { + values.push(value); + i++; + if (i === 5) { + subject.complete(); + } + subject.emit(i); + subject.emit(i); + } + // from first 3 emitted values: 1, 2, 2, multiply by 2. + expect(values).toStrictEqual([2, 4, 4]); + }); + + it('should return values for each next call', async () => { + const subject = new FluentAsyncReplaySubject(); + const iterator = subject[Symbol.asyncIterator](); + setTimeout(() => { + subject.emit(1); + subject.emit(2); + subject.complete(); + }, 1); + const p0 = iterator.next(); + const p1 = iterator.next(); + const p2 = iterator.next(); + + const [r0, r1, r2] = await Promise.all([p0, p1, p2]); + + expect(r0.value).toBe(1); + expect(r1.value).toBe(2); + expect(r2.done).toBe(true); + }); + + it('should be async disposable.', async () => { + const f = async () => { + await using subject = new FluentAsyncReplaySubject(); + subject.emit(1); + subject.emit(2); + return subject; + } + + const subject = await f(); + const values: number[] = []; + for await (const value of subject) { + values.push(value); + } + expect(values).toStrictEqual([1, 2]); + }); + + it('should be disposable.', async () => { + const f = () => { + using subject = new FluentAsyncReplaySubject(); + subject.emit(1); + subject.emit(2); + return subject; + } + + const subject = f(); + const values: number[] = []; + for await (const value of subject) { + values.push(value); + } + expect(values).toStrictEqual([1, 2]); + }); +}); + +describe('FluentAsyncSubject', () => { + it('should not return anything if completed before started', async () => { + const subject = new FluentAsyncSubject(); + subject.emit(1); + subject.emit(2); + subject.complete(); + const values = await subject.toArray(); + + expect(values).toStrictEqual([]); + }); + + it('should return values emitted after start', async () => { + const subject = new FluentAsyncSubject(); + subject.emit(1); + + const values: number[] = []; + setTimeout(() => { + subject.emit(2); + }, 1); + for await (const value of subject) { + values.push(value); + subject.complete(); + } + expect(values).toStrictEqual([2]); + }); +}); diff --git a/test/unit/from-promises.spec.ts b/test/unit/from-promises.spec.ts new file mode 100644 index 0000000..2af3e11 --- /dev/null +++ b/test/unit/from-promises.spec.ts @@ -0,0 +1,44 @@ +import {describe, expect, it} from "vitest"; + +import {fromPromises, isFulfilled} from "../../src/index.ts"; +import {wait, waitAndReject} from "../test-utils.ts"; + +describe('from-promises', () => { + it('should return all results', async () => { + const promise0 = wait(10, '1'); + const promise1 = wait(20, '2'); + const promise2 = wait(30, '3'); + + const results = await fromPromises(promise0, promise1, promise2).where(isFulfilled).select(x => '0' + x.value).toArray(); + expect(results).toStrictEqual(['01', '02', '03']); + }); + + it('should be iterable', async () => { + const promise0 = wait(10, '1'); + const promise1 = wait(20, '2'); + const promise2 = wait(30, '3'); + + const results = fromPromises(promise0, promise1, promise2).where(isFulfilled).select(x => '0' + x.value); + const values: string[] = []; + for await (const result of results) { + values.push(result); + } + expect(values).toStrictEqual(['01', '02', '03']); + }); + + it('should have rejected and fulfillment', async () => { + const promise0 = wait(10, '1'); + const promise1 = waitAndReject(20, '2') as Promise; + const promise2 = wait(30, '3'); + + const results = await fromPromises(promise0, promise1, promise2) + .groupByStatus() + .toMap(x => x.key); + expect(results.has('fulfilled')).toBeTruthy(); + expect(results.has('rejected')).toBeTruthy(); + const fulfilled = results.get('fulfilled')!.toArray(x => x.value); + const rejected = results.get('rejected')!.toArray(x => x.reason); + expect(fulfilled).toStrictEqual(['1', '3']); + expect(rejected).toStrictEqual(['2']); + }); +}); \ No newline at end of file diff --git a/test/unit/select-many.spec.ts b/test/unit/select-many.spec.ts index 3c87979..243881f 100644 --- a/test/unit/select-many.spec.ts +++ b/test/unit/select-many.spec.ts @@ -89,4 +89,33 @@ describe('select many tests', () => { expect(resultFlat).toEqual(resultSelectMany); }); }); + + it('should flat sequence: depth 1', () => { + const input = [ + [1, 2, 3, 4, [5, 6]], + ['a', 'b', ['c', ['d']]], + 'e' + ]; + const output = from(input).flat().toArray(); + expect(output).toStrictEqual([1, 2, 3, 4, [5, 6], 'a', 'b', ['c', ['d']], 'e']); + }); + + it('should flat sequence: depth 2', () => { + const input = [ + [1, 2, 3, 4, [5, 6]], + ['a', 'b', 'c', 'd'], + 'e' + ]; + const output = from(input).flat(2).toArray(); + expect(output).toStrictEqual([1, 2, 3, 4, 5, 6, 'a', 'b', 'c', 'd', 'e']); + }); + + it('should flatMap sequence', () => { + const input = [ + [1, 2, 3, 4, [5, 6]], + 10 + ]; + const output = from(input).flatMap(i => i).toArray(); + expect(output).toStrictEqual([1, 2, 3, 4, [5, 6], 10]); + }); }); diff --git a/test/unit/skip.spec.ts b/test/unit/skip.spec.ts index ddbccad..4dcde56 100644 --- a/test/unit/skip.spec.ts +++ b/test/unit/skip.spec.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; -import { from, fromIterable, range } from "../../src/index.ts"; +import {from, fromIterable, fromTimer, range} from "../../src/index.ts"; +import {emptyAsyncIterable} from "../test-utils.ts"; describe('skip tests', () => { [ @@ -71,3 +72,35 @@ describe('skip tests', () => { }); }); }); + +describe('skipAsync', () => { + it('should skip first n elements', async () => { + const arr = await fromTimer(1).skip(2).take(3).toArray(); + expect(arr).toStrictEqual([2, 3, 4]); + }); + + it('should skip zero elements', async () => { + const arr = await fromTimer(1).skip(0).take(1).toArray(); + expect(arr).toStrictEqual([0]); + }); + + it('should skip empty sequence', async () => { + const arr = await from(emptyAsyncIterable).skip(3).toArray(); + expect(arr).toStrictEqual([]); + }); + + it('should skip while while predicate is true', async () => { + const arr = await fromTimer(1).skipWhile(x => x < 5).take(3).toArray(); + expect(arr).toStrictEqual([5, 6, 7]); + }); + + it('should skip while nothing from empty', async () => { + const arr = await from(emptyAsyncIterable as AsyncIterable).skipWhile(x => x < 5).toArray(); + expect(arr).toStrictEqual([]); + }); + + it('should skip while nothing when false', async () => { + const arr = await fromTimer(1).skipWhile(x => x > 5).take(2).toArray(); + expect(arr).toStrictEqual([0, 1]); + }); +}); diff --git a/test/unit/take.spec.ts b/test/unit/take.spec.ts index 9e331ef..d90b81c 100644 --- a/test/unit/take.spec.ts +++ b/test/unit/take.spec.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; -import { from, fromIterable, range } from "../../src/index.ts"; +import {from, fromIterable, fromTimer, range} from "../../src/index.ts"; +import {emptyAsyncIterable} from "../test-utils.ts"; describe('take tests', () => { [ @@ -72,3 +73,35 @@ describe('take tests', () => { }); }); }); + +describe('takeAsync', () => { + it('should take first n elements', async () => { + const arr = await fromTimer(1).take(3).toArray(); + expect(arr).toStrictEqual([0, 1, 2]); + }); + + it('should take zero elements', async () => { + const arr = await fromTimer(1).take(0).toArray(); + expect(arr).toStrictEqual([]); + }); + + it('should take empty sequence', async () => { + const arr = await from(emptyAsyncIterable).take(3).toArray(); + expect(arr).toStrictEqual([]); + }); + + it('should take while while predicate is true', async () => { + const arr = await fromTimer(1).takeWhile(x => x < 5).toArray(); + expect(arr).toStrictEqual([0, 1, 2, 3, 4]); + }); + + it('should take while nothing from empty', async () => { + const arr = await from(emptyAsyncIterable as AsyncIterable).takeWhile(x => x < 5).toArray(); + expect(arr).toStrictEqual([]); + }); + + it('should take while nothing when false', async () => { + const arr = await fromTimer(1).take(3).takeWhile(x => x > 5).toArray(); + expect(arr).toStrictEqual([]); + }); +}); diff --git a/test/unit/to-array.spec.ts b/test/unit/to-array.spec.ts index a2ffa67..5a43afe 100644 --- a/test/unit/to-array.spec.ts +++ b/test/unit/to-array.spec.ts @@ -7,4 +7,10 @@ describe('to array', () => { expect(arr).toEqual([0, 1, 2]); }); + + it('should get a mapped array from async iterable', async () => { + const arr = await fromTimer(5, 0).take(3).toArray(x => x * 2); + + expect(arr).toEqual([0, 2, 4]); + }); });