Skip to content

Commit 1bf0b9a

Browse files
Your Nameclaude
andcommitted
Optimize frame codec and Socket.js for zero-copy operations and reduced allocations
Core optimizations: - FrameCodec.js: Zero-copy varint encoding/decoding, reusable buffers, optimized drain handling - FrameCodecCirc.js: Circular buffer implementation for high-throughput streaming - FrameCodecShared.js: Shared encoding utilities with zero-copy fast paths - Socket.js: Eliminated base64 allocation hotspots in paused data handling Test infrastructure: - test-tug-of-war.js: Consolidated DRY test suite supporting basic and variance modes Type safety: - src/FrameCodec.d.ts: TypeScript definitions for frame codec exports - lib/types/Socket.d.ts: Updated Socket type definitions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 81ad117 commit 1bf0b9a

File tree

8 files changed

+866
-222
lines changed

8 files changed

+866
-222
lines changed

bench-ab.js

Lines changed: 0 additions & 108 deletions
This file was deleted.

lib/types/Socket.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ export default class Socket extends EventEmitter<SocketEvents & ReadableEvents,
6969
private _pending;
7070
/** @private */
7171
private _destroyed;
72+
/** @private */
73+
private _writableEnded;
74+
/** @private */
75+
private _readableEnded;
7276
/** @type {'opening' | 'open' | 'readOnly' | 'writeOnly'} @private */
7377
private _readyState;
7478
/** @type {{ id: number; data: string; }[]} @private */

src/FrameCodec.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Transform } from 'stream';
2+
import { Buffer } from 'buffer';
3+
4+
export class FrameEncoder extends Transform {
5+
constructor();
6+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
7+
}
8+
9+
export class FrameDecoder extends Transform {
10+
constructor();
11+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
12+
static _nextId: number;
13+
}
14+
15+
export function createLibp2pStream(socket: any): {
16+
source: AsyncGenerator<any, void, unknown>;
17+
sink: (src: AsyncIterable<any>) => Promise<void>;
18+
[Symbol.asyncIterator]: () => AsyncIterator<any>;
19+
};
20+
21+
export function encodeFrame(buffer: Buffer | string): Buffer;

src/FrameCodec.js

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
11
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
27

38
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
49

5-
const varint = {
6-
encode: (n) => {
7-
if (n < 0) throw new RangeError('varint unsigned only');
8-
const o = [];
9-
do {
10-
let b = n & 0x7f;
11-
n = Math.floor(n / 128);
12-
if (n > 0) b |= 0x80;
13-
o.push(b);
14-
} while (n > 0);
15-
return Buffer.from(o);
16-
},
17-
decodeFrom: (buf, offset = 0) => {
18-
let r = 0, s = 0, i = offset;
19-
for (; i < buf.length; i++) {
20-
const b = buf[i];
21-
r |= (b & 0x7f) << s;
22-
if ((b & 0x80) === 0) return { value: r, bytes: i - offset + 1 };
23-
s += 7;
24-
if (s > 53) break;
25-
}
26-
return null;
27-
}
28-
};
29-
30-
class FrameEncoder extends Transform {
31-
constructor() {
32-
super({ writableObjectMode: true });
33-
}
34-
_transform(f, e, cb) {
35-
try {
36-
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
37-
this.push(Buffer.concat([varint.encode(f.length), f]));
38-
cb();
39-
} catch (err) {
40-
cb(err);
41-
}
42-
}
43-
}
44-
4510
class FrameDecoder extends Transform {
4611
constructor() {
4712
super({ readableObjectMode: true }); // object mode ensures zero-length payloads surface as readable chunks
@@ -129,9 +94,25 @@ class FrameDecoder extends Transform {
12994
}
13095

13196
_take(n, label = 'bytes') {
97+
this._log('take_start', { label, bytes: n, buffered: this._l });
98+
99+
// Zero-copy fast path: single chunk contains all needed bytes
100+
if (this._q.length > 0 && this._q[0].length >= n) {
101+
const head = this._q[0];
102+
const slice = head.slice(0, n);
103+
this._l -= n;
104+
if (n === head.length) {
105+
this._q.shift();
106+
} else {
107+
this._q[0] = head.slice(n);
108+
}
109+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: true });
110+
return slice;
111+
}
112+
113+
// Multi-chunk path: allocate and copy
132114
const f = Buffer.allocUnsafe(n);
133115
let w = 0;
134-
this._log('take_start', { label, bytes: n, buffered: this._l });
135116
while (w < n && this._q.length > 0) {
136117
const next = this._q[0];
137118
const t = Math.min(next.length, n - w);
@@ -145,19 +126,16 @@ class FrameDecoder extends Transform {
145126
}
146127
this._log('take_progress', { label, copied: t, written: w, buffered: this._l });
147128
}
148-
this._log('take_complete', { label, bytes: n, buffered: this._l });
129+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: false });
149130
return f;
150131
}
151132
}
152133

