Skip to content

Commit a469904

Browse files
committed
Create a runPipe that sets up a multi-process pipeline from multiple Configurations
1 parent 8e5ddd2 commit a469904

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

Sources/Subprocess/API.swift

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,100 @@ public func run<
557557
)
558558
}
559559

560+
public enum PipeConfigurationError: Error {
561+
case emptyConfiguration
562+
}
563+
564+
/// Run a series of `Configuration` asynchronously and returns
565+
/// a `CollectedResult` collecting the result of the final process at the
566+
/// end of a pipeline of all processes joined together with pipes from standard input
567+
/// to standard output.
568+
///
569+
/// - Parameters:
570+
/// - configurations: The `Subprocess` configuration to run in a pipeline.
571+
/// - input: The input to send to the first executable.
572+
/// - output: The method to use for redirecting the standard output of the last executable.
573+
/// - error: The method to use for redirecting the standard error of all executables.
574+
/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable.
575+
public func runPipe<
576+
Input: InputProtocol,
577+
Output: OutputProtocol,
578+
Error: OutputProtocol
579+
>(
580+
_ pipeline: Configuration...,
581+
input: Input = .none,
582+
output: Output,
583+
error: Error = .discarded
584+
) async throws -> CollectedResult<Output, Error> {
585+
try await runPipe(pipeline, input: input, output: output, error: error)
586+
}
587+
588+
/// Run a series of `Configuration` asynchronously and returns
589+
/// a `CollectedResult` collecting the result of the final process at the
590+
/// end of a pipeline of all processes joined together with pipes from standard input
591+
/// to standard output.
592+
///
593+
/// - Parameters:
594+
/// - configurations: The `Subprocess` configuration to run in a pipeline.
595+
/// - input: The input to send to the first executable.
596+
/// - output: The method to use for redirecting the standard output of the last executable.
597+
/// - error: The method to use for redirecting the standard error of all executables.
598+
/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable.
599+
public func runPipe<
600+
Input: InputProtocol,
601+
Output: OutputProtocol,
602+
Error: OutputProtocol
603+
>(
604+
_ pipeline: [Configuration],
605+
input: Input = .none,
606+
output: Output,
607+
error: Error = .discarded
608+
) async throws -> CollectedResult<Output, Error> {
609+
guard !pipeline.isEmpty else {
610+
throw PipeConfigurationError.emptyConfiguration
611+
}
612+
613+
var prevInput: InputProtocol = input
614+
615+
for (idx, cmd) in pipeline.enumerated() {
616+
let currentInput = prevInput
617+
let currentOutput: any OutputProtocol
618+
619+
if pipeline.count == 1 {
620+
currentOutput = output
621+
} else if idx != pipeline.count - 1 {
622+
let (reader, writer) = try FileDescriptor.pipe()
623+
prevInput = .fileDescriptor(reader, closeAfterSpawningProcess: true)
624+
currentOutput = .fileDescriptor(writer, closeAfterSpawningProcess: true)
625+
} else {
626+
currentOutput = output
627+
}
628+
629+
if idx == pipeline.count - 1 {
630+
let result = await Task {
631+
try await run(cmd, input: currentInput, output: output, error: error)
632+
}.result
633+
634+
return try result.get()
635+
} else {
636+
Task {
637+
if let currentOutput = currentOutput as? FileDescriptorOutput {
638+
_ = try await run(cmd, input: currentInput, output: currentOutput, error: error)
639+
} else {
640+
// In this case the task is guaranteed to not be the final configuration,
641+
// and therefore the output must be a file descriptor output from a pipe that was
642+
// created in the previous run through the loop.
643+
fatalError()
644+
}
645+
}
646+
}
647+
}
648+
649+
// This should never happen because the list of configuration is guaranteed to be non-empty
650+
// from the guard above.
651+
fatalError()
652+
}
653+
560654
/// Run an executable with given parameters specified by a `Configuration`
561655
/// - Parameters:
562656
/// - configuration: The `Subprocess` configuration to run.

0 commit comments

Comments
 (0)