Skip to content

Commit 7b24e51

Browse files
committed
Refactor runPipe to use a task group for all asynchronous tasks
Change runPipe into another variant of run using variadic parameters for the configuration
1 parent a469904 commit 7b24e51

File tree

1 file changed

+59
-70
lines changed

1 file changed

+59
-70
lines changed

Sources/Subprocess/API.swift

Lines changed: 59 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,8 @@ public func run<
557557
)
558558
}
559559

560-
public enum PipeConfigurationError: Error {
561-
case emptyConfiguration
560+
public enum PipelineError: Error {
561+
case runErrors([Swift.Error])
562562
}
563563

564564
/// Run a series of `Configuration` asynchronously and returns
@@ -567,88 +567,77 @@ public enum PipeConfigurationError: Error {
567567
/// to standard output.
568568
///
569569
/// - Parameters:
570-
/// - configurations: The `Subprocess` configuration to run in a pipeline.
570+
/// - configurations: The `Subprocess` configurations for the start of the pipeline
571+
/// - last: The final configuration that produces the output
571572
/// - input: The input to send to the first executable.
572573
/// - output: The method to use for redirecting the standard output of the last executable.
573574
/// - error: The method to use for redirecting the standard error of all executables.
574575
/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable.
575-
public func runPipe<
576-
Input: InputProtocol,
577-
Output: OutputProtocol,
578-
Error: OutputProtocol
579-
>(
580-
_ pipeline: Configuration...,
581-
input: Input = .none,
582-
output: Output,
583-
error: Error = .discarded
584-
) async throws -> CollectedResult<Output, Error> {
585-
try await runPipe(pipeline, input: input, output: output, error: error)
586-
}
587-
588-
/// Run a series of `Configuration` asynchronously and returns
589-
/// a `CollectedResult` collecting the result of the final process at the
590-
/// end of a pipeline of all processes joined together with pipes from standard input
591-
/// to standard output.
592-
///
593-
/// - Parameters:
594-
/// - configurations: The `Subprocess` configuration to run in a pipeline.
595-
/// - input: The input to send to the first executable.
596-
/// - output: The method to use for redirecting the standard output of the last executable.
597-
/// - error: The method to use for redirecting the standard error of all executables.
598-
/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable.
599-
public func runPipe<
600-
Input: InputProtocol,
601-
Output: OutputProtocol,
602-
Error: OutputProtocol
603-
>(
604-
_ pipeline: [Configuration],
605-
input: Input = .none,
606-
output: Output,
607-
error: Error = .discarded
608-
) async throws -> CollectedResult<Output, Error> {
609-
guard !pipeline.isEmpty else {
610-
throw PipeConfigurationError.emptyConfiguration
576+
func run<Input: InputProtocol, Output, Error: OutputProtocol>(_ first: Configuration, _ pipeline: Configuration..., to last: Configuration, input: Input = .none, output: Output, error: Error = .discarded) async throws -> CollectedResult<Output, Error> {
577+
var pipes = [try FileDescriptor.pipe()]
578+
for _ in pipeline {
579+
pipes.append(try FileDescriptor.pipe())
611580
}
612581

613-
var prevInput: InputProtocol = input
614-
615-
for (idx, cmd) in pipeline.enumerated() {
616-
let currentInput = prevInput
617-
let currentOutput: any OutputProtocol
582+
let finalPipes = pipes
618583

619-
if pipeline.count == 1 {
620-
currentOutput = output
621-
} else if idx != pipeline.count - 1 {
622-
let (reader, writer) = try FileDescriptor.pipe()
623-
prevInput = .fileDescriptor(reader, closeAfterSpawningProcess: true)
624-
currentOutput = .fileDescriptor(writer, closeAfterSpawningProcess: true)
625-
} else {
626-
currentOutput = output
584+
let result = await withTaskGroup { taskGroup -> Result<CollectedResult<Output,Error>,PipelineError> in
585+
taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
586+
do {
587+
_ = try await run(first, input: input, output: .fileDescriptor(finalPipes.first!.writeEnd, closeAfterSpawningProcess: true), error: error)
588+
return Result.success(nil)
589+
} catch {
590+
return Result.failure(error)
591+
}
627592
}
628593

629-
if idx == pipeline.count - 1 {
630-
let result = await Task {
631-
try await run(cmd, input: currentInput, output: output, error: error)
632-
}.result
633-
634-
return try result.get()
635-
} else {
636-
Task {
637-
if let currentOutput = currentOutput as? FileDescriptorOutput {
638-
_ = try await run(cmd, input: currentInput, output: currentOutput, error: error)
639-
} else {
640-
// In this case the task is guaranteed to not be the final configuration,
641-
// and therefore the output must be a file descriptor output from a pipe that was
642-
// created in the previous run through the loop.
643-
fatalError()
594+
for (idx, p) in pipeline.enumerated() {
595+
taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
596+
do {
597+
_ = try await run(p, input: .fileDescriptor(finalPipes[idx+1].readEnd, closeAfterSpawningProcess: true), output: .fileDescriptor(finalPipes[idx].writeEnd, closeAfterSpawningProcess: true), error: error)
598+
return Result.success(nil)
599+
} catch {
600+
return Result.failure(error)
644601
}
645602
}
646603
}
604+
605+
taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
606+
do {
607+
let collectedResult = try await run(last, input: .fileDescriptor(finalPipes.last!.readEnd, closeAfterSpawningProcess: true), output: output, error: error)
608+
return Result.success(collectedResult)
609+
} catch {
610+
return Result.failure(error)
611+
}
612+
}
613+
614+
var errors: [Swift.Error] = []
615+
var collection: CollectedResult<Output,Error>?
616+
617+
while let taskVal = await taskGroup.next() {
618+
switch taskVal {
619+
case .failure(let error):
620+
errors.append(error)
621+
case .success(.some(let c)):
622+
collection = c
623+
case .success(.none):
624+
continue
625+
}
626+
}
627+
628+
if errors.isEmpty {
629+
return Result.failure(PipelineError.runErrors(errors))
630+
}
631+
632+
return Result.success(collection!)
647633
}
648634

649-
// This should never happen because the list of configuration is guaranteed to be non-empty
650-
// from the guard above.
651-
fatalError()
635+
switch result {
636+
case .success(let collection):
637+
return collection
638+
case .failure(let pipelineError):
639+
throw pipelineError
640+
}
652641
}
653642

654643
/// Run an executable with given parameters specified by a `Configuration`

0 commit comments

Comments
 (0)