1
1
import { ReadStream , createReadStream } from "fs" ;
2
2
import { once } from "events" ;
3
3
import type { ElementType , ReadStreamOptions } from "../index.types" ;
4
- import { CHARACTER } from "../constants" ;
4
+ import { CHARACTER , ERRORS } from "../constants" ;
5
5
6
6
class JsonArrayStreamer < T > {
7
7
private readStream : ReadStream | null ;
@@ -41,6 +41,15 @@ class JsonArrayStreamer<T> {
41
41
}
42
42
}
43
43
44
+ private getParsedElement < T > ( ) {
45
+ try {
46
+ const element : T = JSON . parse ( this . chunkBuffer ) ;
47
+ return element ;
48
+ } catch ( error ) {
49
+ throw new Error ( ERRORS . INVALID_ELEMENT ( this . chunkBuffer ) ) ;
50
+ }
51
+ }
52
+
44
53
private addToResult ( element : T , filter ?: ( element : T ) => boolean ) {
45
54
if ( ! filter ) {
46
55
this . resultBuffer . push ( element ) ;
@@ -66,7 +75,7 @@ class JsonArrayStreamer<T> {
66
75
67
76
if ( char === CHARACTER . QUOTE ) {
68
77
if ( this . isCharInsideQuotes && ! this . isCharEscaped ) {
69
- const element : T = JSON . parse ( this . chunkBuffer ) ;
78
+ const element = < T > this . getParsedElement ( ) ;
70
79
this . addToResult ( element , filter ) ;
71
80
this . resetParser ( ) ;
72
81
} else if ( this . chunkBuffer === CHARACTER . QUOTE ) {
@@ -85,8 +94,8 @@ class JsonArrayStreamer<T> {
85
94
char : string ,
86
95
filter ?: ( element : T ) => boolean
87
96
) {
88
- if ( [ CHARACTER . COMMA , CHARACTER . BRACKET . CLOSE ] . includes ( char ) ) {
89
- const element : T = JSON . parse ( this . chunkBuffer ) ;
97
+ if ( char === CHARACTER . COMMA ) {
98
+ const element = < T > this . getParsedElement ( ) ;
90
99
this . addToResult ( element , filter ) ;
91
100
this . resetParser ( ) ;
92
101
} else {
@@ -109,7 +118,7 @@ class JsonArrayStreamer<T> {
109
118
this . elementEnclosureCount -= 1 ;
110
119
111
120
if ( this . elementEnclosureCount === 0 ) {
112
- const element : T = JSON . parse ( this . chunkBuffer ) ;
121
+ const element = < T > this . getParsedElement ( ) ;
113
122
this . addToResult ( element , filter ) ;
114
123
this . resetParser ( ) ;
115
124
}
@@ -127,19 +136,29 @@ class JsonArrayStreamer<T> {
127
136
}
128
137
129
138
public async * stream ( chunkSize : number , filter ?: ( element : T ) => boolean ) {
130
- for await ( const chunk of this . chunkGenerator ( ) ) {
139
+ characterStream: for await ( const chunk of this . chunkGenerator ( ) ) {
131
140
for ( let char of chunk ) {
132
141
if ( ! this . rootDetected ) {
142
+ if (
143
+ ! [
144
+ CHARACTER . SPACE ,
145
+ CHARACTER . NEW_LINE ,
146
+ CHARACTER . BRACKET . OPEN ,
147
+ ] . includes ( char )
148
+ )
149
+ throw new Error ( ERRORS . INVALID_FILE ) ;
150
+
133
151
this . rootDetected = char === CHARACTER . BRACKET . OPEN ;
134
152
continue ;
135
153
}
136
154
137
155
if ( ! this . elementDetected ) {
156
+ if ( char === CHARACTER . BRACKET . CLOSE ) break characterStream;
157
+
138
158
this . elementDetected = ! [
139
159
CHARACTER . SPACE ,
140
160
CHARACTER . COMMA ,
141
161
CHARACTER . NEW_LINE ,
142
- CHARACTER . BRACKET . CLOSE ,
143
162
] . includes ( char ) ;
144
163
}
145
164
@@ -158,6 +177,11 @@ class JsonArrayStreamer<T> {
158
177
this . elementType = "others" ;
159
178
this . elementParser = this . primitiveElementParser ;
160
179
}
180
+ } else if (
181
+ this . elementParser === this . primitiveElementParser &&
182
+ char === CHARACTER . BRACKET . CLOSE
183
+ ) {
184
+ break characterStream;
161
185
}
162
186
163
187
this . elementParser ( char , filter ) ;
@@ -175,7 +199,7 @@ class JsonArrayStreamer<T> {
175
199
this . readStream = null ;
176
200
177
201
if ( this . chunkBuffer . length ) {
178
- const element : T = JSON . parse ( this . chunkBuffer ) ;
202
+ const element = < T > this . getParsedElement ( ) ;
179
203
this . addToResult ( element , filter ) ;
180
204
this . resetParser ( ) ;
181
205
}
0 commit comments