-
Notifications
You must be signed in to change notification settings - Fork 32
Open
Labels
Description
Hi team,
I believe I am not the only one who needs a `streaming_callback` on multiple components within a pipeline.
Currently, the `streaming_generator` function loops over all streaming components but only registers the callback on the last one.
I have added a `streaming_generator_all` function, which registers the callback on every streaming component.
I have attached a test pipeline that demonstrates the behavior (e.g. with Open WebUI).
Could this become a new feature in the next release?
Regards
Holger
New function in `/hayhooks/server/pipelines/utils.py`:
def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator:
    """Run ``pipeline`` in a background thread and yield streamed text chunks.

    The same ``streaming_callback`` is attached to *every* component that has a
    ``streaming_callback`` attribute, not only one of them. Chunks from all of
    them are yielded in the order the callbacks fire.

    Args:
        pipeline: The pipeline to run. ``walk()`` must yield ``(name, component)``
            pairs and ``run()`` receives the prepared run arguments.
        pipeline_run_args: Per-component run arguments, shaped like
            ``{component_name: {input_name: value}}``. Neither this dict nor its
            nested per-component dicts are modified.

    Yields:
        The ``content`` of every chunk passed to the streaming callback.

    Raises:
        Exception: Whatever ``pipeline.run`` raised. It is re-raised here after
            every chunk emitted before the failure has been yielded.
    """
    chunk_queue: Queue = Queue()
    done = object()  # unique end-of-stream marker; cannot collide with any chunk content
    errors: list = []

    def streaming_callback(chunk):
        chunk_queue.put(chunk.content)

    # Copy the top level AND every per-component dict we write into. A shallow copy
    # alone would still inject "streaming_callback" into the caller's nested dicts.
    run_args = dict(pipeline_run_args)
    for name, component in pipeline.walk():
        if hasattr(component, "streaming_callback"):
            component_args = dict(run_args.get(name) or {})
            component_args["streaming_callback"] = streaming_callback
            run_args[name] = component_args

    def run_pipeline():
        try:
            pipeline.run(run_args)
        except Exception as exc:  # hand the failure over to the consuming thread
            errors.append(exc)
        finally:
            # Always unblock the consumer, even when the pipeline fails.
            chunk_queue.put(done)

    thread = threading.Thread(target=run_pipeline)
    thread.start()
    while True:
        item = chunk_queue.get()
        if item is done:
            break
        yield item
    thread.join()
    if errors:
        raise errors[0]
