Skip to content

Commit 14f238d

Browse files
authored
Merge pull request #13 from stackhpc/s3-reduce-copies
Reduce copying of data in S3 client
2 parents 0b20134 + eb47c1a commit 14f238d

File tree

5 files changed

+28
-21
lines changed

5 files changed

+28
-21
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,9 @@ thiserror = "1.0"
2929
tokio = { version = "1.24", features = ["full"] }
3030
tower = "0.4"
3131
tower-http = { version = "0.3", features = ["normalize-path", "trace", "validate-request"] }
32+
tokio-stream = "0.1"
33+
tracing = "0.1"
34+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
3235
url = { version = "2", features = ["serde"] }
3336
validator = { version = "0.16", features = ["derive"] }
3437
zerocopy = { version = "0.6.1", features = ["alloc", "simd"] }

scripts/upload_sample_data.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,9 @@
33
import pathlib
44
import s3fs
55

6+
NUM_ITEMS = 10
7+
OBJECT_PREFIX = "data"
8+
69
#Use enum which also subclasses string type so that
710
# auto-generated OpenAPI schema can determine allowed dtypes
811
class AllowedDatatypes(str, Enum):
@@ -32,7 +35,7 @@ def n_bytes(self):
3235

3336
# Create numpy arrays and upload to S3 as bytes
3437
for d in AllowedDatatypes.__members__.keys():
35-
with s3_fs.open(bucket / f'data-{d}.dat', 'wb') as s3_file:
36-
s3_file.write(np.arange(10, dtype=d).tobytes())
38+
with s3_fs.open(bucket / f'{OBJECT_PREFIX}-{d}.dat', 'wb') as s3_file:
39+
s3_file.write(np.arange(NUM_ITEMS, dtype=d).tobytes())
3740

3841
print("Data upload successful. \nBucket contents:\n", s3_fs.ls(bucket))

src/error.rs

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -177,7 +177,6 @@ impl From<ActiveStorageError> for ErrorResponse {
177177
| ActiveStorageError::S3ByteStream(_) => Self::internal_server_error(&error),
178178

179179
ActiveStorageError::S3GetObject(sdk_error) => {
180-
// FIXME: we lose "error retrieving object from S3 storage"
181180
// Tailor the response based on the specific SdkError variant.
182181
match &sdk_error {
183182
// These are generic SdkError variants.
@@ -190,12 +189,6 @@ impl From<ActiveStorageError> for ErrorResponse {
190189
// This is a more specific ServiceError variant, with GetObjectError as the
191190
// inner error.
192191
SdkError::ServiceError(get_obj_error) => {
193-
//let error = if let Some(get_obj_message) = get_obj_error.err().message() {
194-
// // FIXME: use message() & code()?
195-
// &get_obj_error.err()
196-
//} else {
197-
// &sdk_error
198-
//};
199192
let get_obj_error = get_obj_error.err();
200193
match get_obj_error.kind {
201194
GetObjectErrorKind::InvalidObjectState(_)

src/main.rs

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
use tokio::signal;
2+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
23

34
mod app;
45
mod array;
@@ -11,6 +12,8 @@ mod validated_json;
1112

1213
#[tokio::main]
1314
async fn main() {
15+
init_tracing();
16+
1417
let router = app::router();
1518

1619
// run it with hyper on localhost:8080
@@ -21,6 +24,16 @@ async fn main() {
2124
.unwrap();
2225
}
2326

27+
fn init_tracing() {
28+
tracing_subscriber::registry()
29+
.with(
30+
tracing_subscriber::EnvFilter::try_from_default_env()
31+
.unwrap_or_else(|_| "s3_active_storage=debug,tower_http=debug".into()),
32+
)
33+
.with(tracing_subscriber::fmt::layer())
34+
.init();
35+
}
36+
2437
async fn shutdown_signal() {
2538
let ctrl_c = async {
2639
signal::ctrl_c()

src/s3_client.rs

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ use aws_credential_types::Credentials;
77
use aws_sdk_s3::Client;
88
use aws_types::region::Region;
99
use axum::body::Bytes;
10+
use tokio_stream::StreamExt;
1011
use url::Url;
1112

1213
/// S3 client object.
@@ -49,8 +50,7 @@ impl S3Client {
4950
key: &str,
5051
range: Option<String>,
5152
) -> Result<Bytes, ActiveStorageError> {
52-
// TODO: Provide a streaming response.
53-
let response = self
53+
let mut response = self
5454
.client
5555
.get_object()
5656
.bucket(bucket)
@@ -64,18 +64,13 @@ impl S3Client {
6464
// return the data in Bytes object in which the underlying data has a higher alignment.
6565
// For now we're hard-coding an alignment of 8 bytes, although this should depend on the
6666
// data type, and potentially whether there are any SIMD requirements.
67-
// FIXME: The current method is rather inefficient, involving copying the data at least
68-
// twice. This is functional, but should be revisited.
69-
7067
// Create an 8-byte aligned Vec<u8>.
7168
let mut buf = maligned::align_first::<u8, maligned::A8>(content_length as usize);
72-
// Read all data into memory as an AggregatedBytes.
73-
let data = response.body.collect().await;
74-
// Copy the data into an unaligned Vec<u8>.
75-
let bytes = data.map_err(ActiveStorageError::S3ByteStream)?;
76-
let mut vec = bytes.to_vec();
77-
// Copy the data into the aligned Vec<u8>.
78-
buf.append(&mut vec);
69+
70+
// Iterate over the streaming response, copying data into the aligned Vec<u8>.
71+
while let Some(bytes) = response.body.try_next().await? {
72+
buf.extend_from_slice(&bytes)
73+
}
7974
// Return as Bytes.
8075
Ok(buf.into())
8176
}

0 commit comments

Comments
 (0)