Skip to content

Commit 72ce63f

Browse files
committed
Support byte shuffle filter
Both Zarr and HDF5 provide a byte shuffle filter which can be used to improve compression ratios of certain data patterns. There does not appear to be a Rust implementation of the byte shuffle filter, so this change includes one. It also adds a "filters" parameter to the API request data which accepts a list of filter definitions. Each filter definition is a dict with an "id" field and optional parameter fields specific to each filter. The shuffle filter has one parameter, "element_size".
1 parent a61d1fb commit 72ce63f

File tree

10 files changed

+340
-25
lines changed

10 files changed

+340
-25
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ with a JSON payload of the form:
7777
7878
// Algorithm used to compress the data
7979
// - optional, defaults to no compression
80-
"compression": "gzip|zlib"
80+
"compression": {"id": "gzip|zlib"},
81+
82+
// List of algorithms used to filter the data
83+
// - optional, defaults to no filters
84+
"filters": [{"id": "shuffle", "element_size": 4}],
8185
}
8286
```
8387

scripts/client.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ def get_args() -> argparse.Namespace:
3737
parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
3838
parser.add_argument("--selection", type=str)
3939
parser.add_argument("--compression", type=str)
40-
parser.add_argument("--show-response-headers", action=argparse.BooleanOptionalAction)
40+
parser.add_argument("--shuffle", action=argparse.BooleanOptionalAction)
41+
parser.add_argument("--verbose", action=argparse.BooleanOptionalAction)
4142
return parser.parse_args()
4243

4344

@@ -56,6 +57,14 @@ def build_request_data(args: argparse.Namespace) -> dict:
5657
request_data["shape"] = json.loads(args.shape)
5758
if args.selection:
5859
request_data["selection"] = json.loads(args.selection)
60+
if args.compression:
61+
request_data["compression"] = {"id": args.compression}
62+
filters = []
63+
if args.shuffle:
64+
element_size = 4 if "32" in args.dtype else 8
65+
filters.append({"id": "shuffle", "element_size": element_size})
66+
if filters:
67+
request_data["filters"] = filters
5968
return {k: v for k, v in request_data.items() if v is not None}
6069

6170

@@ -68,13 +77,13 @@ def request(url: str, username: str, password: str, request_data: dict):
6877
return response
6978

7079

71-
def display(response, show_headers=False):
80+
def display(response, verbose=False):
7281
#print(response.content)
7382
dtype = response.headers['x-activestorage-dtype']
7483
shape = json.loads(response.headers['x-activestorage-shape'])
7584
result = np.frombuffer(response.content, dtype=dtype)
7685
result = result.reshape(shape)
77-
if show_headers:
86+
if verbose:
7887
print("\nResponse headers:", response.headers)
7988
print("\nResult:", result)
8089
else:
@@ -92,10 +101,12 @@ def display_error(response):
92101
def main():
93102
args = get_args()
94103
request_data = build_request_data(args)
104+
if args.verbose:
105+
print("\nRequest data:", request_data)
95106
url = f'{args.server}/v1/{args.operation}/'
96107
response = request(url, args.username, args.password, request_data)
97108
if response.ok:
98-
display(response, show_headers=args.show_response_headers)
109+
display(response, verbose=args.verbose)
99110
else:
100111
display_error(response)
101112
sys.exit(1)

scripts/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
numcodecs
12
numpy
23
requests
34
s3fs

scripts/upload_sample_data.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from enum import Enum
22
import gzip
3+
import numcodecs
34
import numpy as np
45
import pathlib
56
import s3fs
@@ -8,6 +9,7 @@
89
NUM_ITEMS = 10
910
OBJECT_PREFIX = "data"
1011
COMPRESSION_ALGS = [None, "gzip", "zlib"]
12+
FILTER_ALGS = [None, "shuffle"]
1113

1214
#Use enum which also subclasses string type so that
1315
# auto-generated OpenAPI schema can determine allowed dtypes
@@ -38,14 +40,18 @@ def n_bytes(self):
3840
# Create numpy arrays and upload to S3 as bytes
3941
for compression in COMPRESSION_ALGS:
4042
compression_suffix = f"-{compression}" if compression else ""
41-
for d in AllowedDatatypes.__members__.keys():
42-
obj_name = f'{OBJECT_PREFIX}-{d}{compression_suffix}.dat'
43-
with s3_fs.open(bucket / obj_name, 'wb') as s3_file:
44-
data = np.arange(NUM_ITEMS, dtype=d).tobytes()
45-
if compression == "gzip":
46-
data = gzip.compress(data)
47-
elif compression == "zlib":
48-
data = zlib.compress(data)
49-
s3_file.write(data)
50-
51-
print("Data upload successful. \nBucket contents:\n", s3_fs.ls(bucket))
43+
for filter in FILTER_ALGS:
44+
filter_suffix = f"-{filter}" if filter else ""
45+
for d in AllowedDatatypes:
46+
obj_name = f'{OBJECT_PREFIX}-{d}{compression_suffix}{filter_suffix}.dat'
47+
with s3_fs.open(bucket / obj_name, 'wb') as s3_file:
48+
data = np.arange(NUM_ITEMS, dtype=d).tobytes()
49+
if filter == "shuffle":
50+
data = numcodecs.Shuffle(d.n_bytes()).encode(data)
51+
if compression == "gzip":
52+
data = gzip.compress(data)
53+
elif compression == "zlib":
54+
data = zlib.compress(data)
55+
s3_file.write(data)
56+
57+
print("Data upload successful. \nBucket contents:\n", "\n".join(s3_fs.ls(bucket)))

src/filter_pipeline.rs

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use crate::compression;
44
use crate::error::ActiveStorageError;
5+
use crate::filters;
56
use crate::models;
67

78
use axum::body::Bytes;
@@ -13,24 +14,31 @@ use axum::body::Bytes;
1314
/// # Arguments
1415
///
1516
/// * `request_data`: RequestData object for the request
16-
/// * `data`: Data to apply filter pipeline to.
17+
/// * `data`: Data [Bytes](axum::body::Bytes) to apply the pipeline to.
1718
pub fn filter_pipeline(
1819
request_data: &models::RequestData,
1920
data: &Bytes,
2021
) -> Result<Bytes, ActiveStorageError> {
22+
// Make a mutable shallow copy of the data.
23+
let mut data = data.clone();
24+
// First decompress.
2125
if let Some(compression) = request_data.compression {
22-
compression::decompress(compression, data)
23-
} else {
24-
Ok(data.clone())
25-
}
26-
// TODO: Defilter
26+
data = compression::decompress(compression, &data)?
27+
};
28+
// Then decode the filters in reverse order.
29+
if let Some(filters) = &request_data.filters {
30+
for filter in filters.iter().rev() {
31+
data = filters::decode(filter, &data)?
32+
}
33+
};
34+
Ok(data)
2735
}
2836

2937
#[cfg(test)]
3038
mod tests {
3139
use super::*;
3240
use crate::test_utils;
33-
use flate2::read::GzEncoder;
41+
use flate2::read::{GzEncoder, ZlibEncoder};
3442
use flate2::Compression;
3543
use std::io::Read;
3644

@@ -42,6 +50,14 @@ mod tests {
4250
result.into()
4351
}
4452

53+
fn compress_zlib(data: &[u8]) -> Bytes {
54+
// Adapted from the flate2 documentation.
55+
let mut result = Vec::<u8>::new();
56+
let mut deflater = ZlibEncoder::new(data, Compression::fast());
57+
deflater.read_to_end(&mut result).unwrap();
58+
result.into()
59+
}
60+
4561
#[test]
4662
fn test_filter_pipeline_noop() {
4763
let data = [1, 2, 3, 4];
@@ -60,4 +76,47 @@ mod tests {
6076
let result = filter_pipeline(&request_data, &bytes).unwrap();
6177
assert_eq!(data.as_ref(), result);
6278
}
79+
80+
#[test]
81+
fn test_filter_pipeline_shuffle() {
82+
let data = [1, 2, 3, 4, 5, 6, 7, 8];
83+
let bytes = Bytes::copy_from_slice(&data);
84+
let shuffled = filters::shuffle::test_utils::shuffle(&bytes, 4);
85+
let mut request_data = test_utils::get_test_request_data();
86+
request_data.filters = Some(vec![models::Filter::Shuffle { element_size: 4 }]);
87+
let result = filter_pipeline(&request_data, &shuffled).unwrap();
88+
assert_eq!(data.as_ref(), result);
89+
}
90+
91+
#[test]
92+
fn test_filter_pipeline_shuffle_zlib() {
93+
let data: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
94+
let bytes = Bytes::copy_from_slice(&data);
95+
let shuffled = filters::shuffle::test_utils::shuffle(&bytes, 4);
96+
let bytes = compress_zlib(shuffled.as_ref());
97+
let mut request_data = test_utils::get_test_request_data();
98+
request_data.compression = Some(models::Compression::Zlib);
99+
request_data.filters = Some(vec![models::Filter::Shuffle { element_size: 4 }]);
100+
let result = filter_pipeline(&request_data, &bytes).unwrap();
101+
assert_eq!(data.as_ref(), result.as_ref());
102+
}
103+
104+
#[test]
105+
fn test_filter_pipeline_shuffle_x2_zlib() {
106+
// Test multiple filters.
107+
// Currently we only have shuffle, so run it twice with different element types.
108+
let data: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
109+
let bytes = Bytes::copy_from_slice(&data);
110+
let shuffled = filters::shuffle::test_utils::shuffle(&bytes, 4);
111+
let reshuffled = filters::shuffle::test_utils::shuffle(&shuffled, 2);
112+
let bytes = compress_zlib(reshuffled.as_ref());
113+
let mut request_data = test_utils::get_test_request_data();
114+
request_data.compression = Some(models::Compression::Zlib);
115+
request_data.filters = Some(vec![
116+
models::Filter::Shuffle { element_size: 4 },
117+
models::Filter::Shuffle { element_size: 2 },
118+
]);
119+
let result = filter_pipeline(&request_data, &bytes).unwrap();
120+
assert_eq!(data.as_ref(), result.as_ref());
121+
}
63122
}

src/filters.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//! Filter implementations.
2+
3+
pub mod shuffle;
4+
5+
use crate::error::ActiveStorageError;
6+
use crate::models;
7+
8+
use axum::body::Bytes;
9+
10+
/// Decodes some bytes using the specified filter and returns the result.
11+
///
12+
/// # Arguments
13+
///
14+
/// * `filter`: Filter algorithm
15+
/// * `data`: Filtered data [Bytes](axum::body::Bytes)
16+
pub fn decode(filter: &models::Filter, data: &Bytes) -> Result<Bytes, ActiveStorageError> {
17+
match filter {
18+
models::Filter::Shuffle { element_size } => Ok(shuffle::deshuffle(data, *element_size)),
19+
}
20+
}
21+
22+
#[cfg(test)]
23+
mod tests {
24+
use super::*;
25+
use crate::filters;
26+
27+
#[test]
28+
fn test_decode_shuffle() {
29+
let data = [1, 2, 3, 4, 5, 6, 7, 8];
30+
let bytes = Bytes::copy_from_slice(&data);
31+
let shuffled = filters::shuffle::test_utils::shuffle(&bytes, 4);
32+
let filter = models::Filter::Shuffle { element_size: 4 };
33+
let result = decode(&filter, &shuffled).unwrap();
34+
assert_eq!(data.as_ref(), result);
35+
}
36+
}

0 commit comments

Comments
 (0)