Skip to content

Commit 7ac724a

Browse files
committed
pipefs: Enable pipes to notify readiness
The poll method of PIPEFS receives a notification callback from the caller. PIPEFS notifies the caller when the fd becomes readable using that callback. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent 5318eb8 commit 7ac724a

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

src/lib/libpipefs.js

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,19 @@ addToLibrary({
2121
// able to read from the read end after write end is closed.
2222
refcnt : 2,
2323
timestamp: new Date(),
24+
readableHandlers: [],
25+
registerReadableHanlders: (notifyCallback) => {
26+
if (notifyCallback == null) return;
27+
notifyCallback.registerCleanupFunc(() => {
28+
const i = pipe.readableHandlers.indexOf(notifyCallback);
29+
if (i !== -1) pipe.readableHandlers.splice(i, 1);
30+
});
31+
pipe.readableHandlers.push(notifyCallback);
32+
},
33+
notifyReadableHanders: () => {
34+
pipe.readableHandlers.forEach(cb => cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}));
35+
pipe.readableHandlers = [];
36+
}
2437
};
2538

2639
pipe.buckets.push({
@@ -80,7 +93,7 @@ addToLibrary({
8093
blocks: 0,
8194
};
8295
},
83-
poll(stream) {
96+
poll(stream, timeout, notifyCallback) {
8497
var pipe = stream.node.pipe;
8598

8699
if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
@@ -92,6 +105,7 @@ addToLibrary({
92105
}
93106
}
94107

108+
pipe.registerReadableHanlders(notifyCallback);
95109
return 0;
96110
},
97111
dup(stream) {
@@ -204,6 +218,7 @@ addToLibrary({
204218
if (freeBytesInCurrBuffer >= dataLen) {
205219
currBucket.buffer.set(data, currBucket.offset);
206220
currBucket.offset += dataLen;
221+
pipe.notifyReadableHanders();
207222
return dataLen;
208223
} else if (freeBytesInCurrBuffer > 0) {
209224
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
@@ -235,6 +250,7 @@ addToLibrary({
235250
newBucket.buffer.set(data);
236251
}
237252

253+
pipe.notifyReadableHanders();
238254
return dataLen;
239255
},
240256
close(stream) {

0 commit comments

Comments
 (0)