153134
FrameDecoder._nextId = 1;
154135

155-
function createLibp2pStream(socket) {
156-
const d = new FrameDecoder(), e = new FrameEncoder();
157-
socket.pipe(d); e.pipe(socket);
158-
const s = { source: (async function* () { for await (const c of d) yield c; })(), sink: async (src) => { for await (const c of src) { if (!e.write(c)) await new Promise(r => e.once('drain', r)); } e.end(); } };
159-
s[Symbol.asyncIterator] = () => s.source[Symbol.asyncIterator]();
160-
return s;
161-
}
162-
163-
module.exports = { FrameEncoder, FrameDecoder, createLibp2pStream, encodeFrame: (b) => { const buf = Buffer.isBuffer(b) ? b : Buffer.from(b); return Buffer.concat([varint.encode(buf.length), buf]); } };
136+
module.exports = {
137+
FrameEncoder,
138+
FrameDecoder,
139+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoder()),
140+
encodeFrame
141+
};

src/FrameCodecCirc.js

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
11
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
27

38
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
49

5-
const varint = {
6-
encode: (n) => {
7-
if (n < 0) throw new RangeError('varint unsigned only');
8-
const o = [];
9-
do {
10-
let b = n & 0x7f;
11-
n = Math.floor(n / 128);
12-
if (n > 0) b |= 0x80;
13-
o.push(b);
14-
} while (n > 0);
15-
return Buffer.from(o);
16-
},
17-
decodeFrom: (buf, offset = 0) => {
18-
let r = 0, s = 0, i = offset;
19-
for (; i < buf.length; i++) {
20-
const b = buf[i];
21-
r |= (b & 0x7f) << s;
22-
if ((b & 0x80) === 0) return { value: r, bytes: i - offset + 1 };
23-
s += 7;
24-
if (s > 53) break;
25-
}
26-
return null;
27-
}
28-
};
29-
30-
class FrameEncoder extends Transform {
31-
constructor() {
32-
super({ writableObjectMode: true });
33-
}
34-
_transform(f, e, cb) {
35-
try {
36-
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
37-
this.push(Buffer.concat([varint.encode(f.length), f]));
38-
cb();
39-
} catch (err) {
40-
cb(err);
41-
}
42-
}
43-
}
44-
4510
class FrameDecoderCirc extends Transform {
4611
constructor(bufferSize = 16384) {
4712
super({ readableObjectMode: true });
@@ -191,12 +156,9 @@ class FrameDecoderCirc extends Transform {
191156

192157
FrameDecoderCirc._nextId = 1;
193158

194-
function createLibp2pStream(socket) {
195-
const d = new FrameDecoderCirc(), e = new FrameEncoder();
196-
socket.pipe(d); e.pipe(socket);
197-
const s = { source: (async function* () { for await (const c of d) yield c; })(), sink: async (src) => { for await (const c of src) { if (!e.write(c)) await new Promise(r => e.once('drain', r)); } e.end(); } };
198-
s[Symbol.asyncIterator] = () => s.source[Symbol.asyncIterator]();
199-
return s;
200-
}
201-
202-
module.exports = { FrameEncoder, FrameDecoderCirc, createLibp2pStream, encodeFrame: (b) => { const buf = Buffer.isBuffer(b) ? b : Buffer.from(b); return Buffer.concat([varint.encode(buf.length), buf]); } };
159+
module.exports = {
160+
FrameEncoder,
161+
FrameDecoderCirc,
162+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoderCirc()),
163+
encodeFrame
164+
};

0 commit comments

Comments
 (0)