Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,89 @@ public func run<
)
}

public enum PipelineError: Error {
case runErrors([Swift.Error])
}

/// Run a series of `Configuration` asynchronously and returns
/// a `CollectedResult` collecting the result of the final process at the
/// end of a pipeline of all processes joined together with pipes from standard input
/// to standard output.
///
/// - Parameters:
/// - configurations: The `Subprocess` configurations for the start of the pipeline
/// - last: The final configuration that produces the output
/// - input: The input to send to the first executable.
/// - output: The method to use for redirecting the standard output of the last executable.
/// - error: The method to use for redirecting the standard error of all executables.
/// - Returns a CollectedResult containing the result of the pipeline and the termination status of the final executable.
func run<Input: InputProtocol, Output, Error: OutputProtocol>(_ first: Configuration, _ pipeline: Configuration..., to last: Configuration, input: Input = .none, output: Output, error: Error = .discarded) async throws -> CollectedResult<Output, Error> {
var pipes = [try FileDescriptor.pipe()]
for _ in pipeline {
pipes.append(try FileDescriptor.pipe())
}

let finalPipes = pipes

let result = await withTaskGroup { taskGroup -> Result<CollectedResult<Output,Error>,PipelineError> in
taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
do {
_ = try await run(first, input: input, output: .fileDescriptor(finalPipes.first!.writeEnd, closeAfterSpawningProcess: true), error: error)
return Result.success(nil)
} catch {
return Result.failure(error)
}
}

for (idx, p) in pipeline.enumerated() {
taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
do {
_ = try await run(p, input: .fileDescriptor(finalPipes[idx+1].readEnd, closeAfterSpawningProcess: true), output: .fileDescriptor(finalPipes[idx].writeEnd, closeAfterSpawningProcess: true), error: error)
return Result.success(nil)
} catch {
return Result.failure(error)
}
}
}

taskGroup.addTask { () -> Result<CollectedResult<Output,Error>?,Swift.Error> in
do {
let collectedResult = try await run(last, input: .fileDescriptor(finalPipes.last!.readEnd, closeAfterSpawningProcess: true), output: output, error: error)
return Result.success(collectedResult)
} catch {
return Result.failure(error)
}
}

var errors: [Swift.Error] = []
var collection: CollectedResult<Output,Error>?

while let taskVal = await taskGroup.next() {
switch taskVal {
case .failure(let error):
errors.append(error)
case .success(.some(let c)):
collection = c
case .success(.none):
continue
}
}

if errors.isEmpty {
return Result.failure(PipelineError.runErrors(errors))
}

return Result.success(collection!)
}

switch result {
case .success(let collection):
return collection
case .failure(let pipelineError):
throw pipelineError
}
}

/// Run an executable with given parameters specified by a `Configuration`
/// - Parameters:
/// - configuration: The `Subprocess` configuration to run.
Expand Down