Skip to content

Commit c8f6ad7

Browse files
committed
subprocess: use promisify() and generators
1 parent 8a0c02a commit c8f6ad7

File tree

3 files changed

+110
-138
lines changed

3 files changed

+110
-138
lines changed

ddterm/shell/extension.js

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -283,19 +283,14 @@ class EnabledExtension {
283283
});
284284

285285
this.service.connect('error', (service, ex) => {
286-
const log_collector = service.subprocess?.log_collector;
287-
288-
if (!log_collector) {
289-
this.notifications.show_error(ex);
290-
return;
291-
}
292-
293-
log_collector.collect().then(output => {
294-
this.notifications.show_error(ex, output);
295-
}).catch(ex2 => {
296-
logError(ex2, 'Failed to collect logs');
297-
this.notifications.show_error(ex);
298-
});
286+
service.subprocess.get_logs().then(
287+
output => {
288+
this.notifications.show_error(ex, output);
289+
}, ex2 => {
290+
logError(ex2, 'Failed to collect logs');
291+
this.notifications.show_error(ex);
292+
}
293+
);
299294
});
300295

301296
this.window_geometry = new WindowGeometry();

ddterm/shell/service.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ export const Service = GObject.registerClass({
177177
];
178178

179179
const params = {
180-
journal_identifier: this.bus_name,
181-
argv,
180+
app_id: this.bus_name,
182181
environ: this.extra_env,
182+
argv,
183183
};
184184

185185
if (this.wayland)

ddterm/shell/subprocess.js

Lines changed: 100 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import Meta from 'gi://Meta';
1111
import Gi from 'gi';
1212

1313
import { sd_journal_stream_fd } from './sd_journal.js';
14+
import { promisify } from '../util/promise.js';
1415

1516
function try_require(namespace, version = undefined) {
1617
try {
@@ -50,7 +51,7 @@ function make_subprocess_launcher_journald(journal_identifier) {
5051
return subprocess_launcher;
5152
}
5253

53-
function make_subprocess_launcher_fallback() {
54+
function make_subprocess_launcher_stdio() {
5455
return Gio.SubprocessLauncher.new(
5556
Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE
5657
);
@@ -60,121 +61,108 @@ function shell_join(argv) {
6061
return argv.map(arg => GLib.shell_quote(arg)).join(' ');
6162
}
6263

63-
class JournalctlLogCollector {
64-
constructor(journalctl, since, pid) {
65-
this._argv = [
66-
journalctl,
67-
'--user',
68-
'-b',
69-
`--since=${since.format('%C%y-%m-%d %H:%M:%S UTC')}`,
70-
'-ocat',
71-
`-n${KEEP_LOG_LINES}`,
72-
`_PID=${pid}`,
73-
];
74-
}
64+
async function collect_journald_logs(journalctl, since, pid) {
65+
const argv = [
66+
journalctl,
67+
'--user',
68+
'-b',
69+
`--since=${since.format('%C%y-%m-%d %H:%M:%S UTC')}`,
70+
'-ocat',
71+
`-n${KEEP_LOG_LINES}`,
72+
];
73+
74+
if (pid)
75+
argv.push(`_PID=${pid}`);
76+
77+
const proc = Gio.Subprocess.new(
78+
argv,
79+
Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE
80+
);
7581

76-
_begin(resolve, reject) {
77-
const proc = Gio.Subprocess.new(
78-
this._argv,
79-
Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE
80-
);
82+
const communicate = promisify(proc.communicate_async, proc.communicate_finish);
83+
const [, stdout_buf] = await communicate.call(proc, null, null);
8184

82-
proc.communicate_utf8_async(null, null, this._finish.bind(this, resolve, reject));
83-
}
85+
return new TextDecoder().decode(stdout_buf);
86+
}
8487

85-
_finish(resolve, reject, source, result) {
86-
try {
87-
const [, stdout_buf] = source.communicate_utf8_finish(result);
88-
resolve(stdout_buf);
89-
} catch (ex) {
90-
reject(ex);
91-
}
92-
}
88+
async function *read_chunks(input_stream) {
89+
const read_bytes =
90+
promisify(input_stream.read_bytes_async, input_stream.read_bytes_finish);
9391

94-
collect() {
95-
return new Promise(this._begin.bind(this));
96-
}
97-
}
92+
try {
93+
for (;;) {
94+
// eslint-disable-next-line no-await-in-loop
95+
const chunk = await read_bytes.call(input_stream, 4096, GLib.PRIORITY_DEFAULT, null);
9896

99-
class TeeLogCollector {
100-
constructor(stream) {
101-
this._input = stream;
102-
this._output = new UnixOutputStream({ fd: STDERR_FD, close_fd: false });
103-
this._collected = [];
104-
this._collected_lines = 0;
105-
this._promise = new Promise((resolve, reject) => {
106-
this._resolve = resolve;
107-
this._reject = reject;
108-
});
109-
110-
this._read_more();
111-
}
97+
if (chunk.get_size() === 0)
98+
return;
11299

113-
_read_more() {
114-
this._input.read_bytes_async(4096, GLib.PRIORITY_DEFAULT, null, this._read_done.bind(this));
100+
yield chunk.toArray();
101+
}
102+
} finally {
103+
input_stream.close(null);
115104
}
105+
}
116106

117-
_read_done(source, result) {
118-
try {
119-
const chunk = source.read_bytes_finish(result).toArray();
107+
function *split_array_keep_delimiter(bytes, delimiter) {
108+
let start = 0;
120109

121-
if (chunk.length === 0) {
122-
this._input.close(null);
123-
this._output.close(null);
124-
this._resolve();
125-
return;
126-
}
110+
for (;;) {
111+
let end = bytes.indexOf(delimiter, start);
112+
113+
if (end === -1)
114+
break;
127115

128-
const delimiter = '\n'.charCodeAt(0);
129-
let start = 0;
116+
yield bytes.subarray(start, end + 1);
130117

131-
for (;;) {
132-
let end = chunk.indexOf(delimiter, start);
118+
start = end + 1;
119+
}
133120

134-
if (end === -1) {
135-
if (start < chunk.length)
136-
this._collected.push(chunk.subarray(start));
121+
yield bytes.subarray(start);
122+
}
137123

138-
break;
139-
}
124+
async function collect_stdio_logs(input_stream) {
125+
const delimiter = '\n'.charCodeAt(0);
126+
const collected = [];
127+
let lines = 0;
128+
const stderr = new UnixOutputStream({ fd: STDERR_FD, close_fd: false });
140129

141-
this._collected.push(chunk.subarray(start, end + 1));
142-
this._collected_lines += 1;
130+
for await (const chunk of read_chunks(input_stream)) {
131+
// I hope sync/blocking writes to stderr are fine.
132+
// After all, this is the same thing that printerr() does.
133+
stderr.write_all(chunk, null);
143134

144-
start = end + 1;
145-
}
135+
for (const sub_chunk of split_array_keep_delimiter(chunk, delimiter)) {
136+
collected.push(sub_chunk);
146137

147-
let remove = 0;
138+
if (sub_chunk.at(-1) === delimiter)
139+
lines += 1;
140+
}
148141

149-
while (this._collected_lines > KEEP_LOG_LINES) {
150-
const remove_chunk = this._collected[remove];
142+
let remove = 0;
151143

152-
remove += 1;
144+
while (lines > KEEP_LOG_LINES) {
145+
const remove_chunk = collected[remove];
153146

154-
if (remove_chunk[remove_chunk.length - 1] === delimiter)
155-
this._collected_lines -= 1;
156-
}
147+
remove += 1;
157148

158-
this._collected.splice(0, remove);
159-
this._output.write(chunk, null);
160-
this._read_more();
161-
} catch (ex) {
162-
this._reject(ex);
149+
if (remove_chunk.at(-1) === delimiter)
150+
lines -= 1;
163151
}
152+
153+
if (remove > 0)
154+
collected.splice(0, remove);
164155
}
165156

166-
async collect() {
167-
await this._promise;
157+
const decoder = new TextDecoder();
168158

169-
const decoder = new TextDecoder();
170-
return this._collected.map(line => decoder.decode(line)).join('\n');
171-
}
159+
return collected.map(v => decoder.decode(v)).join('');
172160
}
173161

174162
export const Subprocess = GObject.registerClass({
175163
Properties: {
176-
'journal-identifier': GObject.ParamSpec.string(
177-
'journal-identifier',
164+
'app-id': GObject.ParamSpec.string(
165+
'app-id',
178166
null,
179167
null,
180168
GObject.ParamFlags.READWRITE | GObject.ParamFlags.CONSTRUCT_ONLY,
@@ -210,12 +198,8 @@ export const Subprocess = GObject.registerClass({
210198
const journalctl = GLib.find_program_in_path('journalctl');
211199
const logging_to_journald = journalctl && GLib.log_writer_is_journald?.(STDOUT_FD);
212200
const subprocess_launcher = logging_to_journald
213-
? make_subprocess_launcher_journald(this.journal_identifier)
214-
: make_subprocess_launcher_fallback();
215-
216-
const launch_context = global.create_app_launch_context(0, -1);
217-
218-
subprocess_launcher.set_environ(launch_context.get_environment());
201+
? make_subprocess_launcher_journald(this.app_id)
202+
: make_subprocess_launcher_stdio();
219203

220204
for (const extra_env of this.environ) {
221205
const split_pos = extra_env.indexOf('=');
@@ -231,18 +215,16 @@ export const Subprocess = GObject.registerClass({
231215
subprocess_launcher.close();
232216
}
233217

234-
this.log_collector = logging_to_journald
235-
? new JournalctlLogCollector(journalctl, start_date, this._subprocess.get_identifier())
236-
: new TeeLogCollector(this._subprocess.get_stdout_pipe());
218+
const pid = this._subprocess.get_identifier();
237219

238-
GnomeDesktop.start_systemd_scope(
239-
this.journal_identifier,
240-
parseInt(this._subprocess.get_identifier(), 10),
241-
null,
242-
null,
243-
null,
244-
null
245-
);
220+
this._get_logs = logging_to_journald
221+
? collect_journald_logs.bind(globalThis, journalctl, start_date, pid)
222+
: collect_stdio_logs(this._subprocess.get_stdout_pipe()).catch(logError);
223+
224+
if (!pid)
225+
return;
226+
227+
GnomeDesktop.start_systemd_scope(this.app_id, parseInt(pid, 10), null, null, null, null);
246228
}
247229

248230
get g_subprocess() {
@@ -261,33 +243,28 @@ export const Subprocess = GObject.registerClass({
261243
}
262244

263245
wait(cancellable = null) {
264-
return new Promise((resolve, reject) => {
265-
this.g_subprocess.wait_async(cancellable, (source, result) => {
266-
try {
267-
resolve(source.wait_finish(result));
268-
} catch (ex) {
269-
reject(ex);
270-
}
271-
});
272-
});
246+
const { wait_async, wait_finish } = this.g_subprocess;
247+
248+
return promisify(wait_async, wait_finish).call(this.g_subprocess, cancellable);
273249
}
274250

275251
wait_check(cancellable = null) {
276-
return new Promise((resolve, reject) => {
277-
this.g_subprocess.wait_check_async(cancellable, (source, result) => {
278-
try {
279-
resolve(source.wait_check_finish(result));
280-
} catch (ex) {
281-
reject(ex);
282-
}
283-
});
284-
});
252+
const { wait_check_async, wait_check_finish } = this.g_subprocess;
253+
254+
return promisify(wait_check_async, wait_check_finish).call(this.g_subprocess, cancellable);
285255
}
286256

287257
terminate() {
288258
this.g_subprocess.send_signal(SIGTERM);
289259
}
290260

261+
get_logs() {
262+
if (this._get_logs instanceof Function)
263+
return this._get_logs();
264+
265+
return this._get_logs;
266+
}
267+
291268
_spawn(subprocess_launcher) {
292269
log(`Starting subprocess: ${shell_join(this.argv)}`);
293270
return subprocess_launcher.spawnv(this.argv);

0 commit comments

Comments
 (0)