Skip to content

Commit 06a42b1

Browse files
committed
subprocess: use promisify() and generators
1 parent 9368ab7 commit 06a42b1

File tree

2 files changed

+103
-124
lines changed

2 files changed

+103
-124
lines changed

ddterm/shell/extension.js

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -283,19 +283,14 @@ class EnabledExtension {
283283
});
284284

285285
this.service.connect('error', (service, ex) => {
286-
const log_collector = service.subprocess?.log_collector;
287-
288-
if (!log_collector) {
289-
this.notifications.show_error(ex);
290-
return;
291-
}
292-
293-
log_collector.collect().then(output => {
294-
this.notifications.show_error(ex, output);
295-
}).catch(ex2 => {
296-
logError(ex2, 'Failed to collect logs');
297-
this.notifications.show_error(ex);
298-
});
286+
(service.subprocess?.get_logs() ?? Promise.resolve()).then(
287+
output => {
288+
this.notifications.show_error(ex, output);
289+
}, ex2 => {
290+
logError(ex2, 'Failed to collect logs');
291+
this.notifications.show_error(ex);
292+
}
293+
);
299294
});
300295

301296
this.window_geometry = new WindowGeometry();

ddterm/shell/subprocess.js

Lines changed: 95 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import Meta from 'gi://Meta';
1111
import Gi from 'gi';
1212

1313
import { sd_journal_stream_fd } from './sd_journal.js';
14+
import { promisify } from '../util/promise.js';
1415

