Skip to content

Commit 9a5e5fa

Browse files
Merge pull request #60 from curtisalexander/mem
Mem
2 parents 73cecc8 + 9953757 commit 9a5e5fa

File tree

6 files changed

+13
-18
lines changed

6 files changed

+13
-18
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# data
22
/readstat/tests/data/*.csv
33
/readstat/tests/data/*.feather
4+
/readstat/tests/data/*.ndjson
45
/readstat/tests/data/*.parquet
56
# environment
67
.vscode/

readstat/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

readstat/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "readstat"
3-
version = "0.9.1"
3+
version = "0.9.2"
44
authors = ["Curtis Alexander <calex@calex.org>"]
55
edition = "2021"
66
description = "Rust wrapper of the ReadStat C library"

readstat/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub use rs_path::ReadStatPath;
3232
pub use rs_write::ReadStatWriter;
3333

3434
// Default rows to stream
35-
const STREAM_ROWS: u32 = 50000;
35+
const STREAM_ROWS: u32 = 10000;
3636

3737
// StructOpt
3838
#[derive(StructOpt, Debug)]
@@ -338,7 +338,6 @@ pub fn run(rs: ReadStat) -> Result<(), Box<dyn Error + Send + Sync>> {
338338
let offsets = build_offsets(total_rows_to_process, total_rows_to_stream)?;
339339

340340
// Create channels
341-
//let (s, r) = bounded(channel_capacity);
342341
let (s, r) = unbounded();
343342

344343
// Initialize writing
@@ -415,7 +414,7 @@ pub fn run(rs: ReadStat) -> Result<(), Box<dyn Error + Send + Sync>> {
415414
for (i, (d, rsp, pairs_cnt)) in r.iter().enumerate() {
416415
wtr.write(&d, &rsp)?;
417416

418-
if i == pairs_cnt {
417+
if i == (pairs_cnt - 1) {
419418
wtr.finish(&d, &rsp)?;
420419
}
421420

readstat/src/rs_data.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,12 @@ impl ReadStatData {
160160

161161
// setup parser
162162
// once call parse_sas7bdat, iteration begins
163-
let mut parser = ReadStatParser::new()
163+
let error = ReadStatParser::new()
164164
// do not set metadata handler nor variable handler as already processed
165165
.set_value_handler(Some(cb::handle_value))?
166166
.set_row_limit(Some(self.batch_rows_to_process.try_into().unwrap()))?
167-
.set_row_offset(Some(self.batch_row_start.try_into().unwrap()))?;
168-
169-
let error = parser.parse_sas7bdat(ppath, ctx);
170-
171-
// drop parser after finished
172-
drop(parser);
167+
.set_row_offset(Some(self.batch_row_start.try_into().unwrap()))?
168+
.parse_sas7bdat(ppath, ctx);
173169

174170
match FromPrimitive::from_i32(error as i32) {
175171
Some(ReadStatError::READSTAT_OK) => Ok(()),

readstat/src/rs_write.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Create a writer struct
22
use std::fs::OpenOptions;
33
use std::io::stdout;
4-
use std::sync::Arc;
54
use std::error::Error;
65

76
use arrow::csv as csv_arrow;
@@ -399,18 +398,18 @@ impl ReadStatWriter {
399398
self.write_message_for_rows(&d, &rsp);
400399

401400
// setup writer if not already started writing
402-
if !self.wrote_start {
401+
//if !self.wrote_start {
403402
self.wtr = Some(ReadStatWriterFormat::Parquet(ArrowWriter::try_new(
404403
f,
405-
Arc::new(d.schema.clone()),
406-
Some(WriterProperties::builder().build()),
404+
d.batch.schema(),
405+
Some(WriterProperties::builder().set_write_batch_size(d.batch_rows_to_process).set_max_row_group_size(d.batch_rows_to_process).build()),
407406
)?));
408-
};
407+
// };
409408

410409
// write
411410
if let Some(rswf) = &mut self.wtr {
412411
match rswf {
413-
ReadStatWriterFormat::Parquet(wtr) => wtr.write(&d.batch)?,
412+
ReadStatWriterFormat::Parquet(wtr) => { wtr.write(&d.batch)? },
414413
_ => unreachable!()
415414
}
416415
};

0 commit comments

Comments
 (0)