StreamBase
All provided streamers use StreamBase as their foundation. It is a base class meant to be extended, which provides common facilities used by streamers.
StreamBase is based on Transform. It operates in object mode, consuming a token stream produced by a parser or filters and transforming it into a stream of partially assembled JavaScript objects.
This document describes the user-facing interface only. If you want to build your own streamer, feel free to inspect the code to gain more insights.
Internally StreamBase uses Assembler to assemble objects and to keep track of the current state.
`options` is an optional object described in detail in Node.js' Stream documentation. Additionally, the following optional custom properties are recognized:
- `objectFilter` is an optional function.
  - If specified, it is used to inspect incomplete objects as they are being assembled.
  - It is called in the context of a streamer with one argument: an internal Assembler instance.
  - A filter function can return three kinds of values:
    - Truthy. The object should be included in the output stream. It will be assembled from the input stream without further checks.
    - `false`. The object should not be output. The partially assembled object will be discarded, and the rest of the object will be ignored without further checks.
    - Anything else (usually `undefined`). The filter does not have enough information to make a decision yet: the next token should be processed, and the filter will be called again on the same object.
  - If not specified (the default), all objects are included in the output stream.
- `includeUndecided` is a flag. It controls how to handle objects that were not decided one way or the other (see the sketch after this list).
  - If it is truthy, an undecided object (already fully assembled) will be included in the output.
  - Otherwise (the default), an undecided object will be discarded.
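The following is a minimal sketch of how `includeUndecided` interacts with `objectFilter`. The `status` property is a hypothetical field used only for illustration: objects that never expose it stay undecided, and `includeUndecided` determines whether they are emitted once fully assembled.
```js
const {streamArray} = require('stream-json/streamers/StreamArray');

// hypothetical filter: keep objects whose 'status' is 'active'
const streamer = streamArray({
  objectFilter: asm => {
    const value = asm.current; // the partially assembled value
    if (value && typeof value.status == 'string') {
      return value.status === 'active'; // decided: accept or reject
    }
    // returning undefined: undecided, keep assembling and ask again
  },
  // emit fully assembled objects that were never decided,
  // e.g., objects without a 'status' property at all
  includeUndecided: true
});
```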
`objectFilter` and token stream filters serve different purposes. The latter edit a token stream, while the former governs the assembly of individual, already selected values. In the end, `objectFilter` is an optimization feature.
For example, suppose we want to read all employee records and select only the employees whose department is "accounting".
A simple way (preferred in some cases) is to use stream-chain facilities:
```js
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray(),
  data => {
    if (data.value.department === 'accounting') return data;
    // return undefined by default skipping this item
  }
]);
```
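The resulting pipeline is a regular Node.js stream, so it can be consumed in the usual ways. A minimal sketch (the log messages are illustrative only):
```js
pipeline.on('data', data => console.log(data.key, data.value));
pipeline.on('end', () => console.log('done'));
pipeline.on('error', error => console.error(error));
```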
A potentially more efficient way is to do the same filtering while objects are still being assembled:
```js
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray({objectFilter: asm => {
    const value = asm.current; // the value we are working on
    // the value can be incomplete, check if we have the necessary properties
    if (value && typeof value.department == 'string') {
      // we have the 'department' value and can make the final decision now
      return value.department === 'accounting';
      // depending on the return value above we made a final decision:
      // we accepted or rejected an object,
      // now it will be speedily assembled or skipped
    }
    // return undefined by default meaning "we are undecided yet"
  }})
]);
```
Now an object can be discarded before being fully assembled, which can save some CPU and memory. On the other hand, in the first case our filter is called once per object, while in the second case it can be called multiple times. So there is a trade-off, which should be decided on a case-by-case basis, possibly using trials.
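One simple way to run such a trial is to count how often the filter is invoked. A rough sketch (the counter and log message are illustrative assumptions, not library features):
```js
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');
const fs = require('fs');
const zlib = require('zlib');

let calls = 0;

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray({objectFilter: asm => {
    ++calls; // objectFilter may run several times per object
    const value = asm.current;
    if (value && typeof value.department == 'string') {
      return value.department === 'accounting';
    }
  }})
]);

pipeline.on('data', () => {}); // drain the stream so it keeps flowing
pipeline.on('end', () => console.log('objectFilter was called', calls, 'times'));
```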