@@ -32,7 +32,7 @@ CSVScanner::CSVScanner(peloton::type::AbstractPool &pool,
32
32
file_path_ (file_path),
33
33
file_(),
34
34
buffer_(nullptr ),
35
- buffer_begin_ (0 ),
35
+ buffer_pos_ (0 ),
36
36
buffer_end_(0 ),
37
37
line_(nullptr ),
38
38
line_len_(0 ),
@@ -59,12 +59,17 @@ CSVScanner::CSVScanner(peloton::type::AbstractPool &pool,
59
59
// Destructor. Hands each of the scanner's three buffers (the file read buffer
// buffer_, the line buffer line_, and the column array cols_) back to memory_
// if it is non-null, then clears the pointer so it no longer refers to the
// freed storage.
CSVScanner::~CSVScanner () {
60
60
if (buffer_ != nullptr ) {
61
61
memory_.Free (buffer_);
62
+ buffer_ = nullptr ;
62
63
}
64
+
63
65
if (line_ != nullptr ) {
64
66
memory_.Free (line_);
67
+ line_ = nullptr ;
65
68
}
69
+
66
70
if (cols_ != nullptr ) {
67
71
memory_.Free (cols_);
72
+ cols_ = nullptr ;
68
73
}
69
74
}
70
75
@@ -90,21 +95,22 @@ void CSVScanner::Produce() {
90
95
Initialize ();
91
96
92
97
// Loop lines
93
- while (const char *line = NextLine ()) {
98
+ while (char *line = NextLine ()) {
94
99
ProduceCSV (line);
95
100
}
96
101
}
97
102
98
103
void CSVScanner::Initialize () {
99
104
// Let's first perform a few validity checks
100
- boost::filesystem::path path{ file_path_} ;
105
+ boost::filesystem::path path ( file_path_) ;
101
106
102
107
if (!boost::filesystem::exists (path)) {
103
- throw ExecutorException{ StringUtil::Format (" input path '%s' does not exist" ,
104
- file_path_.c_str ())} ;
108
+ throw ExecutorException ( StringUtil::Format (" input path '%s' does not exist" ,
109
+ file_path_.c_str ())) ;
105
110
} else if (!boost::filesystem::is_regular_file (file_path_)) {
106
- throw ExecutorException{
107
- StringUtil::Format (" unable to read file '%s'" , file_path_.c_str ())};
111
+ auto msg =
112
+ StringUtil::Format (" unable to read file '%s'" , file_path_.c_str ());
113
+ throw ExecutorException (msg);
108
114
}
109
115
110
116
// The path looks okay, let's try opening it
@@ -125,7 +131,7 @@ void CSVScanner::Initialize() {
125
131
126
132
bool CSVScanner::NextBuffer () {
127
133
// Do read
128
- buffer_begin_ = 0 ;
134
+ buffer_pos_ = 0 ;
129
135
buffer_end_ = static_cast <uint32_t >(file_.Read (buffer_, kDefaultBufferSize ));
130
136
131
137
// Update stats
@@ -134,7 +140,9 @@ bool CSVScanner::NextBuffer() {
134
140
return (buffer_end_ != 0 );
135
141
}
136
142
137
- void CSVScanner::AppendToCurrentLine (const char *data, uint32_t len) {
143
+ void CSVScanner::AppendToLineBuffer (const char *data, uint32_t len) {
144
+ PELOTON_ASSERT (len > 0 );
145
+
138
146
// Short-circuit if we're not appending any data
139
147
if (len == 0 ) {
140
148
return ;
@@ -146,7 +154,7 @@ void CSVScanner::AppendToCurrentLine(const char *data, uint32_t len) {
146
154
const auto msg = StringUtil::Format (
147
155
" Line %u in file '%s' exceeds maximum line length: %lu" ,
148
156
line_number_ + 1 , file_path_.c_str (), kMaxAllocSize );
149
- throw Exception{ msg} ;
157
+ throw Exception ( msg) ;
150
158
}
151
159
152
160
// The current line buffer isn't large enough to store the new bytes, so we
@@ -186,41 +194,44 @@ void CSVScanner::AppendToCurrentLine(const char *data, uint32_t len) {
186
194
stats_.num_copies ++;
187
195
}
188
196
189
- // The main purpose of this function is to find the start of the next line in
190
- // the CSV file.
191
- const char *CSVScanner::NextLine () {
197
+ // The objective of this function is to find a complete line in the CSV file.
198
+ // The returned value will be a valid pointer to a null-terminated string that
199
+ // is the next line in the CSV to be processed.
200
+ char *CSVScanner::NextLine () {
192
201
line_len_ = 0 ;
193
202
203
+ const char quote = quote_;
204
+ const char escape = (quote_ == escape_ ? static_cast <char >(' \0 ' ) : escape_);
205
+
194
206
bool in_quote = false ;
195
207
bool last_was_escape = false ;
196
- bool copied_to_line_buf = false ;
197
208
198
- uint32_t line_end = buffer_begin_;
199
-
200
- char quote = quote_;
201
- char escape = (quote_ == escape_ ? static_cast <char >(' \0 ' ) : escape_);
209
+ const char *buf = buffer_;
210
+ uint32_t curr_buffer_pos = buffer_pos_;
202
211
203
212
while (true ) {
204
- if (line_end > = buffer_end_) {
213
+ if (curr_buffer_pos = = buffer_end_) {
205
214
// We need to read more data from the CSV file. But first, we need to copy
206
215
// all the data in the read-buffer (i.e., [buffer_pos_, buffer_end_)) to
207
216
// the line-buffer.
217
+ if (buffer_pos_ < curr_buffer_pos) {
218
+ AppendToLineBuffer (buffer_ + buffer_pos_,
219
+ curr_buffer_pos - buffer_pos_);
220
+ buffer_pos_ = curr_buffer_pos;
221
+ }
208
222
209
- AppendToCurrentLine (buffer_ + buffer_begin_,
210
- static_cast < uint32_t >(buffer_end_ - buffer_begin_)) ;
223
+ // Reset positions
224
+ curr_buffer_pos = 0 ;
211
225
212
226
// Now, read more data
213
227
if (!NextBuffer ()) {
214
- return nullptr ;
228
+ // We hit an EOF
229
+ break ;
215
230
}
216
-
217
- // Reset positions
218
- line_end = buffer_begin_;
219
- copied_to_line_buf = true ;
220
231
}
221
232
222
233
// Read character
223
- char c = buffer_[line_end ];
234
+ char c = buf[curr_buffer_pos++ ];
224
235
225
236
if (in_quote && c == escape) {
226
237
last_was_escape = !last_was_escape;
@@ -235,47 +246,120 @@ const char *CSVScanner::NextLine() {
235
246
// Process the new-line character. If we see a new-line and we're not currently
236
247
// in a quoted section, we're done.
237
248
if (c == ' \n ' && !in_quote) {
238
- buffer_[line_end] = ' \0 ' ;
239
249
break ;
240
250
}
251
+ }
241
252
242
- // Move along
243
- line_end++;
253
+ // Flush remaining valid bytes
254
+ if (buffer_pos_ < curr_buffer_pos) {
255
+ AppendToLineBuffer (buffer_ + buffer_pos_, curr_buffer_pos - buffer_pos_);
256
+ buffer_pos_ = curr_buffer_pos;
244
257
}
245
258
246
259
// Increment line number
247
260
line_number_++;
248
261
249
- if (copied_to_line_buf) {
250
- AppendToCurrentLine (buffer_, line_end);
251
- buffer_begin_ = line_end + 1 ;
252
- return line_;
253
- } else {
254
- const char *ret = buffer_ + buffer_begin_;
255
- buffer_begin_ = line_end + 1 ;
256
- return ret;
262
+ // If we didn't transfer any bytes to the line buffer, we must have reached an
263
+ // EOF. If so, return null indicating there are no more lines.
264
+ if (line_len_ == 0 ) {
265
+ return nullptr ;
257
266
}
267
+
268
+ // A full line has been transferred to the line buffer, but we also copied the
269
+ // newline character. Strip it off now.
270
+ line_len_--;
271
+ line_[line_len_] = ' \0 ' ;
272
+
273
+ // Done
274
+ return line_;
258
275
}
259
276
260
- void CSVScanner::ProduceCSV (const char *line) {
261
- // At this point, we have a well-formed line. Let's pull out pointers to the
262
- // columns.
277
+ void CSVScanner::ProduceCSV (char *line) {
278
+ const char delimiter = delimiter_;
279
+ const char quote = quote_;
280
+ const char escape = escape_;
263
281
264
- const auto *iter = line;
265
- for (uint32_t col_idx = 0 ; col_idx < num_cols_; col_idx++) {
266
- // Start points to the beginning of the column's data value
267
- const char *start = iter;
282
+ // The iterator over characters in the line
283
+ char *iter = line;
268
284
269
- // Eat text until the next delimiter
270
- while (*iter != 0 && *iter != delimiter_) {
271
- iter++;
285
+ for (uint32_t col_idx = 0 ; col_idx < num_cols_; col_idx++) {
286
+ char *col_begin = iter;
287
+ char *col_end = nullptr ;
288
+
289
+ // We need to move col_end to the end of the column's data. Along the way,
290
+ // we may need to shift data down due to quotes and escapes. Inspired by
291
+ // Postgres.
292
+ {
293
+ char *out = col_begin;
294
+ while (true ) {
295
+ // This first loop looks for either the delimiter character or the end
296
+ // of the line, indicating the end of a column's data. It breaks out of
297
+ // the loop if a quote character is found. It flows into a second loop
298
+ // whose only purpose is to find the end of the quoted section.
299
+ while (true ) {
300
+ char c = *iter++;
301
+
302
+ // If we see the delimiter character, or the end of the string,
303
+ // finish
304
+ if (c == delimiter || c == ' \0 ' ) {
305
+ col_end = out;
306
+ iter--;
307
+ goto colend;
308
+ }
309
+
310
+ // If we see a quote character, move to the second loop to find the
311
+ // closing quote.
312
+ if (c == quote) {
313
+ break ;
314
+ }
315
+
316
+ *out++ = c;
317
+ }
318
+
319
+ while (true ) {
320
+ char c = *iter++;
321
+
322
+ // If we see the end of the line *within* a quoted section, throw
323
+ // error
324
+ if (c == ' \0 ' ) {
325
+ throw Exception (StringUtil::Format (
326
+ " unterminated CSV quoted field at %u" , col_idx));
327
+ }
328
+
329
+ // If we see an escape character within a quoted section, we need to
330
+ // check if the following character is a quote. If so, we must
331
+ // escape it
332
+ if (c == escape) {
333
+ char next = *iter;
334
+ if (next == quote || next == escape) {
335
+ *out++ = next;
336
+ iter++;
337
+ continue ;
338
+ }
339
+ }
340
+
341
+ // If we see the closing quote, we're done.
342
+ if (c == quote) {
343
+ break ;
344
+ }
345
+
346
+ *out++ = c;
347
+ }
348
+ }
272
349
}
273
350
274
- // At this point, iter points to the end of the column's data value
351
+ colend:
352
+ // If we've reached the end of the line, but haven't set up all the columns, then
353
+ // we're missing data for the remaining columns and should throw an error.
354
+ if (*iter == ' \0 ' && col_idx != (num_cols_ - 1 )) {
355
+ throw Exception (
356
+ StringUtil::Format (" missing data for column %u on line %u" ,
357
+ (col_idx + 2 ), line_number_));
358
+ }
275
359
276
360
// Let's setup the columns
277
- cols_[col_idx].ptr = start ;
278
- cols_[col_idx].len = static_cast <uint32_t >(iter - start );
361
+ cols_[col_idx].ptr = col_begin ;
362
+ cols_[col_idx].len = static_cast <uint32_t >(col_end - col_begin );
279
363
cols_[col_idx].is_null = (cols_[col_idx].len == 0 );
280
364
281
365
// Eat delimiter, moving to next column
0 commit comments