1516
function try_require(namespace, version = undefined) {
1617
try {
@@ -60,115 +61,102 @@ function shell_join(argv) {
6061
return argv.map(arg => GLib.shell_quote(arg)).join(' ');
6162
}
6263

63-
class JournalctlLogCollector {
64-
constructor(journalctl, since, pid) {
65-
this._argv = [
66-
journalctl,
67-
'--user',
68-
'-b',
69-
`--since=${since.format('%C%y-%m-%d %H:%M:%S UTC')}`,
70-
'-ocat',
71-
`-n${KEEP_LOG_LINES}`,
72-
`_PID=${pid}`,
73-
];
74-
}
64+
async function collect_journald_logs(journalctl, since, pid) {
65+
const argv = [
66+
journalctl,
67+
'--user',
68+
'-b',
69+
`--since=${since.format('%C%y-%m-%d %H:%M:%S UTC')}`,
70+
'-ocat',
71+
`-n${KEEP_LOG_LINES}`,
72+
];
73+
74+
if (pid)
75+
argv.push(`_PID=${pid}`);
76+
77+
const proc = Gio.Subprocess.new(
78+
argv,
79+
Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE
80+
);
7581

76-
_begin(resolve, reject) {
77-
const proc = Gio.Subprocess.new(
78-
this._argv,
79-
Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE
80-
);
82+
const communicate = promisify(proc.communicate_async, proc.communicate_finish);
83+
const [, stdout_buf] = await communicate.call(proc, null, null);
8184

82-
proc.communicate_utf8_async(null, null, this._finish.bind(this, resolve, reject));
83-
}
85+
return new TextDecoder().decode(stdout_buf);
86+
}
8487

85-
_finish(resolve, reject, source, result) {
86-
try {
87-
const [, stdout_buf] = source.communicate_utf8_finish(result);
88-
resolve(stdout_buf);
89-
} catch (ex) {
90-
reject(ex);
91-
}
92-
}
88+
async function *read_chunks(input_stream) {
89+
const read_bytes =
90+
promisify(input_stream.read_bytes_async, input_stream.read_bytes_finish);
9391

94-
collect() {
95-
return new Promise(this._begin.bind(this));
96-
}
97-
}
92+
try {
93+
for (;;) {
94+
// eslint-disable-next-line no-await-in-loop
95+
const chunk = await read_bytes.call(input_stream, 4096, GLib.PRIORITY_DEFAULT, null);
9896

99-
class TeeLogCollector {
100-
constructor(stream) {
101-
this._input = stream;
102-
this._output = new UnixOutputStream({ fd: STDERR_FD, close_fd: false });
103-
this._collected = [];
104-
this._collected_lines = 0;
105-
this._promise = new Promise((resolve, reject) => {
106-
this._resolve = resolve;
107-
this._reject = reject;
108-
});
109-
110-
this._read_more();
111-
}
97+
if (chunk.get_size() === 0)
98+
return;
11299

113-
_read_more() {
114-
this._input.read_bytes_async(4096, GLib.PRIORITY_DEFAULT, null, this._read_done.bind(this));
100+
yield chunk.toArray();
101+
}
102+
} finally {
103+
input_stream.close(null);
115104
}
105+
}
116106

117-
_read_done(source, result) {
118-
try {
119-
const chunk = source.read_bytes_finish(result).toArray();
107+
function *split_array_keep_delimiter(bytes, delimiter) {
108+
let start = 0;
120109

121-
if (chunk.length === 0) {
122-
this._input.close(null);
123-
this._output.close(null);
124-
this._resolve();
125-
return;
126-
}
110+
for (;;) {
111+
let end = bytes.indexOf(delimiter, start);
112+
113+
if (end === -1)
114+
break;
127115

128-
const delimiter = '\n'.charCodeAt(0);
129-
let start = 0;
116+
yield bytes.subarray(start, end + 1);
130117

131-
for (;;) {
132-
let end = chunk.indexOf(delimiter, start);
118+
start = end + 1;
119+
}
133120

134-
if (end === -1) {
135-
if (start < chunk.length)
136-
this._collected.push(chunk.subarray(start));
121+
yield bytes.subarray(start);
122+
}
137123

138-
break;
139-
}
124+
async function collect_stdio_logs(input_stream) {
125+
const delimiter = '\n'.charCodeAt(0);
126+
const collected = [];
127+
let lines = 0;
128+
const stderr = new UnixOutputStream({ fd: STDERR_FD, close_fd: false });
140129

141-
this._collected.push(chunk.subarray(start, end + 1));
142-
this._collected_lines += 1;
130+
for await (const chunk of read_chunks(input_stream)) {
131+
// I hope sync/blocking writes to stderr are fine.
132+
// After all, this is the same thing that printerr() does.
133+
stderr.write_all(chunk, null);
143134

144-
start = end + 1;
145-
}
135+
for (const sub_chunk of split_array_keep_delimiter(chunk, delimiter)) {
136+
collected.push(sub_chunk);
146137

147-
let remove = 0;
138+
if (sub_chunk.at(-1) === delimiter)
139+
lines += 1;
140+
}
148141

149-
while (this._collected_lines > KEEP_LOG_LINES) {
150-
const remove_chunk = this._collected[remove];
142+
let remove = 0;
151143

152-
remove += 1;
144+
while (lines > KEEP_LOG_LINES) {
145+
const remove_chunk = collected[remove];
153146

154-
if (remove_chunk[remove_chunk.length - 1] === delimiter)
155-
this._collected_lines -= 1;
156-
}
147+
remove += 1;
157148

158-
this._collected.splice(0, remove);
159-
this._output.write(chunk, null);
160-
this._read_more();
161-
} catch (ex) {
162-
this._reject(ex);
149+
if (remove_chunk.at(-1) === delimiter)
150+
lines -= 1;
163151
}
152+
153+
if (remove > 0)
154+
collected.splice(0, remove);
164155
}
165156

166-
async collect() {
167-
await this._promise;
157+
const decoder = new TextDecoder();
168158

169-
const decoder = new TextDecoder();
170-
return this._collected.map(line => decoder.decode(line)).join('\n');
171-
}
159+
return collected.map(v => decoder.decode(v)).join('');
172160
}
173161

174162
export const Subprocess = GObject.registerClass({
@@ -213,10 +201,6 @@ export const Subprocess = GObject.registerClass({
213201
? make_subprocess_launcher_journald(this.journal_identifier)
214202
: make_subprocess_launcher_fallback();
215203

216-
const launch_context = global.create_app_launch_context(0, -1);
217-
218-
subprocess_launcher.set_environ(launch_context.get_environment());
219-
220204
for (const extra_env of this.environ) {
221205
const split_pos = extra_env.indexOf('=');
222206
const name = extra_env.slice(0, split_pos);
@@ -231,13 +215,18 @@ export const Subprocess = GObject.registerClass({
231215
subprocess_launcher.close();
232216
}
233217

234-
this.log_collector = logging_to_journald
235-
? new JournalctlLogCollector(journalctl, start_date, this._subprocess.get_identifier())
236-
: new TeeLogCollector(this._subprocess.get_stdout_pipe());
218+
const pid = this._subprocess.get_identifier();
219+
220+
this._get_logs = logging_to_journald
221+
? collect_journald_logs.bind(globalThis, journalctl, start_date, pid)
222+
: collect_stdio_logs(this._subprocess.get_stdout_pipe()).catch(logError);
223+
224+
if (!pid)
225+
return;
237226

238227
GnomeDesktop.start_systemd_scope(
239228
this.journal_identifier,
240-
parseInt(this._subprocess.get_identifier(), 10),
229+
parseInt(pid, 10),
241230
null,
242231
null,
243232
null,
@@ -261,33 +250,28 @@ export const Subprocess = GObject.registerClass({
261250
}
262251

263252
wait(cancellable = null) {
264-
return new Promise((resolve, reject) => {
265-
this.g_subprocess.wait_async(cancellable, (source, result) => {
266-
try {
267-
resolve(source.wait_finish(result));
268-
} catch (ex) {
269-
reject(ex);
270-
}
271-
});
272-
});
253+
const { wait_async, wait_finish } = this.g_subprocess;
254+
255+
return promisify(wait_async, wait_finish).call(this.g_subprocess, cancellable);
273256
}
274257

275258
wait_check(cancellable = null) {
276-
return new Promise((resolve, reject) => {
277-
this.g_subprocess.wait_check_async(cancellable, (source, result) => {
278-
try {
279-
resolve(source.wait_check_finish(result));
280-
} catch (ex) {
281-
reject(ex);
282-
}
283-
});
284-
});
259+
const { wait_check_async, wait_check_finish } = this.g_subprocess;
260+
261+
return promisify(wait_check_async, wait_check_finish).call(this.g_subprocess, cancellable);
285262
}
286263

287264
terminate() {
288265
this.g_subprocess.send_signal(SIGTERM);
289266
}
290267

268+
get_logs() {
269+
if (this._get_logs instanceof Function)
270+
return this._get_logs();
271+
272+
return this._get_logs;
273+
}
274+
291275
_spawn(subprocess_launcher) {
292276
log(`Starting subprocess: ${shell_join(this.argv)}`);
293277
return subprocess_launcher.spawnv(this.argv);

0 commit comments

Comments
 (0)