Skip to content

Commit 9953757

Browse files
Do not closing and reopening parquet files when writing. Correct issue where writer was never finishing. Set default stream rows to 10,000.
1 parent c9961db commit 9953757

File tree

5 files changed

+10
-18
lines changed

5 files changed

+10
-18
lines changed

readstat/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

readstat/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "readstat"
3-
version = "0.9.1"
3+
version = "0.9.2"
44
authors = ["Curtis Alexander <calex@calex.org>"]
55
edition = "2021"
66
description = "Rust wrapper of the ReadStat C library"

readstat/src/lib.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
#![allow(non_camel_case_types)]
22

33
use colored::Colorize;
4-
use crossbeam::channel::bounded;
5-
// use crossbeam::channel::unbounded;
4+
use crossbeam::channel::unbounded;
65
use log::debug;
76
use path_abs::{PathAbs, PathInfo};
87
use rayon::prelude::*;
@@ -33,7 +32,7 @@ pub use rs_path::ReadStatPath;
3332
pub use rs_write::ReadStatWriter;
3433

3534
// Default rows to stream
36-
const STREAM_ROWS: u32 = 50000;
35+
const STREAM_ROWS: u32 = 10000;
3736

3837
// StructOpt
3938
#[derive(StructOpt, Debug)]
@@ -339,8 +338,7 @@ pub fn run(rs: ReadStat) -> Result<(), Box<dyn Error + Send + Sync>> {
339338
let offsets = build_offsets(total_rows_to_process, total_rows_to_stream)?;
340339

341340
// Create channels
342-
let (s, r) = bounded(rayon::current_num_threads());
343-
//let (s, r) = unbounded();
341+
let (s, r) = unbounded();
344342

345343
// Initialize writing
346344
let mut wtr = ReadStatWriter::new();
@@ -416,7 +414,7 @@ pub fn run(rs: ReadStat) -> Result<(), Box<dyn Error + Send + Sync>> {
416414
for (i, (d, rsp, pairs_cnt)) in r.iter().enumerate() {
417415
wtr.write(&d, &rsp)?;
418416

419-
if i == pairs_cnt {
417+
if i == (pairs_cnt - 1) {
420418
wtr.finish(&d, &rsp)?;
421419
}
422420

readstat/src/rs_data.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,12 @@ impl ReadStatData {
160160

161161
// setup parser
162162
// once call parse_sas7bdat, iteration begins
163-
let mut parser = ReadStatParser::new()
163+
let error = ReadStatParser::new()
164164
// do not set metadata handler nor variable handler as already processed
165165
.set_value_handler(Some(cb::handle_value))?
166166
.set_row_limit(Some(self.batch_rows_to_process.try_into().unwrap()))?
167-
.set_row_offset(Some(self.batch_row_start.try_into().unwrap()))?;
168-
169-
let error = parser.parse_sas7bdat(ppath, ctx);
170-
171-
// drop parser after finished
172-
// drop(parser);
167+
.set_row_offset(Some(self.batch_row_start.try_into().unwrap()))?
168+
.parse_sas7bdat(ppath, ctx);
173169

174170
match FromPrimitive::from_i32(error as i32) {
175171
Some(ReadStatError::READSTAT_OK) => Ok(()),

readstat/src/rs_write.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,17 +401,15 @@ impl ReadStatWriter {
401401
//if !self.wrote_start {
402402
self.wtr = Some(ReadStatWriterFormat::Parquet(ArrowWriter::try_new(
403403
f,
404-
//Arc::new(d.schema.clone()),
405404
d.batch.schema(),
406-
// Arc::new(d.schema.clone()),
407405
Some(WriterProperties::builder().set_write_batch_size(d.batch_rows_to_process).set_max_row_group_size(d.batch_rows_to_process).build()),
408406
)?));
409407
// };
410408

411409
// write
412410
if let Some(rswf) = &mut self.wtr {
413411
match rswf {
414-
ReadStatWriterFormat::Parquet(wtr) => { wtr.write(&d.batch)?; wtr.close()?; },
412+
ReadStatWriterFormat::Parquet(wtr) => { wtr.write(&d.batch)? },
415413
_ => unreachable!()
416414
}
417415
};

0 commit comments

Comments
 (0)