
Commit 2740d92

Support reductions over a subset of axes (#100)

* Adapt test client to handle reduction over axes
* Add axis field to request data model
* Implement sum over axis
* Implement sum over multiple axes
* Implement min over multiple axes
* Rename request field: axes -> axis
* Implement max over multiple axes
* Add test to ensure selection ignores axis
* Implement count over multiple axes
* Tmp: switch to dedicated test suite branch in CI
* Misc code clean up
* Handle empty axis list in count operation
* Handle empty axis list in min, max and sum operations
* Improve validation of multi-axis reduction requests
* Refactor to avoid unwrap
* DRY refactoring
* Update docs to include axis field
* Add explanatory comment
* Revert "Tmp: switch to dedicated test suite branch in CI"

  This reverts commit 95cc35f.
1 parent fd1d335 commit 2740d92

13 files changed (+1229, -186 lines)


benches/byte_order.rs

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ fn get_test_request_data() -> RequestData {
         offset: None,
         size: None,
         shape: None,
+        axis: reductionist::models::ReductionAxes::All,
         order: None,
         selection: None,
         compression: None,

benches/operations.rs

Lines changed: 3 additions & 2 deletions
@@ -19,6 +19,7 @@ fn get_test_request_data() -> RequestData {
         offset: None,
         size: None,
         shape: None,
+        axis: reductionist::models::ReductionAxes::All,
         order: None,
         selection: None,
         compression: None,
@@ -34,7 +35,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         let size = size_k * 1024;
         let data: Vec<i64> = (0_i64..size).map(|i| i % 256).collect::<Vec<i64>>();
         let data: Vec<u8> = data.as_bytes().into();
-        let missings = vec![
+        let missing_types = vec![
             None,
             Some(Missing::MissingValue(42.into())),
             Some(Missing::MissingValues(vec![42.into()])),
@@ -50,7 +51,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         ("sum", Box::new(operations::Sum::execute)),
     ];
     for (op_name, execute) in operations {
-        for missing in missings.clone() {
+        for missing in missing_types.clone() {
             let name = format!("{}({}, {:?})", op_name, size, missing);
             c.bench_function(&name, |b| {
                 b.iter(|| {

docs/api.md

Lines changed: 9 additions & 4 deletions
@@ -37,6 +37,11 @@ The request body should be a JSON object of the form:
     // - optional, defaults to a simple 1D array
     "shape": [20, 5],
 
+    // The axis or axes over which to perform the reduction operation
+    // - optional, can be either a single axis or list of axes, defaults
+    //   to a reduction over all axes
+    "axis": 0,
+
     // Indicates whether the data is in C order (row major)
     // or Fortran order (column major, indicated by 'F')
     // - optional, defaults to 'C'
@@ -78,10 +83,10 @@ Unauthenticated access to S3 is possible by omitting the basic auth header.
 On success, all operations return HTTP 200 OK with the response using the same datatype as specified in the request except for `count` which always returns the result as `int64`.
 The server returns the following headers with the HTTP response:
 
-* `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`.
-* `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`.
-* `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result.
-* `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.
+- `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`.
+- `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`.
+- `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result.
+- `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.
 
 On error, an HTTP 4XX (client) or 5XX (server) response code will be returned, with the response body being a JSON object of the following format:
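For illustration, a minimal Python sketch of a request that uses the new `axis` field. Only `shape`, `axis` and the response headers are taken from the documentation above; the server URL, the operation path and the remaining request fields (`source`, `bucket`, `object`, `dtype`) are placeholder assumptions, not the documented API.

```python
import json

import numpy as np
import requests

# Placeholder request data; only "shape" and "axis" are documented above.
request_data = {
    "source": "http://localhost:9000",   # S3-compatible object store URL (assumed field name)
    "bucket": "sample-data",              # assumed
    "object": "data-uint32.dat",          # assumed
    "dtype": "uint32",                    # assumed field name
    "shape": [20, 5],
    "axis": 0,                            # reduce over the first axis only
}

# Assumed operation path; substitute the real Reductionist URL and operation.
response = requests.post("http://localhost:8080/v1/sum", json=request_data)
response.raise_for_status()

# The result uses the dtype and shape advertised in the response headers;
# reducing a [20, 5] array over axis 0 yields a result of shape [5].
dtype = response.headers["x-activestorage-dtype"]
shape = json.loads(response.headers["x-activestorage-shape"])
result = np.frombuffer(response.content, dtype=dtype).reshape(shape)
print(result, response.headers["x-activestorage-count"])
```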

docs/architecture.md

Lines changed: 15 additions & 18 deletions
@@ -7,10 +7,10 @@ Reductionist is built on top of a number of popular open source components.
 
 A few properties make it relatively easy to build a conceptual mental model of how Reductionist works.
 
-* All operations share the same request processing pipeline.
-* The request processing pipeline for each request is a fairly linear sequence of steps.
-* There is no persistent state.
-* The only external service that is interacted with is an S3-compatible object store.
+- All operations share the same request processing pipeline.
+- The request processing pipeline for each request is a fairly linear sequence of steps.
+- There is no persistent state.
+- The only external service that is interacted with is an S3-compatible object store.
 
 The more challenging aspects of the system are the lower level details of asynchronous programming, memory management, the Rust type system and working with multi-dimensional arrays.
 
@@ -29,7 +29,6 @@ A diagram of this step for the sum operation is shown in Figure 2.
 <figcaption>Figure 2: Sum operation flow diagram</figcaption>
 </figure>
 
-
 ## Axum web server
 
 [Axum](https://docs.rs/axum) is an asynchronous web framework that performs well in [various benchmarks](https://github.com/programatik29/rust-web-benchmarks/blob/master/result/hello-world.md) and is built on top of various popular components, including the [hyper](https://hyper.rs/) HTTP library.
@@ -110,13 +109,11 @@ Each operation is implemented by a struct that implements the `NumOperation` tra
 For example, the sum operation is implemented by the `Sum` struct in `src/operations.rs`.
 The `Sum` struct's `execute_t` method does the following:
 
-* Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data
-* If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view
-* If missing data was specified in the request data:
-    * Create an iterator over the array view that filters out missing data, performs the sum operation and counts non-missing elements
-* Otherwise:
-    * Use the array view's native `sum` and `len` methods to take the sum and element count
-* Convert the sum to a byte array and return with the element count
+- Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data
+- If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view
+- Checks whether the reduction should be performed over all or only a subset of the sliced data's axes
+- Performs a fold over each of the requested axes to calculate the required reduction while ignoring any specified missing data
+- Convert the sum to a byte array and return with the element count
 
 The procedure for other operations varies slightly but generally follows the same pattern.
 
@@ -136,9 +133,9 @@ Reductionist supports optional restriction of resource usage.
 This is implemented in `src/resource_manager.rs` using [Tokio Semaphores](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html).
 This allows Reductionist to limit the quantity of various resources used at any time:
 
-* S3 connections
-* memory used for numeric data (this is more of a rough guide than a perfect limit)
-* threads used for CPU-bound work
+- S3 connections
+- memory used for numeric data (this is more of a rough guide than a perfect limit)
+- threads used for CPU-bound work
 
 ## CPU-bound work
 
@@ -158,9 +155,9 @@ The second approach may leave the server more responsive if more CPU-heavy opera
 Prometheus metrics are implemented in `src/metrics.rs` and are exposed by the Reductionist API under the `/metrics` path.
 These include:
 
-* incoming requests (counter)
-* outgoing response (counter)
-* response time (histogram)
+- incoming requests (counter)
+- outgoing response (counter)
+- response time (histogram)
 
 ## Tracing and profiling
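To make the reduction-over-axes behaviour described in the updated documentation concrete, here is a rough NumPy analogue (not the server's Rust/ndarray implementation): reduce over only the requested axes while ignoring missing data, and count the non-missing elements contributing to each output element. The example array and missing value are made up.

```python
import numpy as np

data = np.arange(20, dtype=np.int64).reshape(4, 5)
data[1, 2] = -1          # pretend -1 marks missing data
valid = data != -1       # mask of non-missing elements

axis = 0                 # the subset of axes requested for the reduction
# Sum over the requested axis, treating missing elements as zero, and count
# the non-missing elements contributing to each output value.
result = np.where(valid, data, 0).sum(axis=axis)
counts = valid.sum(axis=axis)

print(result.shape)      # (5,) - the reduced axis is removed
print(result, counts)
```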

docs/pyactivestorage.md

Lines changed: 22 additions & 20 deletions
@@ -3,34 +3,36 @@
 Reductionist has been integrated with the [PyActiveStorage](https://github.com/valeriupredoi/PyActiveStorage) library, and PyActiveStorage acts as a client of the Reductionist server.
 PyActiveStorage currently works with data in netCDF4 format, and is able to perform reductions on a variable within such a dataset.
 Numerical operations are performed on individual storage chunks, with the results later aggregated.
-The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implementated in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist.
+The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist.
 The following code snippet shows the `reduce_chunk` function signature.
 
 ```python
 def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None):
-    """ We do our own read of chunks and decoding etc
-
-    rfile - the actual file with the data
+    """ We do our own read of chunks and decoding etc
+
+    rfile - the actual file with the data
     offset, size - where and what we want ...
     compression - optional `numcodecs.abc.Codec` compression codec
     filters - optional list of `numcodecs.abc.Codec` filter codecs
-    dtype - likely float32 in most cases.
-    shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
+    dtype - likely float32 in most cases.
+    shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
            chunk itself
     order - typically 'C' for c-type ordering
     chunk_selection - python slice tuples for each dimension, e.g.
                       (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
                       this defines the part of the chunk which is to be obtained
                       or operated upon.
-    method - computation desired
-             (in this Python version it's an actual method, in
+    method - computation desired
+             (in this Python version it's an actual method, in
              storage implementations we'll change to controlled vocabulary)
-
+
     """
 ```
 
 For Reductionist, the `reduce_chunk` function signature in `activestorage/reductionist.py` is similar, but replaces the local file path with a `requests.Session` object, the Reductionist server URL, S3-compatible object store URL, and the bucket and object containing the data.
 
+<!-- TODO: Update to include axis arg once integrated into PyActiveStorage -->
+
 ```python
 def reduce_chunk(session, server, source, bucket, object,
                  offset, size, compression, filters, missing, dtype, shape,
@@ -64,11 +66,11 @@ def reduce_chunk(session, server, source, bucket, object,
 
 Within the `reduce_chunk` implementation for Reductionist, the following steps are taken:
 
-* build Reductionist API request data
-* build Reductionist API URL
-* perform an HTTP(S) POST request to Reductionist
-* on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
-* on failure, raise a `ReductionistError` with the response status code and JSON encoded error response
+- build Reductionist API request data
+- build Reductionist API URL
+- perform an HTTP(S) POST request to Reductionist
+- on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
+- on failure, raise a `ReductionistError` with the response status code and JSON encoded error response
 
 The use of a `requests.Session` object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe.
 
@@ -77,9 +79,9 @@ It should be possible to provide a unified interface to storage systems by abstr
 Other changes to the main `activestorage.Active` class were necessary for integration of Reductionist.
 These include:
 
-* Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries
-* Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
-* Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend
-* Use of a thread pool to execute storage chunk reductions in parallel
-* Unit tests to cover new and modified code
-* Integration test changes to allow running against a POSIX or S3 storage backend
+- Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries
+- Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
+- Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend
+- Use of a thread pool to execute storage chunk reductions in parallel
+- Unit tests to cover new and modified code
+- Integration test changes to allow running against a POSIX or S3 storage backend
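A condensed sketch of the request/response steps listed in this file's diff. This is illustrative only, not the actual `activestorage/reductionist.py` code; the URL layout and the helper name are assumptions, while the header names and the `ReductionistError` behaviour come from the documentation above.

```python
import json

import numpy as np
import requests


class ReductionistError(Exception):
    """Raised when Reductionist returns an error response."""


def reduce_chunk_sketch(session: requests.Session, server: str,
                        operation: str, request_data: dict):
    # Hypothetical URL layout; the real URL is built by PyActiveStorage.
    response = session.post(f"{server}/v1/{operation}", json=request_data)
    if not response.ok:
        # On failure, surface the status code and JSON-encoded error response.
        raise ReductionistError(response.status_code, response.json())
    # On success, interpret the payload using the response headers.
    dtype = response.headers["x-activestorage-dtype"]
    shape = json.loads(response.headers["x-activestorage-shape"])
    count = json.loads(response.headers["x-activestorage-count"])
    return np.frombuffer(response.content, dtype=dtype).reshape(shape), count
```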

scripts/client.py

Lines changed: 11 additions & 3 deletions
@@ -36,6 +36,7 @@ def get_args() -> argparse.Namespace:
     parser.add_argument("--offset", type=int)
     parser.add_argument("--size", type=int)
     parser.add_argument("--shape", type=str)
+    parser.add_argument("--axis", type=str)
     parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
     parser.add_argument("--selection", type=str)
     parser.add_argument("--compression", type=str)
@@ -72,6 +73,8 @@ def build_request_data(args: argparse.Namespace) -> dict:
         request_data["byte_order"] = args.byte_order
     if args.shape:
         request_data["shape"] = json.loads(args.shape)
+    if args.axis is not None:
+        request_data["axis"] = json.loads(args.axis)
     if args.selection:
         request_data["selection"] = json.loads(args.selection)
     if args.compression:
@@ -113,11 +116,16 @@ def display(response, verbose=False):
     #print(response.content)
     dtype = response.headers['x-activestorage-dtype']
     shape = json.loads(response.headers['x-activestorage-shape'])
-    result = np.frombuffer(response.content, dtype=dtype)
-    result = result.reshape(shape)
+    counts = json.loads(response.headers['x-activestorage-count'])
+    counts = np.array(counts)
+    if len(counts) > 1:
+        counts = counts.reshape(shape)
+    result = np.frombuffer(response.content, dtype=dtype).reshape(shape)
     if verbose:
+        sep = "\n" if len(counts.shape) > 1 else " "
         print("\nResponse headers:", response.headers)
-        print("\nResult:", result)
+        print("\nNon-missing count(s):", counts, sep=sep)
+        print("\nResult:", result, sep=sep)
     else:
         print(result)
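The per-element counts that the updated `display` function now prints are what allow partial results from several chunk requests to be combined, as noted for the `x-activestorage-count` header in docs/api.md. A small illustration with made-up numbers, assuming two chunks were each reduced with a `sum` operation over the same axis:

```python
import numpy as np

# Made-up partial "sum" results and non-missing counts from two chunk requests.
sums = [np.array([10.0, 12.0, 7.0]), np.array([3.0, 5.0, 9.0])]
counts = [np.array([4, 4, 3]), np.array([2, 2, 2])]

# Aggregate the partial sums, then divide by the total non-missing count
# for each output element to obtain the mean.
mean = sum(sums) / sum(counts)
print(mean)
```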

scripts/parallel-client.py

Lines changed: 3 additions & 0 deletions
@@ -48,6 +48,7 @@ def get_args() -> argparse.Namespace:
     parser.add_argument("--offset", type=int)
     parser.add_argument("--size", type=int)
     parser.add_argument("--shape", type=str)
+    parser.add_argument("--axis", type=str)
     parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
     parser.add_argument("--selection", type=str)
     parser.add_argument("--compression", type=str)
@@ -90,6 +91,8 @@ def build_request_data(args: argparse.Namespace) -> dict:
         request_data["byte_order"] = args.byte_order
     if args.shape:
         request_data["shape"] = json.loads(args.shape)
+    if args.axis is not None:
+        request_data["axis"] = json.loads(args.axis)
     if args.selection:
         request_data["selection"] = json.loads(args.selection)
     if args.compression:

scripts/upload_sample_data.py

Lines changed: 3 additions & 3 deletions
@@ -6,12 +6,12 @@
 import s3fs
 import zlib
 
-NUM_ITEMS = 10
+NUM_ITEMS = 12
 OBJECT_PREFIX = "data"
 COMPRESSION_ALGS = [None, "gzip", "zlib"]
 FILTER_ALGS = [None, "shuffle"]
 
-#Use enum which also subclasses string type so that
+# Use enum which also subclasses string type so that
 # auto-generated OpenAPI schema can determine allowed dtypes
 class AllowedDatatypes(str, Enum):
     """ Data types supported by active storage proxy """
@@ -31,7 +31,7 @@ def n_bytes(self):
 s3_fs = s3fs.S3FileSystem(key='minioadmin', secret='minioadmin', client_kwargs={'endpoint_url': S3_URL})
 bucket = pathlib.Path('sample-data')
 
-#Make sure s3 bucket exists
+# Make sure s3 bucket exists
 try:
     s3_fs.mkdir(bucket)
 except FileExistsError:
