From 357541239dd322ee434f2bc9ec1af0e7da22a881 Mon Sep 17 00:00:00 2001 From: faseng Date: Wed, 3 Sep 2025 18:26:33 +0800 Subject: [PATCH] Enhance message parsing to preserve newlines from empty data fields and update type for ReadableStream result. Add a test case to verify the new behavior. --- src/parse.spec.ts | 28 ++++++++++++++++++++++++++++ src/parse.ts | 14 ++++++++------ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/parse.spec.ts b/src/parse.spec.ts index 599c546..f5c4ea3 100644 --- a/src/parse.spec.ts +++ b/src/parse.spec.ts @@ -349,5 +349,33 @@ describe('parse', () => { expect(idsIdx).toBe(2); expect(msgNum).toBe(1); }); + + it('should preserve newlines from empty data fields', () => { + // arrange: + let msgNum = 0; + const next = parse.getMessages(_id => { + fail('id should not be called'); + }, _retry => { + fail('retry should not be called'); + }, msg => { + ++msgNum; + expect(msg).toEqual({ + data: '\n\nfoo', + id: '', + event: 'message', + retry: undefined, + }); + }); + + // act: + next(encoder.encode('event: message'), 5); + next(encoder.encode('data:'), 4); + next(encoder.encode('data:'), 4); + next(encoder.encode('data: foo'), 4); + next(encoder.encode(''), -1); + + // assert: + expect(msgNum).toBe(1); + }); }); }); diff --git a/src/parse.ts b/src/parse.ts index c13e3c4..489ff99 100644 --- a/src/parse.ts +++ b/src/parse.ts @@ -21,7 +21,7 @@ export interface EventSourceMessage { */ export async function getBytes(stream: ReadableStream, onChunk: (arr: Uint8Array) => void) { const reader = stream.getReader(); - let result: ReadableStreamDefaultReadResult; + let result: ReadableStreamReadResult; while (!(result = await reader.read()).done) { onChunk(result.value); } @@ -127,6 +127,11 @@ export function getMessages( // return a function that can process each incoming line buffer: return function onLine(line: Uint8Array, fieldLength: number) { if (line.length === 0) { + // If the data buffer's last character is a U+000A LINE FEED (LF) character, then remove the last character from the data buffer. + if (message.data.endsWith('\n')) { + message.data = message.data.substring(0, message.data.length - 1); + } + // empty line denotes end of message. Trigger the callback and start a new message: onMessage?.(message); message = newMessage(); @@ -139,11 +144,8 @@ export function getMessages( switch (field) { case 'data': - // if this message already has data, append the new value to the old. - // otherwise, just set to the new value: - message.data = message.data - ? message.data + '\n' + value - : value; // otherwise, + // Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer. + message.data = message.data + value + '\n'; break; case 'event': message.event = value;