Test pipeline (`pipeline_wrapper.py` plus `pipeline.yml`):
`from pathlib import Path
import logging
import time
from typing import Type, Dict, Any, Optional, List,Union, Generator, Callable
from haystack import Pipeline, component
from hayhooks import BasePipelineWrapper, get_last_user_message
from hayhooks import streaming_generator_all as streaming_generator #Modifikation von HS
from haystack.core.serialization import DeserializationCallbacks
from haystack.dataclasses.document import Document
from haystack.dataclasses.chat_message import ChatMessage
from haystack.dataclasses import StreamingChunk
@component
class EchoA:
    """Demo component: streams three progress steps, then passes its input through.

    ``streaming_callback`` is set as an instance attribute in ``__init__``. A
    generator that registers callbacks via ``hasattr(component,
    "streaming_callback")`` will therefore treat this component as streaming-capable.
    """

    def __init__(self):
        # Default callback, used by ``run`` when none is passed explicitly.
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=str)
    def run(self, text: str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        """Stream three lowercase progress steps and a final uppercase answer.

        Args:
            text: Input text to echo.
            streaming_callback: Receives each ``StreamingChunk``. Falls back to
                ``self.streaming_callback`` when not given.

        Returns:
            ``{"output": text}``: the unchanged input, so it can feed a
            downstream ``str`` input.
        """
        if streaming_callback is None:
            streaming_callback = self.streaming_callback
        for i in range(3):
            time.sleep(0.5)  # simulate work so that streaming is visible in the client
            print("++++++ Streamin-Callback von EchoA", streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (1) {i+1}: {text.lower()}\n")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (1) {i+1}: {text.lower()}\n"))
        final_answer = f"✅ Antwort von Echo1 auf '{text}': {text.upper()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        # Generator-style summary, built only for the debug print below.
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop",
        }]
        output = {
            "query": text,
            "replies": [final_answer],
            "meta": meta,
        }
        print("echo", output)
        return {"output": text}
@component
class EchoB:
    """Demo component: streams three progress steps, then returns a reply summary.

    ``streaming_callback`` is set as an instance attribute in ``__init__``. A
    generator that registers callbacks via ``hasattr(component,
    "streaming_callback")`` will therefore treat this component as streaming-capable.
    """

    def __init__(self):
        # Default callback, used by ``run`` when none is passed explicitly.
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=dict)
    def run(self, text: str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        """Stream three lowercase progress steps and a final lowercase answer.

        Args:
            text: Input text (in the demo pipeline, EchoA's output).
            streaming_callback: Receives each ``StreamingChunk``. Falls back to
                ``self.streaming_callback`` when not given.

        Returns:
            ``{"output": {"query": ..., "replies": [...], "meta": [...]}}``. The
            summary is wrapped under the ``output`` key declared by
            ``output_types(output=dict)``.
        """
        if streaming_callback is None:
            streaming_callback = self.streaming_callback
        print("IN echo2", text)
        for i in range(3):
            time.sleep(0.5)  # simulate work so that streaming is visible in the client
            print("++++++ Streamin-Callback von EchoB", streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (2) {i+1}: {text.lower()}")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (2) {i+1}: {text.lower()}\n"))
        final_answer = f"✅ Antwort ECHO2 auf '{text}': {text.lower()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop",
        }]
        output = {
            "query": text,
            "replies": [final_answer],
            "meta": meta,
        }
        print("echo2", output)
        # Return under the declared socket name; the bare dict never populated `output`.
        return {"output": output}
# 🧩 PipelineWrapper for Hayhooks / Open WebUI
class PipelineWrapper(BasePipelineWrapper):
    """Hayhooks wrapper exposing the echo1 -> echo2 demo pipeline."""

    def setup(self) -> None:
        """Load ``pipeline.yml`` from this file's directory into ``self.pipeline``."""
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text(encoding="utf-8")
        callbacks = DeserializationCallbacks(component_pre_init=PipelineWrapper.component_pre_init_callback)
        self.pipeline = Pipeline.loads(pipeline_yaml, callbacks=callbacks)

    @staticmethod
    def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]) -> None:
        """Pre-init hook for pipeline deserialization; intentionally leaves everything unchanged.

        The YAML ``type:`` entries already point at ``pipeline_wrapper.EchoA`` /
        ``pipeline_wrapper.EchoB``, so no class substitution is needed here.
        Constructing ``component_cls(**init_params)`` inside this hook would only
        create an extra, discarded instance per component.

        NOTE(review): Haystack documents this hook as editing ``init_params`` in
        place; its return value is presumably ignored. Confirm against the
        installed Haystack version before relying on a return value.
        """
        return None

    def run_api(self, query: str) -> dict:
        """Run the pipeline on ``query`` and return echo2's result dict."""
        result = self.pipeline.run({"text": query})
        # Pipeline results are keyed by component name (presumably only the leaf
        # "echo2" here, since echo1's output is consumed by echo2 — verify).
        echo2_result = result["echo2"]
        # Accept both shapes: the summary wrapped under "output", or returned bare.
        return echo2_result.get("output", echo2_result)

    def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
        """Answer an OpenAI-style chat completion request.

        Streams chunks from every streaming component unless the request body
        sets ``"stream"`` to a falsy value. In that case the chunks are joined
        into a single string. ``model`` is accepted for API compatibility but is
        not used.
        """
        question = get_last_user_message(messages)
        chunks = streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"echo1": {"text": question}},
        )
        if body.get("stream", True):
            return chunks
        return "".join(chunks)
# pipeline.yml — read by PipelineWrapper.setup() from the wrapper's directory.
components:
  echo1:
    type: pipeline_wrapper.EchoA
  echo2:
    type: pipeline_wrapper.EchoB
connections:
  # Name the receiving socket explicitly rather than relying on type matching.
  - sender: echo1.output
    receiver: echo2.text
# NOTE(review): `outputs` is not shown being read by Pipeline.loads here; confirm
# whether it has any effect or is documentation only.
outputs:
  - receiver: echo1.output
  - receiver: echo2.output
`