From 8b8b157abf62de5ac9c859a4a6a05ff968e652d7 Mon Sep 17 00:00:00 2001
From: Maxwell Brown
Date: Sat, 11 Dec 2021 10:37:30 -0500
Subject: [PATCH 1/4] chore(stream): use experimental streams for interop with
 NodeJS Writables and Readables
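
The interop helpers are reimplemented on top of the experimental Stream and
Sink modules: `streamFromReadable` is now backed by `S.async`, and
`sinkFromWritable` becomes a fold over the incoming byte chunks that yields
the total number of bytes written.

A minimal usage sketch (the file paths are illustrative, and the
`@effect-ts/node` import specifiers assume the package's published entry
points):

    import * as T from "@effect-ts/core/Effect"
    import * as S from "@effect-ts/core/Effect/Experimental/Stream"
    import { pipe } from "@effect-ts/core/Function"
    import * as fs from "fs"

    import * as NS from "@effect-ts/node/Stream"

    // Copy a file by draining a Readable-backed Stream into a
    // Writable-backed Sink; the sink yields the count of bytes written.
    const copy = pipe(
      NS.streamFromReadable(() => fs.createReadStream("in.txt")),
      S.run(NS.sinkFromWritable(() => fs.createWriteStream("out.txt")))
    )

    T.runPromise(copy).then((bytesWritten) => {
      console.log(`wrote ${bytesWritten} bytes`)
    })

Both the `Readable` and the `Writable` are destroyed once the effect
completes or fails.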
---
 packages/node/src/Stream/index.ts | 201 ++++++++++++++++--------------
 packages/node/test/stream.test.ts |  56 +++++++--
 2 files changed, 148 insertions(+), 109 deletions(-)

diff --git a/packages/node/src/Stream/index.ts b/packages/node/src/Stream/index.ts
index 107793d..18102bb 100644
--- a/packages/node/src/Stream/index.ts
+++ b/packages/node/src/Stream/index.ts
@@ -4,12 +4,10 @@
 
 import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
 import * as T from "@effect-ts/core/Effect"
+import * as S from "@effect-ts/core/Effect/Experimental/Stream"
+import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink"
 import * as M from "@effect-ts/core/Effect/Managed"
-import * as S from "@effect-ts/core/Effect/Stream"
-import * as Push from "@effect-ts/core/Effect/Stream/Push"
-import * as Sink from "@effect-ts/core/Effect/Stream/Sink"
-import { pipe, tuple } from "@effect-ts/core/Function"
-import * as O from "@effect-ts/core/Option"
+import { pipe } from "@effect-ts/core/Function"
 import type * as stream from "stream"
 
 import * as Byte from "../Byte"
@@ -23,7 +21,7 @@ export class ReadableError {
  * Captures a Node `Readable`, converting it into a `Stream`.
  *
  * Note: your Readable should not have an encoding set in order to work with buffers,
- * calling this with a Readable with an encoding setted with `Die`.
+ * calling this with a Readable with an encoding set will `Die`.
  */
 export function streamFromReadable(
   r: () => stream.Readable
 ): S.Stream<unknown, ReadableError, Byte.Byte> {
   return pipe(
     T.succeedWith(r),
     T.tap((sr) =>
       sr.readableEncoding != null
         ? T.dieMessage(
             `stream.Readable encoding set to ${sr.readableEncoding} cannot be used to produce Buffer`
           )
         : T.unit
     ),
-    S.bracket((sr) =>
+    S.acquireReleaseWith((sr) =>
       T.succeedWith(() => {
         sr.destroy()
       })
     ),
     S.chain((sr) =>
-      S.effectAsync((cb) => {
+      S.async((emit) => {
         sr.on("data", (data) => {
-          cb(T.succeed(Byte.chunk(data)))
+          emit.chunk(Byte.chunk(data))
         })
         sr.on("end", () => {
-          cb(T.fail(O.none))
+          emit.end()
         })
         sr.on("error", (err) => {
-          cb(T.fail(O.some(new ReadableError(err))))
+          emit.fail(new ReadableError(err))
         })
       })
     )
   )
 }
@@ -64,12 +62,15 @@ export class WritableError {
 }
 
 /**
- * Captures a Node `Writable`, converting it into a `Sink`
+ * Uses the provided NodeJS `Writable` stream to create a `Sink` that consumes
+ * byte chunks and writes them to the `Writable` stream. The sink will yield
+ * the count of bytes written.
+ *
+ * The `Writable` stream will be automatically closed after the stream is
+ * finished or an error occurs.
  */
-export function sinkFromWritable(
-  w: () => stream.Writable
-): Sink.Sink<unknown, WritableError, Byte.Byte, never, void> {
-  return new Sink.Sink(
+export function sinkFromWritable(w: () => stream.Writable) {
+  return Sink.unwrapManaged(
     pipe(
       T.succeedWith(w),
       M.makeExit((sw) =>
         T.succeedWith(() => {
           sw.destroy()
         })
       ),
-      M.map(
-        (sw) => (o: O.Option<C.Chunk<Byte.Byte>>) =>
-          O.isNone(o)
-            ? Push.emit(undefined, C.empty())
-            : T.effectAsync((cb) => {
-                sw.write(Byte.buffer(o.value), (err) => {
-                  if (err) {
-                    cb(Push.fail(new WritableError(err), C.empty()))
-                  } else {
-                    cb(Push.more)
-                  }
-                })
-              })
+      M.map((sw) =>
+        Sink.foldLeftChunksEffect(0, (bytesWritten, byteChunk: C.Chunk<Byte.Byte>) =>
+          T.effectAsync((resume) => {
+            sw.write(Byte.buffer(byteChunk), (err) => {
+              if (err) {
+                resume(T.fail(new WritableError(err)))
+              } else {
+                resume(T.succeed(bytesWritten + byteChunk.length))
+              }
+            })
+          })
+        )
       )
     )
   )
 }
@@ -100,78 +100,85 @@ export class TransformError {
   readonly _tag = "TransformError"
   constructor(readonly error: Error) {}
 }
 
-/**
- * Captures a Node `Transform` for use with `Stream`
- */
-export function transform(
-  tr: () => stream.Transform
-): <R, E>(
-  stream: S.Stream<R, E, Byte.Byte>
-) => S.Stream<R, TransformError | E, Byte.Byte> {
-  return <R, E>(stream: S.Stream<R, E, Byte.Byte>) => {
-    const managedSink = pipe(
-      T.succeedWith(tr),
-      M.makeExit((st) =>
-        T.succeedWith(() => {
-          st.destroy()
-        })
-      ),
-      M.map((st) =>
-        tuple(
-          st,
-          Sink.fromPush(
-            O.fold(
-              () =>
-                T.chain_(
-                  T.succeedWith(() => {
-                    st.end()
-                  }),
-                  () => Push.emit(undefined, C.empty())
-                ),
-              (chunk) =>
-                T.effectAsync((cb) => {
-                  st.write(Byte.buffer(chunk), (err) =>
-                    err
-                      ? cb(Push.fail(new TransformError(err), C.empty()))
-                      : cb(Push.more)
-                  )
-                })
-            )
-          )
-        )
-      )
-    )
-    return pipe(
-      S.managed(managedSink),
-      S.chain(([st, sink]) =>
-        S.effectAsyncM(
-          (cb) =>
-            T.zipRight_(
-              T.succeedWith(() => {
-                st.on("data", (chunk) => {
-                  cb(T.succeed(Byte.chunk(chunk)))
-                })
-                st.on("error", (err) => {
-                  cb(T.fail(O.some(new TransformError(err))))
-                })
-                st.on("end", () => {
-                  cb(T.fail(O.none))
-                })
-              }),
-              S.run_(stream, sink)
-            )
-        )
-      )
-    )
-  }
-}
+// /**
+//  * Captures a Node `Transform` for use with `Stream`
+//  */
+// export function transform(
+//   tr: () => stream.Transform
+// ): <R, E>(
+//   stream: S.Stream<R, E, Byte.Byte>
+// ) => S.Stream<R, TransformError | E, Byte.Byte> {
+//   return <R, E>(stream: S.Stream<R, E, Byte.Byte>) => {
+//     const managedSink = pipe(
+//       T.succeedWith(tr),
+//       M.makeExit((st) =>
+//         T.succeedWith(() => {
+//           st.destroy()
+//         })
+//       ),
+//       M.map((st) =>
+//         tuple(
+//           st,
+//           Sink.fromPush(
+//             O.fold(
+//               () =>
+//                 T.chain_(
+//                   T.succeedWith(() => {
+//                     st.end()
+//                   }),
+//                   () => Push.emit(undefined, C.empty())
+//                 ),
+//               (chunk) =>
+//                 T.effectAsync((cb) => {
+//                   st.write(Byte.buffer(chunk), (err) =>
+//                     err
+//                       ? cb(Push.fail(new TransformError(err), C.empty()))
+//                       : cb(Push.more)
+//                   )
+//                 })
+//             )
+//           )
+//         )
+//       )
+//     )
+//     return pipe(
+//       S.managed(managedSink),
+//       S.chain(([st, sink]) =>
+//         S.effectAsyncM(
+//           (cb) =>
+//             T.zipRight_(
+//               T.succeedWith(() => {
+//                 st.on("data", (chunk) => {
+//                   cb(T.succeed(Byte.chunk(chunk)))
+//                 })
+//                 st.on("error", (err) => {
+//                   cb(T.fail(O.some(new TransformError(err))))
+//                 })
+//                 st.on("end", () => {
+//                   cb(T.fail(O.none))
+//                 })
+//               }),
+//               S.run_(stream, sink)
+//             )
+//         )
+//       )
+//     )
+//   }
+// }
 
 /**
- * A sink that collects all of its inputs into an array.
+ * A sink that collects all of its inputs into a `Buffer`.
  */
-export function collectBuffer<E>(): Sink.Sink<unknown, E, Byte.Byte, never, Buffer> {
+export function collectBuffer<E>(): Sink.Sink<
+  unknown,
+  E,
+  Byte.Byte,
+  E,
+  unknown,
+  Buffer
+> {
   return Sink.map_(
-    Sink.reduceLeftChunks(C.empty<Byte.Byte>())((s, i: C.Chunk<Byte.Byte>) =>
+    Sink.foldLeftChunks(C.empty<Byte.Byte>(), (s, i: C.Chunk<Byte.Byte>) =>
       C.concat_(s, i)
     ),
     Byte.buffer
diff --git a/packages/node/test/stream.test.ts b/packages/node/test/stream.test.ts
index 5a4928d..b6c0eff 100644
--- a/packages/node/test/stream.test.ts
+++ b/packages/node/test/stream.test.ts
@@ -1,15 +1,18 @@
+import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
 import * as T from "@effect-ts/core/Effect"
-import * as S from "@effect-ts/core/Effect/Stream"
+import * as S from "@effect-ts/core/Effect/Experimental/Stream"
+import * as Ref from "@effect-ts/core/Effect/Ref"
 import { pipe } from "@effect-ts/core/Function"
 import * as fs from "fs"
 import * as path from "path"
-import * as zlib from "zlib"
+import * as stream from "stream"
+// import * as zlib from "zlib"
 
 import * as Byte from "../src/Byte"
 import * as NS from "../src/Stream"
 
 describe("Node Stream", () => {
-  it("build from readable", async () => {
+  it("should build an Effect-TS Stream from a NodeJS stream.Readable", async () => {
     const res = await pipe(
       NS.streamFromReadable(() =>
         fs.createReadStream(path.join(__dirname, "fix/data.txt"))
       ),
       NS.runBuffer,
       T.runPromise
     )
 
     expect(res.toString("utf-8")).toEqual("a, b, c")
   })
-  it("transform (gzip/gunzip)", async () => {
+
+  it("should build an Effect-TS Sink from a NodeJS stream.Writable", async () => {
+    const mockStream = new stream.PassThrough()
+    let output: C.Chunk<Byte.Byte> = C.empty()
+
+    mockStream.on("data", (chunk) => {
+      output = C.concat_(output, Byte.chunk(chunk))
+    })
+
     const res = await pipe(
-      NS.streamFromReadable(() =>
-        fs.createReadStream(path.join(__dirname, "fix/data.txt"))
-      ),
-      NS.transform(zlib.createGzip),
-      NS.runBuffer,
-      T.chain((x) =>
-        pipe(Byte.chunk(x), S.fromChunk, NS.transform(zlib.createGunzip), NS.runBuffer)
+      T.do,
+      T.bind("bytesWritten", () =>
+        pipe(
+          Ref.makeRef(0),
+          T.map((ref) =>
+            S.repeatEffect(T.delay(10)(Ref.updateAndGet_(ref, (n) => n + 1)))
+          ),
+          S.unwrap,
+          S.take(5),
+          S.map(Byte.byte),
+          S.run(NS.sinkFromWritable(() => mockStream))
+        )
       ),
       T.runPromise
     )
 
-    expect(res.toString("utf-8")).toEqual("a, b, c")
+    expect(res.bytesWritten).toEqual(5)
+    expect(C.toArray(output)).toEqual([1, 2, 3, 4, 5])
   })
+  // it("transform (gzip/gunzip)", async () => {
+  //   const res = await pipe(
+  //     NS.streamFromReadable(() =>
+  //       fs.createReadStream(path.join(__dirname, "fix/data.txt"))
+  //     ),
+  //     NS.transform(zlib.createGzip),
+  //     NS.runBuffer,
+  //     T.chain((x) =>
+  //       pipe(S.fromChunk(Byte.chunk(x)), NS.transform(zlib.createGunzip), NS.runBuffer)
+  //     ),
+  //     T.runPromise
+  //   )
+
+  //   expect(res.toString("utf-8")).toEqual("a, b, c")
+  // })
 })

From 345d076c6e0444257bdd4b74695025791e49f6c6 Mon Sep 17 00:00:00 2001
From: Maxwell Brown
Date: Sun, 12 Dec 2021 14:26:16 -0500
Subject: [PATCH 2/4] chore: move experimental stream to subdirectory
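
The experimental interop now lives in its own module so that the stable
`Stream` API can be restored alongside it. Consumers opt in explicitly; a
sketch of the resulting import layout (assuming the package exposes the new
directory as a subpath):

    // stable interop, unchanged for existing importers
    import * as NS from "@effect-ts/node/Stream"
    // experimental interop, moved by this commit
    import * as NSX from "@effect-ts/node/Stream/Experimental"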
---
 .../node/src/Stream/Experimental/index.ts     | 129 +++++++++++
 packages/node/src/Stream/index.ts             | 201 +++++++++---------
 .../node/test/stream.experimental.test.ts     |  54 +++++
 packages/node/test/stream.test.ts             |  56 ++---
 4 files changed, 292 insertions(+), 148 deletions(-)
 create mode 100644 packages/node/src/Stream/Experimental/index.ts
 create mode 100644 packages/node/test/stream.experimental.test.ts

diff --git a/packages/node/src/Stream/Experimental/index.ts b/packages/node/src/Stream/Experimental/index.ts
new file mode 100644
index 0000000..2f3752b
--- /dev/null
+++ b/packages/node/src/Stream/Experimental/index.ts
@@ -0,0 +1,129 @@
+/**
+ * ets_tracing: off
+ */
+
+import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
+import * as T from "@effect-ts/core/Effect"
+import * as S from "@effect-ts/core/Effect/Experimental/Stream"
+import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink"
+import * as M from "@effect-ts/core/Effect/Managed"
+import { pipe } from "@effect-ts/core/Function"
+import type * as stream from "stream"
+
+import * as Byte from "../../Byte"
+
+export class ReadableError {
+  readonly _tag = "ReadableError"
+  constructor(readonly error: Error) {}
+}
+
+/**
+ * Captures a Node `Readable`, converting it into a `Stream`.
+ *
+ * Note: your Readable should not have an encoding set in order to work with buffers,
+ * calling this with a Readable with an encoding set will `Die`.
+ */
+export function streamFromReadable(
+  r: () => stream.Readable
+): S.Stream<unknown, ReadableError, Byte.Byte> {
+  return pipe(
+    T.succeedWith(r),
+    T.tap((sr) =>
+      sr.readableEncoding != null
+        ? T.dieMessage(
+            `stream.Readable encoding set to ${sr.readableEncoding} cannot be used to produce Buffer`
+          )
+        : T.unit
+    ),
+    S.acquireReleaseWith((sr) =>
+      T.succeedWith(() => {
+        sr.destroy()
+      })
+    ),
+    S.chain((sr) =>
+      S.async((emit) => {
+        sr.on("data", (data) => {
+          emit.chunk(Byte.chunk(data))
+        })
+        sr.on("end", () => {
+          emit.end()
+        })
+        sr.on("error", (err) => {
+          emit.fail(new ReadableError(err))
+        })
+      })
+    )
+  )
+}
+
+export class WritableError {
+  readonly _tag = "WritableError"
+  constructor(readonly error: Error) {}
+}
+
+/**
+ * Uses the provided NodeJS `Writable` stream to create a `Sink` that consumes
+ * byte chunks and writes them to the `Writable` stream. The sink will yield
+ * the count of bytes written.
+ *
+ * The `Writable` stream will be automatically closed after the stream is
+ * finished or an error occurs.
+ */
+export function sinkFromWritable(w: () => stream.Writable) {
+  return Sink.unwrapManaged(
+    pipe(
+      T.succeedWith(w),
+      M.makeExit((sw) =>
+        T.succeedWith(() => {
+          sw.destroy()
+        })
+      ),
+      M.map((sw) =>
+        Sink.foldLeftChunksEffect(0, (bytesWritten, byteChunk: C.Chunk<Byte.Byte>) =>
+          T.effectAsync((resume) => {
+            sw.write(Byte.buffer(byteChunk), (err) => {
+              if (err) {
+                resume(T.fail(new WritableError(err)))
+              } else {
+                resume(T.succeed(bytesWritten + byteChunk.length))
+              }
+            })
+          })
+        )
+      )
+    )
+  )
+}
+
+export class TransformError {
+  readonly _tag = "TransformError"
+  constructor(readonly error: Error) {}
+}
+
+/**
+ * A sink that collects all of its inputs into a `Buffer`.
+ */
+export function collectBuffer<E>(): Sink.Sink<
+  unknown,
+  E,
+  Byte.Byte,
+  E,
+  unknown,
+  Buffer
+> {
+  return Sink.map_(
+    Sink.foldLeftChunks(C.empty<Byte.Byte>(), (s, i: C.Chunk<Byte.Byte>) =>
+      C.concat_(s, i)
+    ),
+    Byte.buffer
+  )
+}
+
+/**
+ * Runs the stream and collects all of its elements to a buffer.
+ */
+export function runBuffer<R, E>(
+  self: S.Stream<R, E, Byte.Byte>
+): T.Effect<R, E, Buffer> {
+  return S.run_(self, collectBuffer())
+}
diff --git a/packages/node/src/Stream/index.ts b/packages/node/src/Stream/index.ts
index 18102bb..107793d 100644
--- a/packages/node/src/Stream/index.ts
+++ b/packages/node/src/Stream/index.ts
@@ -4,10 +4,12 @@
 
 import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
 import * as T from "@effect-ts/core/Effect"
-import * as S from "@effect-ts/core/Effect/Experimental/Stream"
-import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink"
 import * as M from "@effect-ts/core/Effect/Managed"
+import * as S from "@effect-ts/core/Effect/Stream"
+import * as Push from "@effect-ts/core/Effect/Stream/Push"
+import * as Sink from "@effect-ts/core/Effect/Stream/Sink"
+import { pipe, tuple } from "@effect-ts/core/Function"
+import * as O from "@effect-ts/core/Option"
-import { pipe } from "@effect-ts/core/Function"
 import type * as stream from "stream"
 
 import * as Byte from "../Byte"
@@ -21,7 +23,7 @@ export class ReadableError {
  * Captures a Node `Readable`, converting it into a `Stream`.
  *
  * Note: your Readable should not have an encoding set in order to work with buffers,
- * calling this with a Readable with an encoding set will `Die`.
+ * calling this with a Readable with an encoding setted with `Die`.
  */
 export function streamFromReadable(
   r: () => stream.Readable
 ): S.Stream<unknown, ReadableError, Byte.Byte> {
   return pipe(
     T.succeedWith(r),
     T.tap((sr) =>
       sr.readableEncoding != null
         ? T.dieMessage(
             `stream.Readable encoding set to ${sr.readableEncoding} cannot be used to produce Buffer`
           )
         : T.unit
     ),
-    S.acquireReleaseWith((sr) =>
+    S.bracket((sr) =>
       T.succeedWith(() => {
         sr.destroy()
       })
     ),
     S.chain((sr) =>
-      S.async((emit) => {
+      S.effectAsync((cb) => {
         sr.on("data", (data) => {
-          emit.chunk(Byte.chunk(data))
+          cb(T.succeed(Byte.chunk(data)))
         })
         sr.on("end", () => {
-          emit.end()
+          cb(T.fail(O.none))
         })
         sr.on("error", (err) => {
-          emit.fail(new ReadableError(err))
+          cb(T.fail(O.some(new ReadableError(err))))
         })
       })
     )
   )
 }
@@ -62,15 +64,12 @@ export class WritableError {
 }
 
 /**
- * Uses the provided NodeJS `Writable` stream to create a `Sink` that consumes
- * byte chunks and writes them to the `Writable` stream. The sink will yield
- * the count of bytes written.
- *
- * The `Writable` stream will be automatically closed after the stream is
- * finished or an error occurs.
+ * Captures a Node `Writable`, converting it into a `Sink`
  */
-export function sinkFromWritable(w: () => stream.Writable) {
-  return Sink.unwrapManaged(
+export function sinkFromWritable(
+  w: () => stream.Writable
+): Sink.Sink<unknown, WritableError, Byte.Byte, never, void> {
+  return new Sink.Sink(
     pipe(
       T.succeedWith(w),
       M.makeExit((sw) =>
         T.succeedWith(() => {
           sw.destroy()
         })
       ),
-      M.map((sw) =>
-        Sink.foldLeftChunksEffect(0, (bytesWritten, byteChunk: C.Chunk<Byte.Byte>) =>
-          T.effectAsync((resume) => {
-            sw.write(Byte.buffer(byteChunk), (err) => {
-              if (err) {
-                resume(T.fail(new WritableError(err)))
-              } else {
-                resume(T.succeed(bytesWritten + byteChunk.length))
-              }
-            })
-          })
-        )
+      M.map(
+        (sw) => (o: O.Option<C.Chunk<Byte.Byte>>) =>
+          O.isNone(o)
+            ? Push.emit(undefined, C.empty())
+            : T.effectAsync((cb) => {
+                sw.write(Byte.buffer(o.value), (err) => {
+                  if (err) {
+                    cb(Push.fail(new WritableError(err), C.empty()))
+                  } else {
+                    cb(Push.more)
+                  }
+                })
+              })
       )
     )
   )
 }
@@ -100,85 +100,78 @@ export class TransformError {
   readonly _tag = "TransformError"
   constructor(readonly error: Error) {}
 }
 
-// /**
-//  * Captures a Node `Transform` for use with `Stream`
-//  */
-// export function transform(
-//   tr: () => stream.Transform
-// ): <R, E>(
-//   stream: S.Stream<R, E, Byte.Byte>
-// ) => S.Stream<R, TransformError | E, Byte.Byte> {
-//   return <R, E>(stream: S.Stream<R, E, Byte.Byte>) => {
-//     const managedSink = pipe(
-//       T.succeedWith(tr),
-//       M.makeExit((st) =>
-//         T.succeedWith(() => {
-//           st.destroy()
-//         })
-//       ),
-//       M.map((st) =>
-//         tuple(
-//           st,
-//           Sink.fromPush(
-//             O.fold(
-//               () =>
-//                 T.chain_(
-//                   T.succeedWith(() => {
-//                     st.end()
-//                   }),
-//                   () => Push.emit(undefined, C.empty())
-//                 ),
-//               (chunk) =>
-//                 T.effectAsync((cb) => {
-//                   st.write(Byte.buffer(chunk), (err) =>
-//                     err
-//                       ? cb(Push.fail(new TransformError(err), C.empty()))
-//                       : cb(Push.more)
-//                   )
-//                 })
-//             )
-//           )
-//         )
-//       )
-//     )
-//     return pipe(
-//       S.managed(managedSink),
-//       S.chain(([st, sink]) =>
-//         S.effectAsyncM(
-//           (cb) =>
-//             T.zipRight_(
-//               T.succeedWith(() => {
-//                 st.on("data", (chunk) => {
-//                   cb(T.succeed(Byte.chunk(chunk)))
-//                 })
-//                 st.on("error", (err) => {
-//                   cb(T.fail(O.some(new TransformError(err))))
-//                 })
-//                 st.on("end", () => {
-//                   cb(T.fail(O.none))
-//                 })
-//               }),
-//               S.run_(stream, sink)
-//             )
-//         )
-//       )
-//     )
-//   }
-// }
+/**
+ * Captures a Node `Transform` for use with `Stream`
+ */
+export function transform(
+  tr: () => stream.Transform
+): <R, E>(
+  stream: S.Stream<R, E, Byte.Byte>
+) => S.Stream<R, TransformError | E, Byte.Byte> {
+  return <R, E>(stream: S.Stream<R, E, Byte.Byte>) => {
+    const managedSink = pipe(
+      T.succeedWith(tr),
+      M.makeExit((st) =>
+        T.succeedWith(() => {
+          st.destroy()
+        })
+      ),
+      M.map((st) =>
+        tuple(
+          st,
+          Sink.fromPush(
+            O.fold(
+              () =>
+                T.chain_(
+                  T.succeedWith(() => {
+                    st.end()
+                  }),
+                  () => Push.emit(undefined, C.empty())
+                ),
+              (chunk) =>
+                T.effectAsync((cb) => {
+                  st.write(Byte.buffer(chunk), (err) =>
+                    err
+                      ? cb(Push.fail(new TransformError(err), C.empty()))
+                      : cb(Push.more)
+                  )
+                })
+            )
+          )
+        )
+      )
+    )
+    return pipe(
+      S.managed(managedSink),
+      S.chain(([st, sink]) =>
+        S.effectAsyncM(
+          (cb) =>
+            T.zipRight_(
+              T.succeedWith(() => {
+                st.on("data", (chunk) => {
+                  cb(T.succeed(Byte.chunk(chunk)))
+                })
+                st.on("error", (err) => {
+                  cb(T.fail(O.some(new TransformError(err))))
+                })
+                st.on("end", () => {
+                  cb(T.fail(O.none))
+                })
+              }),
+              S.run_(stream, sink)
+            )
+        )
+      )
+    )
+  }
+}
 
 /**
- * A sink that collects all of its inputs into a `Buffer`.
+ * A sink that collects all of its inputs into an array.
  */
-export function collectBuffer<E>(): Sink.Sink<
-  unknown,
-  E,
-  Byte.Byte,
-  E,
-  unknown,
-  Buffer
-> {
+export function collectBuffer<E>(): Sink.Sink<unknown, E, Byte.Byte, never, Buffer> {
   return Sink.map_(
-    Sink.foldLeftChunks(C.empty<Byte.Byte>(), (s, i: C.Chunk<Byte.Byte>) =>
+    Sink.reduceLeftChunks(C.empty<Byte.Byte>())((s, i: C.Chunk<Byte.Byte>) =>
       C.concat_(s, i)
     ),
     Byte.buffer
diff --git a/packages/node/test/stream.experimental.test.ts b/packages/node/test/stream.experimental.test.ts
new file mode 100644
index 0000000..f6c39f9
--- /dev/null
+++ b/packages/node/test/stream.experimental.test.ts
@@ -0,0 +1,54 @@
+import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
+import * as T from "@effect-ts/core/Effect"
+import * as S from "@effect-ts/core/Effect/Experimental/Stream"
+import * as Ref from "@effect-ts/core/Effect/Ref"
+import { pipe } from "@effect-ts/core/Function"
+import * as fs from "fs"
+import * as path from "path"
+import * as stream from "stream"
+
+import * as Byte from "../src/Byte"
+import * as NS from "../src/Stream/Experimental"
+
+describe("Node Stream", () => {
+  it("should build an Effect-TS Stream from a NodeJS stream.Readable", async () => {
+    const res = await pipe(
+      NS.streamFromReadable(() =>
+        fs.createReadStream(path.join(__dirname, "fix/data.txt"))
+      ),
+      NS.runBuffer,
+      T.runPromise
+    )
+
+    expect(res.toString("utf-8")).toEqual("a, b, c")
+  })
+
+  it("should build an Effect-TS Sink from a NodeJS stream.Writable", async () => {
+    const mockStream = new stream.PassThrough()
+    let output: C.Chunk<Byte.Byte> = C.empty()
+
+    mockStream.on("data", (chunk) => {
+      output = C.concat_(output, Byte.chunk(chunk))
+    })
+
+    const res = await pipe(
+      T.do,
+      T.bind("bytesWritten", () =>
+        pipe(
+          Ref.makeRef(0),
+          T.map((ref) =>
+            S.repeatEffect(T.delay(10)(Ref.updateAndGet_(ref, (n) => n + 1)))
+          ),
+          S.unwrap,
+          S.take(5),
+          S.map(Byte.byte),
+          S.run(NS.sinkFromWritable(() => mockStream))
+        )
+      ),
+      T.runPromise
+    )
+
+    expect(res.bytesWritten).toEqual(5)
+    expect(C.toArray(output)).toEqual([1, 2, 3, 4, 5])
+  })
+})
diff --git a/packages/node/test/stream.test.ts b/packages/node/test/stream.test.ts
index b6c0eff..5a4928d 100644
--- a/packages/node/test/stream.test.ts
+++ b/packages/node/test/stream.test.ts
@@ -1,18 +1,15 @@
-import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
 import * as T from "@effect-ts/core/Effect"
-import * as S from "@effect-ts/core/Effect/Experimental/Stream"
-import * as Ref from "@effect-ts/core/Effect/Ref"
+import * as S from "@effect-ts/core/Effect/Stream"
 import { pipe } from "@effect-ts/core/Function"
 import * as fs from "fs"
 import * as path from "path"
-import * as stream from "stream"
+import * as zlib from "zlib"
 
-// import * as zlib from "zlib"
 import * as Byte from "../src/Byte"
 import * as NS from "../src/Stream"
 
 describe("Node Stream", () => {
-  it("should build an Effect-TS Stream from a NodeJS stream.Readable", async () => {
+  it("build from readable", async () => {
     const res = await pipe(
       NS.streamFromReadable(() =>
         fs.createReadStream(path.join(__dirname, "fix/data.txt"))
       ),
       NS.runBuffer,
       T.runPromise
     )
 
     expect(res.toString("utf-8")).toEqual("a, b, c")
   })
-
-  it("should build an Effect-TS Sink from a NodeJS stream.Writable", async () => {
-    const mockStream = new stream.PassThrough()
-    let output: C.Chunk<Byte.Byte> = C.empty()
-
-    mockStream.on("data", (chunk) => {
-      output = C.concat_(output, Byte.chunk(chunk))
-    })
-
+  it("transform (gzip/gunzip)", async () => {
     const res = await pipe(
-      T.do,
-      T.bind("bytesWritten", () =>
-        pipe(
-          Ref.makeRef(0),
-          T.map((ref) =>
-            S.repeatEffect(T.delay(10)(Ref.updateAndGet_(ref, (n) => n + 1)))
-          ),
-          S.unwrap,
-          S.take(5),
-          S.map(Byte.byte),
-          S.run(NS.sinkFromWritable(() => mockStream))
-        )
+      NS.streamFromReadable(() =>
+        fs.createReadStream(path.join(__dirname, "fix/data.txt"))
+      ),
+      NS.transform(zlib.createGzip),
+      NS.runBuffer,
+      T.chain((x) =>
+        pipe(Byte.chunk(x), S.fromChunk, NS.transform(zlib.createGunzip), NS.runBuffer)
       ),
       T.runPromise
     )
 
-    expect(res.bytesWritten).toEqual(5)
-    expect(C.toArray(output)).toEqual([1, 2, 3, 4, 5])
+    expect(res.toString("utf-8")).toEqual("a, b, c")
   })
-  // it("transform (gzip/gunzip)", async () => {
-  //   const res = await pipe(
-  //     NS.streamFromReadable(() =>
-  //       fs.createReadStream(path.join(__dirname, "fix/data.txt"))
-  //     ),
-  //     NS.transform(zlib.createGzip),
-  //     NS.runBuffer,
-  //     T.chain((x) =>
-  //       pipe(S.fromChunk(Byte.chunk(x)), NS.transform(zlib.createGunzip), NS.runBuffer)
-  //     ),
-  //     T.runPromise
-  //   )
-
-  //   expect(res.toString("utf-8")).toEqual("a, b, c")
-  // })
 })

From dd2eea38bde5623ff9d6a40039087d9d4094fc6f Mon Sep 17 00:00:00 2001
From: Maxwell Brown
Date: Mon, 13 Dec 2021 19:53:04 -0500
Subject: [PATCH 3/4] feat(stream): add experimental stream helpers
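
`streamFromReadable` gains a `bufferSize` parameter (defaulting to
`S.DEFAULT_CHUNK_SIZE`) and switches the `Readable` from flowing mode
(`"data"` events) to paused mode: each `"readable"` event drains the internal
buffer via `sr.read(bufferSize)`, so chunk sizes are controlled by the caller
rather than by the source. A sketch of the knob in use (the path and size are
illustrative, and the import specifier assumes the package's published
subpath):

    import * as fs from "fs"

    import * as NSX from "@effect-ts/node/Stream/Experimental"

    // Emit the file in chunks of at most 16 KiB. Note that
    // stream.Readable#read(size) returns null until `size` bytes are
    // buffered, except at end-of-stream, where the remainder is returned.
    const chunked = NSX.streamFromReadable(
      () => fs.createReadStream("data.bin"),
      16 * 1024
    )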
---
 packages/node/src/Stream/Experimental/index.ts | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/packages/node/src/Stream/Experimental/index.ts b/packages/node/src/Stream/Experimental/index.ts
index 2f3752b..511e8f8 100644
--- a/packages/node/src/Stream/Experimental/index.ts
+++ b/packages/node/src/Stream/Experimental/index.ts
@@ -18,13 +18,14 @@ export class ReadableError {
 }
 
 /**
- * Captures a Node `Readable`, converting it into a `Stream`.
+ * Captures a Node `Readable`, converting it into a `Stream`. The size of the emitted chunks can be configured with the `bufferSize` parameter.
  *
  * Note: your Readable should not have an encoding set in order to work with buffers,
  * calling this with a Readable with an encoding set will `Die`.
  */
 export function streamFromReadable(
-  r: () => stream.Readable
+  r: () => stream.Readable,
+  bufferSize: number = S.DEFAULT_CHUNK_SIZE
 ): S.Stream<unknown, ReadableError, Byte.Byte> {
   return pipe(
     T.succeedWith(r),
@@ -42,8 +43,12 @@
     ),
     S.chain((sr) =>
       S.async((emit) => {
-        sr.on("data", (data) => {
-          emit.chunk(Byte.chunk(data))
+        sr.on("readable", () => {
+          let buffer: Buffer | null = null
+
+          while ((buffer = sr.read(bufferSize)) !== null) {
+            emit.chunk(Byte.chunk(buffer))
+          }
         })
         sr.on("end", () => {
           emit.end()

From 5fc8ef7d74432e58c96abc496f4007bf119ffc59 Mon Sep 17 00:00:00 2001
From: Maxwell Brown
Date: Tue, 14 Dec 2021 09:32:50 -0500
Subject: [PATCH 4/4] chore: re-implement collectBuffer in terms of
 Sink.collectAll
---
 packages/node/src/Stream/Experimental/index.ts | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/packages/node/src/Stream/Experimental/index.ts b/packages/node/src/Stream/Experimental/index.ts
index 511e8f8..6a2fae0 100644
--- a/packages/node/src/Stream/Experimental/index.ts
+++ b/packages/node/src/Stream/Experimental/index.ts
@@ -2,7 +2,7 @@
  * ets_tracing: off
  */
 
-import * as C from "@effect-ts/core/Collections/Immutable/Chunk"
+import type * as C from "@effect-ts/core/Collections/Immutable/Chunk"
 import * as T from "@effect-ts/core/Effect"
 import * as S from "@effect-ts/core/Effect/Experimental/Stream"
 import * as Sink from "@effect-ts/core/Effect/Experimental/Stream/Sink"
 import * as M from "@effect-ts/core/Effect/Managed"
 import { pipe } from "@effect-ts/core/Function"
 import type * as stream from "stream"
 
 export function collectBuffer<E>(): Sink.Sink<
   unknown,
   E,
   Byte.Byte,
   E,
   unknown,
   Buffer
 > {
-  return Sink.map_(
-    Sink.foldLeftChunks(C.empty<Byte.Byte>(), (s, i: C.Chunk<Byte.Byte>) =>
-      C.concat_(s, i)
-    ),
-    Byte.buffer
-  )
+  return Sink.map_(Sink.collectAll(), Byte.buffer)
 }
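
With `collectBuffer` expressed through `Sink.collectAll`, `runBuffer` is
behaviorally unchanged: collect every emitted chunk, then convert once via
`Byte.buffer`. For reference, a usage sketch of the final API (the fixture
path and expected output are borrowed from the tests; the import specifier
assumes the package's published subpath):

    import * as T from "@effect-ts/core/Effect"
    import { pipe } from "@effect-ts/core/Function"
    import * as fs from "fs"
    import * as path from "path"

    import * as NSX from "@effect-ts/node/Stream/Experimental"

    // Read the whole fixture into a single Buffer and decode it.
    const program = pipe(
      NSX.streamFromReadable(() =>
        fs.createReadStream(path.join(__dirname, "fix/data.txt"))
      ),
      NSX.runBuffer,
      T.map((buffer) => buffer.toString("utf-8"))
    )

    T.runPromise(program).then(console.log) // logs "a, b, c"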