Skip to content

Commit 3fd0565

Browse files
committed
node: rewrite AlternatingFileWriter with react-native logic
1 parent f9deea0 commit 3fd0565

File tree

2 files changed

+122
-44
lines changed

2 files changed

+122
-44
lines changed
Lines changed: 112 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,137 @@
1-
import { NodeFileSystem, WritableStream } from '../storage/interfaces/NodeFileSystem';
1+
import { Writable } from 'stream';
2+
import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem';
23

34
export class AlternatingFileWriter {
4-
private _fileStream?: WritableStream;
5+
private _fileStream?: Writable;
56
private _count = 0;
7+
private _size = 0;
68
private _disposed = false;
79

10+
private readonly _logQueue: string[] = [];
11+
private _currentAppendedLog?: string;
12+
813
constructor(
14+
private readonly _fileSystem: NodeFileSystem,
915
private readonly _mainFile: string,
1016
private readonly _fallbackFile: string,
11-
private readonly _fileCapacity: number,
12-
private readonly _fileSystem: NodeFileSystem,
17+
private readonly _maxLines: number,
18+
private readonly _maxSize?: number,
1319
) {}
1420

15-
public async writeLine(value: string): Promise<this> {
16-
if (this._fileCapacity <= 0) {
17-
return this;
18-
}
19-
21+
public async writeLine(value: string): Promise<void> {
2022
if (this._disposed) {
2123
throw new Error('This instance has been disposed.');
2224
}
2325

26+
this._logQueue.push(value);
27+
if (!this._currentAppendedLog) {
28+
return await this.process();
29+
}
30+
}
31+
32+
private async process(): Promise<void> {
33+
this._currentAppendedLog = this._logQueue.shift();
34+
35+
if (!this._currentAppendedLog) {
36+
return;
37+
}
38+
39+
const appendLength = this._currentAppendedLog.length + 1;
40+
this.prepareBreadcrumbStream(appendLength);
41+
2442
if (!this._fileStream) {
25-
const stream = this.safeCreateStream(this._mainFile);
26-
if (!stream) {
27-
return this;
43+
this._logQueue.unshift(this._currentAppendedLog);
44+
this._currentAppendedLog = undefined;
45+
return;
46+
}
47+
48+
// if the queue is full and we can save more item in a batch
49+
// try to save as much as possible to speed up potential native operations
50+
this._count += 1;
51+
this._size += appendLength;
52+
53+
const logsToAppend = [this._currentAppendedLog];
54+
55+
let logsToTake = 0;
56+
let currentCount = this._count;
57+
let currentSize = this._size;
58+
59+
for (let i = 0; i < this._logQueue.length; i++) {
60+
const log = this._logQueue[i];
61+
if (!log) {
62+
continue;
2863
}
2964

30-
this._fileStream = stream;
31-
} else if (this._count >= this._fileCapacity) {
32-
this._fileStream.close();
33-
this.safeMoveMainToFallback();
34-
this._count = 0;
65+
const logLength = log.length + 1;
3566

36-
const stream = this.safeCreateStream(this._mainFile);
37-
if (!stream) {
38-
return this;
67+
if (currentCount + 1 > this._maxLines) {
68+
break;
3969
}
4070

41-
this._fileStream = stream;
71+
if (this._maxSize && currentSize + logLength >= this._maxSize) {
72+
break;
73+
}
74+
75+
logsToTake++;
76+
currentCount++;
77+
currentSize += logLength;
4278
}
4379

44-
await this.safeWriteAsync(this._fileStream, value + '\n');
45-
this._count++;
80+
const restAppendingLogs = this._logQueue.splice(0, logsToTake);
81+
this._count = this._count + restAppendingLogs.length;
82+
this._size += restAppendingLogs.reduce((sum, l) => sum + l.length + 1, 0);
83+
84+
logsToAppend.push(...restAppendingLogs);
4685

47-
return this;
86+
return await this.writeAsync(this._fileStream, logsToAppend.join('\n') + '\n')
87+
.catch(() => {
88+
// handle potential issues with appending logs.
89+
// we can't do really too much here other than retry
90+
// logging the error might also cause a breadcrumb loop, that we should try to avoid
91+
this._logQueue.unshift(...logsToAppend);
92+
})
93+
.finally(() => {
94+
if (this._logQueue.length !== 0) {
95+
return this.process();
96+
} else {
97+
this._currentAppendedLog = undefined;
98+
}
99+
});
48100
}
49101

50-
private safeWriteAsync(fs: WritableStream, data: string) {
51-
return new Promise<boolean>((resolve) => fs.write(data, (err) => (err ? resolve(false) : resolve(true))));
102+
private writeAsync(fs: Writable, data: string) {
103+
return new Promise<void>((resolve, reject) => fs.write(data, (err) => (err ? reject(err) : resolve())));
52104
}
53105

54-
public dispose() {
55-
this._fileStream?.close();
56-
this._disposed = true;
106+
private prepareBreadcrumbStream(newSize: number) {
107+
if (!this._fileStream) {
108+
this._fileStream = this.safeCreateStream(this._mainFile);
109+
} else if (this._count >= this._maxLines || (this._maxSize && this._size + newSize >= this._maxSize)) {
110+
this.switchFile();
111+
}
57112
}
58113

59-
private safeCreateStream(path: string) {
60-
try {
61-
return this._fileSystem.createWriteStream(path);
62-
} catch {
63-
return undefined;
114+
private switchFile() {
115+
if (this._fileStream) {
116+
this._fileStream.destroy();
64117
}
118+
119+
this._fileStream = undefined;
120+
121+
const renameResult = this.safeMoveMainToFallback();
122+
if (!renameResult) {
123+
return;
124+
}
125+
126+
this._fileStream = this.safeCreateStream(this._mainFile);
127+
128+
this._count = 0;
129+
this._size = 0;
130+
}
131+
132+
public dispose() {
133+
this._fileStream?.destroy();
134+
this._disposed = true;
65135
}
66136

67137
private safeMoveMainToFallback() {
@@ -72,4 +142,12 @@ export class AlternatingFileWriter {
72142
return false;
73143
}
74144
}
145+
146+
private safeCreateStream(path: string) {
147+
try {
148+
return this._fileSystem.createWriteStream(path);
149+
} catch {
150+
return undefined;
151+
}
152+
}
75153
}

packages/node/tests/common/alternatingFile.spec.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ describe('AlternatingFileWriter', () => {
3030
});
3131

3232
it('should add line to the main file', async () => {
33-
const writer = new AlternatingFileWriter(file1, file2, 10, new FsNodeFileSystem());
33+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10);
3434
await writer.writeLine('value');
3535
writer.dispose();
3636

@@ -40,7 +40,7 @@ describe('AlternatingFileWriter', () => {
4040

4141
it('should not move main file to fallback file before adding with fileCapacity reached', async () => {
4242
const count = 5;
43-
const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem());
43+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count);
4444
for (let i = 0; i < count; i++) {
4545
await writer.writeLine(`value-${i}`);
4646
}
@@ -51,7 +51,7 @@ describe('AlternatingFileWriter', () => {
5151

5252
it('should move main file to fallback file after adding with fileCapacity reached', async () => {
5353
const count = 5;
54-
const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem());
54+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count);
5555
for (let i = 0; i < count; i++) {
5656
await writer.writeLine(`value-${i}`);
5757
}
@@ -66,7 +66,7 @@ describe('AlternatingFileWriter', () => {
6666

6767
it('should add line to the main file after adding with fileCapacity reached', async () => {
6868
const count = 5;
69-
const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem());
69+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count);
7070
for (let i = 0; i < count; i++) {
7171
await writer.writeLine(`value-${i}`);
7272
}
@@ -79,13 +79,13 @@ describe('AlternatingFileWriter', () => {
7979
});
8080

8181
it('should throw after adding line when disposed', async () => {
82-
const writer = new AlternatingFileWriter(file1, file2, 10, new FsNodeFileSystem());
82+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10);
8383
writer.dispose();
8484
await expect(writer.writeLine('value-x')).rejects.toThrowError('This instance has been disposed.');
8585
});
8686

8787
it('should not write when fileCapacity is 0', () => {
88-
const writer = new AlternatingFileWriter(file1, file2, 0, new FsNodeFileSystem());
88+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 0);
8989
writer.writeLine('abc');
9090
writer.dispose();
9191

@@ -94,7 +94,7 @@ describe('AlternatingFileWriter', () => {
9494
});
9595

9696
it('should not write fileCapacity is less than 0', () => {
97-
const writer = new AlternatingFileWriter(file1, file2, -1, new FsNodeFileSystem());
97+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, -1);
9898
writer.writeLine('abc');
9999
writer.dispose();
100100

@@ -104,7 +104,7 @@ describe('AlternatingFileWriter', () => {
104104

105105
describe('stress test', () => {
106106
it('should not throw', async () => {
107-
const writer = new AlternatingFileWriter(file1, file2, 1, new FsNodeFileSystem());
107+
const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 1);
108108

109109
const write = async (count: number, entry: string) => {
110110
for (let i = 0; i < count; i++) {
@@ -116,7 +116,7 @@ describe('AlternatingFileWriter', () => {
116116
const writeCount = 100;
117117
const promises = [...new Array(writerCount)].map(() => write(writeCount, 'text'));
118118
await expect(Promise.all(promises)).resolves.not.toThrow();
119-
});
119+
}, 10000);
120120

121121
it('should not skip text', async () => {
122122
const fs = mockStreamFileSystem();
@@ -129,7 +129,7 @@ describe('AlternatingFileWriter', () => {
129129
return renameSync(oldPath, newPath);
130130
});
131131

132-
const writer = new AlternatingFileWriter(file1, file2, 1, fs);
132+
const writer = new AlternatingFileWriter(fs, file1, file2, 1);
133133

134134
const write = async (count: number, entry: string) => {
135135
for (let i = 0; i < count; i++) {

0 commit comments

Comments
 (0)