Skip to content

Streaming Callback within a pipeline #113

@hoschmieder

Description

@hoschmieder

Hi team,
I believe I am not the only one who needs `streaming_callback` support for multiple components within a pipeline.
Currently, the `streaming_generator` function loops over all streaming-capable components but only registers the callback on the last one.
I have added a `streaming_generator_all` function that registers the callback on every streaming component.

I have attached a test pipeline that demonstrates the behavior (e.g. with Open WebUI).

Maybe this could be a new feature for the next release?

Regards
Holger

New function in `/hayhooks/server/pipelines/utils.py`:

def streaming_generator_all(pipeline: "Pipeline", pipeline_run_args: Dict) -> Generator:
    """Run ``pipeline`` in a background thread and yield text from every streaming component.

    Every component returned by ``pipeline.walk()`` that has a
    ``streaming_callback`` attribute gets the same callback injected into its
    run arguments. Chunks from all of them are therefore merged into a single
    stream, in the order the components emit them.

    Args:
        pipeline: The pipeline to run. ``walk()`` must yield
            ``(name, component)`` pairs, and ``run(data)`` executes the pipeline.
        pipeline_run_args: Per-component inputs keyed by component name.
            Neither this mapping nor the per-component dicts inside it are
            modified.

    Yields:
        The ``content`` of each chunk passed to the streaming callback.

    Raises:
        Exception: Whatever ``pipeline.run`` raised. It is re-raised here after
            all chunks produced before the failure have been yielded.
    """
    queue: Queue = Queue()
    done = object()  # private end-of-stream marker; cannot collide with chunk content
    errors: list = []

    def streaming_callback(chunk):
        queue.put(chunk.content)

    # Copy the per-component dicts as well. A shallow copy of the outer mapping
    # would still inject the callback into dicts owned by the caller.
    run_args = {
        name: dict(args) if isinstance(args, dict) else args
        for name, args in pipeline_run_args.items()
    }
    for name, component in pipeline.walk():
        if hasattr(component, "streaming_callback"):
            run_args.setdefault(name, {})["streaming_callback"] = streaming_callback

    def run_pipeline():
        try:
            pipeline.run(run_args)
        except Exception as exc:  # handed over to the consuming thread below
            errors.append(exc)
        finally:
            # Always unblock the consumer, even if the pipeline failed.
            queue.put(done)

    thread = threading.Thread(target=run_pipeline)
    thread.start()

    while True:
        item = queue.get()
        if item is done:
            break
        yield item

    thread.join()
    if errors:
        # Surface the failure instead of silently ending the stream early.
        raise errors[0]

Test pipeline:

