Skip to content

Commit d74ec6e

Browse files
authored
Merge pull request #9153 from guymguym/guy-rdma-pr1
NSFS refactor file_reader and file_writer
2 parents 2a4e196 + 237aa34 commit d74ec6e

File tree

13 files changed

+694
-216
lines changed

13 files changed

+694
-216
lines changed

config.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,7 @@ config.NSFS_WARN_THRESHOLD_MS = 100;
827827
config.NSFS_CALCULATE_MD5 = false;
828828
config.NSFS_TRIGGER_FSYNC = true;
829829
config.NSFS_CHECK_BUCKET_BOUNDARIES = true;
830+
config.NSFS_CHECK_BUCKET_PATH_EXISTS = true;
830831
config.NSFS_REMOVE_PARTS_ON_COMPLETE = true;
831832

832833
config.NSFS_BUF_POOL_WARNING_TIMEOUT = 2 * 60 * 1000;
@@ -1009,7 +1010,8 @@ config.NSFS_GLACIER_RESERVED_BUCKET_TAGS = {};
10091010
// anonymous account name
10101011
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';
10111012

1012-
config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
1013+
config.NSFS_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
1014+
config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
10131015

10141016
// we want to change our handling related to EACCESS error
10151017
config.NSFS_LIST_IGNORE_ENTRY_ON_EACCES = true;

docs/NooBaaNonContainerized/ConfigFileCustomizations.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,20 @@ Warning: After setting this configuration, NooBaa will skip schema validations a
532532
3. systemctl restart noobaa
533533
```
534534
535+
### 37. Check bucket path exists -
536+
* <u>Key</u>: `NSFS_CHECK_BUCKET_PATH_EXISTS`
537+
* <u>Type</u>: Boolean
538+
* <u>Default</u>: true
539+
* <u>Description</u>: Enable/Disable bucket path existence checks. This is EXPERIMENTAL and NOT recommended for production! When disabled, it will reduce some latency on object operations, but calls to non-existing bucket paths will result in unexpected behavior (e.g. could return NO_SUCH_OBJECT instead of NO_SUCH_BUCKET).
540+
* <u>Steps</u>:
541+
```
542+
1. Open /path/to/config_dir/config.json file.
543+
2. Set the config key -
544+
Example:
545+
"NSFS_CHECK_BUCKET_PATH_EXISTS": false
546+
```
547+
548+
535549
## Config.json File Examples
536550
The following is an example of a config.json file -
537551

src/sdk/bucketspace_fs.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ class BucketSpaceFS extends BucketSpaceSimpleFS {
433433
async get_bucket_lifecycle_configuration_rules(params) {
434434
try {
435435
const { name } = params;
436-
dbg.log0('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
436+
dbg.log1('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
437437
const bucket = await this.config_fs.get_bucket_by_name(name);
438438
return bucket.lifecycle_configuration_rules || [];
439439
} catch (error) {

src/sdk/namespace_fs.js

Lines changed: 72 additions & 151 deletions
Large diffs are not rendered by default.

src/sdk/object_io.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,8 @@ class ObjectIO {
435435
];
436436

437437
await stream_utils.pipeline(transforms);
438-
await stream_utils.wait_finished(uploader);
438+
// Explicitly wait for finish as a defensive measure although pipeline should do it
439+
await stream.promises.finished(uploader);
439440

440441
if (splitter.md5) complete_params.md5_b64 = splitter.md5.toString('base64');
441442
if (splitter.sha256) complete_params.sha256_b64 = splitter.sha256.toString('base64');
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/* Copyright (C) 2020 NooBaa */
2+
'use strict';
3+
4+
const fs = require('fs');
5+
const path = require('path');
6+
const assert = require('assert');
7+
const buffer_utils = require('../../../util/buffer_utils');
8+
const native_fs_utils = require('../../../util/native_fs_utils');
9+
const { FileReader } = require('../../../util/file_reader');
10+
const { multi_buffer_pool } = require('../../../sdk/namespace_fs');
11+
12+
const fs_context = {};
13+
14+
describe('FileReader', () => {
15+
16+
const test_files = fs.readdirSync(__dirname).map(file => path.join(__dirname, file));
17+
18+
/**
19+
* @param {(file_path: string, start?: number, end?: number) => void} tester
20+
*/
21+
function describe_read_cases(tester) {
22+
describe('list files and read entire', () => {
23+
for (const file_path of test_files) {
24+
tester(file_path);
25+
}
26+
});
27+
describe('skip start cases', () => {
28+
tester(__filename, 1, Infinity);
29+
tester(__filename, 3, Infinity);
30+
tester(__filename, 11, Infinity);
31+
tester(__filename, 1023, Infinity);
32+
tester(__filename, 1024, Infinity);
33+
tester(__filename, 1025, Infinity);
34+
});
35+
describe('edge cases', () => {
36+
tester(__filename, 0, 1);
37+
tester(__filename, 0, 2);
38+
tester(__filename, 0, 3);
39+
tester(__filename, 1, 2);
40+
tester(__filename, 1, 3);
41+
tester(__filename, 2, 3);
42+
tester(__filename, 0, 1023);
43+
tester(__filename, 0, 1024);
44+
tester(__filename, 0, 1025);
45+
tester(__filename, 1, 1023);
46+
tester(__filename, 1, 1024);
47+
tester(__filename, 1, 1025);
48+
tester(__filename, 1023, 1024);
49+
tester(__filename, 1023, 1025);
50+
tester(__filename, 1024, 1025);
51+
tester(__filename, 123, 345);
52+
tester(__filename, 1000000000, Infinity);
53+
});
54+
}
55+
56+
describe('as stream.Readable', () => {
57+
58+
describe_read_cases(tester);
59+
60+
function tester(file_path, start = 0, end = Infinity) {
61+
const basename = path.basename(file_path);
62+
it(`test read ${start}-${end} ${basename}`, async () => {
63+
await native_fs_utils.use_file({
64+
fs_context,
65+
bucket_path: file_path,
66+
open_path: file_path,
67+
scope: async file => {
68+
const stat = await file.stat(fs_context);
69+
const aborter = new AbortController();
70+
const signal = aborter.signal;
71+
const file_reader = new FileReader({
72+
fs_context,
73+
file,
74+
file_path,
75+
stat,
76+
start,
77+
end,
78+
signal,
79+
multi_buffer_pool,
80+
highWaterMark: 1024, // bytes
81+
});
82+
const data = await buffer_utils.read_stream_join(file_reader);
83+
const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 });
84+
const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
85+
assert.strictEqual(data.length, node_fs_data.length);
86+
assert.strictEqual(data.toString(), node_fs_data.toString());
87+
}
88+
});
89+
});
90+
}
91+
});
92+
93+
describe('read_into_stream with buffer pooling', () => {
94+
95+
describe_read_cases(tester);
96+
97+
function tester(file_path, start = 0, end = Infinity) {
98+
const basename = path.basename(file_path);
99+
it(`test read ${start}-${end} ${basename}`, async () => {
100+
await native_fs_utils.use_file({
101+
fs_context,
102+
bucket_path: file_path,
103+
open_path: file_path,
104+
scope: async file => {
105+
const stat = await file.stat(fs_context);
106+
const aborter = new AbortController();
107+
const signal = aborter.signal;
108+
const file_reader = new FileReader({
109+
fs_context,
110+
file,
111+
file_path,
112+
stat,
113+
start,
114+
end,
115+
signal,
116+
multi_buffer_pool,
117+
highWaterMark: 1024, // bytes
118+
});
119+
const writable = buffer_utils.write_stream();
120+
await file_reader.read_into_stream(writable);
121+
const data = writable.join();
122+
const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 });
123+
const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
124+
assert.strictEqual(data.length, node_fs_data.length);
125+
assert.strictEqual(data.toString(), node_fs_data.toString());
126+
}
127+
});
128+
});
129+
}
130+
131+
});
132+
133+
134+
// Abort tests are disabled temporarily due to flakiness
135+
//
136+
// describe('abort during read_into_stream', () => {
137+
// tester(__filename);
138+
// function tester(file_path, start = 0, end = Infinity) {
139+
// const basename = path.basename(file_path);
140+
// it(`test abort read ${start}-${end} ${basename}`, async () => {
141+
// await native_fs_utils.use_file({
142+
// fs_context,
143+
// bucket_path: file_path,
144+
// open_path: file_path,
145+
// scope: async file => {
146+
// const stat = await file.stat(fs_context);
147+
// const aborter = new AbortController();
148+
// const signal = aborter.signal;
149+
// const file_reader = new FileReader({
150+
// fs_context,
151+
// file,
152+
// file_path,
153+
// stat,
154+
// start,
155+
// end,
156+
// signal,
157+
// multi_buffer_pool,
158+
// highWaterMark: 1024, // bytes
159+
// });
160+
// const writable = buffer_utils.write_stream();
161+
// const promise = file_reader.read_into_stream(writable);
162+
// setImmediate(() => aborter.abort()); // abort quickly
163+
// await assert.rejects(promise, { name: 'AbortError' });
164+
// }
165+
// });
166+
// });
167+
// }
168+
// });
169+
170+
});

src/test/unrelated/for_await_stream.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
/* Copyright (C) 2020 NooBaa */
22
'use strict';
33

4-
const util = require('util');
54
const events = require('events');
65
const stream = require('stream');
76
const argv = require('minimist')(process.argv);
8-
const stream_finished = util.promisify(stream.finished);
97

108
async function test(mode) {
119
try {
@@ -68,7 +66,7 @@ async function test(mode) {
6866

6967
}
7068

71-
await stream_finished(output);
69+
await stream.promises.finished(output);
7270
console.log(`${mode}: done.`);
7371

7472
} catch (err) {

src/test/unrelated/stream_pipe_to_multiple_targets.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/* Copyright (C) 2016 NooBaa */
22
'use strict';
33

4-
const util = require('util');
54
const assert = require('assert');
65
const stream = require('stream');
76
const crypto = require('crypto');
@@ -81,8 +80,6 @@ class WriteTarget extends stream.Writable {
8180
}
8281
}
8382

84-
const wait_finished = util.promisify(stream.finished);
85-
8683
async function main() {
8784
try {
8885
console.log('main: starting ...');
@@ -145,8 +142,8 @@ async function main() {
145142
}
146143

147144
await Promise.allSettled([
148-
wait_finished(hub),
149-
wait_finished(cache),
145+
stream.promises.finished(hub),
146+
stream.promises.finished(cache),
150147
]);
151148

152149
if (cache.destroyed) {

src/tools/file_writer_hashing.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const assert = require('assert');
66
const FileWriter = require('../util/file_writer');
77
const config = require('../../config');
88
const nb_native = require('../util/nb_native');
9-
const stream_utils = require('../util/stream_utils');
109
const P = require('../util/promise');
1110
const stream = require('stream');
1211
const fs = require('fs');
@@ -72,12 +71,11 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
7271
}());
7372
const target = new TargetHash();
7473
const file_writer = new FileWriter({
75-
target_file: target,
74+
target_file: /**@type {any}*/ (target),
7675
fs_context: DEFAULT_FS_CONFIG,
7776
namespace_resource_id: 'MajesticSloth'
7877
});
79-
await stream_utils.pipeline([source_stream, file_writer]);
80-
await stream_utils.wait_finished(file_writer);
78+
await file_writer.write_entire_stream(source_stream);
8179
const write_hash = target.digest();
8280
console.log(
8381
'Hash target',
@@ -95,7 +93,7 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
9593

9694
async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
9795
config.NSFS_DEFAULT_IOV_MAX = iov_max;
98-
fs.mkdirSync(F_PREFIX);
96+
fs.mkdirSync(F_PREFIX, { recursive: true });
9997
await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
10098
let target_file;
10199
const data = crypto.randomBytes(PART_SIZE);
@@ -114,8 +112,7 @@ async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
114112
fs_context: DEFAULT_FS_CONFIG,
115113
namespace_resource_id: 'MajesticSloth'
116114
});
117-
await stream_utils.pipeline([source_stream, file_writer]);
118-
await stream_utils.wait_finished(file_writer);
115+
await file_writer.write_entire_stream(source_stream);
119116
if (XATTR) {
120117
await target_file.replacexattr(
121118
DEFAULT_FS_CONFIG,

0 commit comments

Comments
 (0)