`from pathlib import Path
import logging
import time
from typing import Type, Dict, Any, Optional, List,Union, Generator, Callable
from haystack import Pipeline, component
from hayhooks import BasePipelineWrapper, get_last_user_message
from hayhooks import streaming_generator_all as streaming_generator #Modifikation von HS
from haystack.core.serialization import DeserializationCallbacks
from haystack.dataclasses.document import Document
from haystack.dataclasses.chat_message import ChatMessage
from haystack.dataclasses import StreamingChunk



@component
class EchoA:
    """Demo component that streams three "thinking" steps and passes its input through.

    Used to check that a streaming callback registered on several components of
    one pipeline receives chunks from each of them.
    """

    def __init__(self):
        # Declaring the attribute lets streaming helpers detect this component
        # via ``hasattr(component, "streaming_callback")``. It also serves as
        # the fallback callback when ``run`` is not given one.
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=str)
    def run(self, text: str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        """Stream three steps plus a final answer, then return ``text`` unchanged.

        Args:
            text: Input text. The steps stream it in lower case and the final
                answer in upper case.
            streaming_callback: Receives each ``StreamingChunk``. When given,
                it overrides ``self.streaming_callback``.

        Returns:
            ``{"output": text}``. The input is passed through unchanged so it
            can feed the next component.
        """
        # The runtime callback wins; otherwise fall back to the one set on the instance.
        streaming_callback = streaming_callback or self.streaming_callback
        for i in range(3):
            time.sleep(0.5)  # simulate work so chunks arrive spread out over time
            print("++++++ Streamin-Callback von EchoA", streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (1) {i+1}: {text.lower()}\n")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (1) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Antwort von Echo1 auf '{text}': {text.upper()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{"index": 0, "model": "test_callback", "finish_reason": "stop"}]
        # Generator-style summary, printed for debugging only; it is not returned.
        output = {"query": text, "replies": [final_answer], "meta": meta}
        print("echo", output)
        return {"output": text}
        

@component
class EchoB:
    """Demo component that streams three steps and emits a generator-style summary dict."""

    def __init__(self):
        # Makes the component detectable via ``hasattr(component, "streaming_callback")``
        # and provides a fallback callback when ``run`` is not given one.
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=dict)
    def run(self, text: str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        """Stream three steps plus a final answer, then return a summary dict.

        Args:
            text: Input text. Both the steps and the final answer stream it in
                lower case.
            streaming_callback: Receives each ``StreamingChunk``. When given,
                it overrides ``self.streaming_callback``.

        Returns:
            ``{"output": {"query": text, "replies": [answer], "meta": [...]}}``.
            The summary is wrapped under ``output`` so the returned key matches
            the declared output socket.
        """
        # The runtime callback wins; otherwise fall back to the one set on the instance.
        streaming_callback = streaming_callback or self.streaming_callback
        print("IN echo2", text)
        for i in range(3):
            time.sleep(0.5)  # simulate work so chunks arrive spread out over time
            print("++++++ Streamin-Callback von EchoB", streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (2) {i+1}: {text.lower()}")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (2) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Antwort ECHO2 auf  '{text}': {text.lower()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{"index": 0, "model": "test_callback", "finish_reason": "stop"}]
        output = {"query": text, "replies": [final_answer], "meta": meta}
        print("echo2", output)
        # Return under the declared socket name. Undeclared keys would not match
        # the `output` socket that connections/outputs refer to.
        return {"output": output}



# 🧩 PipelineWrapper for Hayhooks / Open WebUI
class PipelineWrapper(BasePipelineWrapper):
    """Hayhooks wrapper around the echo1 -> echo2 demo pipeline loaded from pipeline.yml."""

    def setup(self) -> None:
        """Load ``pipeline.yml`` from this file's directory into ``self.pipeline``."""
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
        callbacks = DeserializationCallbacks(component_pre_init=PipelineWrapper.component_pre_init_callback)
        self.pipeline = Pipeline.loads(pipeline_yaml, callbacks=callbacks)

    @staticmethod
    def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]) -> None:
        """Hook invoked by Haystack just before each component from pipeline.yml is initialised.

        Haystack ignores the return value of this callback. It can only
        influence construction by modifying ``init_params`` in place. No
        component in this pipeline needs adjusted parameters, so the hook only
        logs the call and is kept as an extension point. Do not instantiate
        components here: such instances would be discarded.
        """
        logging.getLogger(__name__).debug(
            "component_pre_init_callback: %s (%s) with %s",
            component_name,
            component_cls.__name__,
            init_params,
        )

    def run_api(self, query: str) -> dict:
        """Run the pipeline on ``query`` and return the outputs of its last component, ``echo2``.

        ``Pipeline.run`` keys its result by component name. The outputs
        therefore live under ``result["echo2"]``, not at the top level.
        """
        result = self.pipeline.run({"text": query})
        return result["echo2"]

    def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
        """Answer an OpenAI-style chat request with the text streamed by the pipeline.

        Chunks from every streaming component are produced by
        ``streaming_generator`` (an alias of ``streaming_generator_all`` here).
        When ``body["stream"]`` is true (the default), they are returned as a
        generator. Otherwise they are joined into a single string.
        """
        question = get_last_user_message(messages)
        chunks = streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"echo1": {"text": question}},
        )
        if body.get("stream", True):
            return chunks
        return "".join(chunks)

# pipeline.yml — read by PipelineWrapper.setup() from the wrapper's directory.
# Two demo components chained echo1 -> echo2; both accept a streaming_callback.
components:
  echo1:
    # Streams three steps, then passes its input text through on `output`.
    type: pipeline_wrapper.EchoA

  echo2:
    # Streams three steps, then returns a summary (query / replies / meta).
    type: pipeline_wrapper.EchoB

connections:
  # echo1's `output` (str) feeds echo2. No input socket is named, so the
  # receiver is presumably resolved by type to echo2.text — confirm.
  - sender: echo1.output
    receiver: echo2

# NOTE(review): `outputs` is presumably read by hayhooks (or is left over from
# an experiment), not by Haystack itself — confirm. Listing pipeline outputs
# under a `receiver` key also looks unusual; verify against the schema that
# consumes this file.
outputs:
  - receiver: echo1.output
  - receiver: echo2.output
`

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions