8 changes: 8 additions & 0 deletions examples/inference/api_server_openai/query_openai_sdk.py
@@ -40,6 +40,12 @@
help="If set to float < 1, only the smallest set of most probable tokens with probabilities that add up to`Top p` or higher are kept for generation",
)

parser.add_argument(
"--debug_mode",
action="store_true",
help="If debug mode is enabled, debug logs will be printed",
)

args = parser.parse_args()

if "OPENAI_API_KEY" in os.environ:
@@ -65,6 +71,7 @@ def stream_chat():
max_tokens=args.max_new_tokens,
temperature=args.temperature,
top_p=args.top_p,
debug_mode=args.debug_mode,
):
content = chunk.choices[0].delta.content
if content is not None:
@@ -81,6 +88,7 @@ def chunk_chat():
max_tokens=args.max_new_tokens,
temperature=args.temperature,
top_p=args.top_p,
debug_mode=args.debug_mode,
)
for chunk in [output]:
try:
10 changes: 6 additions & 4 deletions llm_on_ray/inference/api_openai_backend/query_client.py
@@ -54,10 +54,12 @@ async def query(self, model: str, prompt: Prompt, request_id: str, streaming_rep
top_p = request_config.get("top_p", 1.0)
max_new_tokens = request_config.get("max_tokens", None)
gen_config = {"max_new_tokens": max_new_tokens, "temperature": temperature, "top_p": top_p}
if temperature != 1.0 or top_p != 1.0:
gen_config.update({"do_sample": True})
if request_config.get("ignore_eos", False):
gen_config.update({"ignore_eos": True})
gen_config.update({"do_sample": temperature != 1.0 or top_p != 1.0})
gen_config.update({"ignore_eos": request_config.get("ignore_eos", False)})

if request_config.get("debug_mode", False):
print("DEBUG: print request_config:", request_config)
# TODO: debug_mode is currently read from request_config; also add it to gen_config, since gen_config is the config that gets passed down.

async for x in handle_request(
model=model,
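One possible way to address the TODO above, sketched here only (this PR does not implement it): copy the flag from request_config into gen_config, since gen_config is what travels down to the predictor.

# Standalone sketch: how debug_mode could ride along in gen_config.
request_config = {"temperature": 0.7, "top_p": 0.9, "debug_mode": True}
gen_config = {
    "max_new_tokens": request_config.get("max_tokens"),
    "temperature": request_config.get("temperature", 1.0),
    "top_p": request_config.get("top_p", 1.0),
}
gen_config["debug_mode"] = request_config.get("debug_mode", False)
if gen_config["debug_mode"]:
    print("DEBUG: gen_config to be passed down:", gen_config)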
1 change: 1 addition & 0 deletions llm_on_ray/inference/inference_config.py
@@ -171,6 +171,7 @@ class InferenceConfig(BaseModel):
ipex: Ipex = Ipex()
hpu_model_config: HpuModelConfig = HpuModelConfig()
model_description: ModelDescription = ModelDescription()
debug_mode: bool = False

# prevent warning of protected namespaces
# DO NOT TOUCH
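For reference, the new debug_mode field can also come from a model YAML. A minimal sketch, assuming the remaining InferenceConfig fields keep their defaults and using the same pydantic_yaml helper that serve.py already imports; the YAML below is hypothetical, not a file shipped by this repo:

from pydantic_yaml import parse_yaml_raw_as

from llm_on_ray.inference.inference_config import InferenceConfig

# Hypothetical minimal YAML; real configs under llm_on_ray/inference/models/ set many more fields.
raw_yaml = """
name: my-model
debug_mode: true
"""

infer_conf = parse_yaml_raw_as(InferenceConfig, raw_yaml)
print(infer_conf.name, infer_conf.debug_mode)  # -> my-model True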
17 changes: 14 additions & 3 deletions llm_on_ray/inference/predictor_deployment.py
@@ -396,8 +396,13 @@ async def __call__(self, http_request: Request) -> Union[StreamingResponse, JSON
content="Empty prompt is not supported.",
)
config = json_request["config"] if "config" in json_request else {}
if config.get("debug_mode", False):
print("DEBUG:predictor_deployment.py:print config received from json:", config)
print("DEBUG:predictor_deployment.py::print inputs for prompts:", input)
# return prompt or list of prompts preprocessed
prompts = self.preprocess_prompts(input)
if config.get("debug_mode", False):
print("DEBUG:predictor_deployment.py::print prompts from inputs:", prompts)

# Handle streaming response
if streaming_response:
@@ -416,12 +421,18 @@ async def openai_call(
):
self.use_openai = True

# TODO: Pass down config into preprocess_prompts for more logs.
if config.get("debug_mode", False):
print("DEBUG:predictor_deployment.py:print config received from query_client:", config)
print("DEBUG:predictor_deployment.py::print inputs for prompts:", input)
# return prompt or list of prompts preprocessed
input = self.preprocess_prompts(input, tools, tool_choice)
prompts = self.preprocess_prompts(input, tools, tool_choice)
if config.get("debug_mode", False):
print("DEBUG:predictor_deployment.py::print prompts from inputs:", prompts)

# Handle streaming response
if streaming_response:
async for result in self.handle_streaming(input, config):
async for result in self.handle_streaming(prompts, config):
yield result
else:
yield await self.handle_non_streaming(input, config)
yield await self.handle_non_streaming(prompts, config)
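On the client side of the simple endpoint, the flag travels in the request body's config dict that __call__ reads above. A hedged sketch; the URL, the prompt field name, and the other config keys are assumptions for illustration, not verified against the full request schema:

import requests

# Assumed payload shape: __call__ above reads json_request["config"];
# the "text" key and the route are placeholders for illustration.
payload = {
    "text": "What is Ray?",
    "config": {"debug_mode": True, "max_new_tokens": 64},
}
resp = requests.post("http://127.0.0.1:8000/my-model", json=payload, timeout=60)
print(resp.status_code, resp.text)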
26 changes: 15 additions & 11 deletions llm_on_ray/inference/predictors/hpu_predictor.py
@@ -79,9 +79,19 @@ def __init__(self, infer_conf: InferenceConfig):
# decide correct torch dtype for loading HF model
decide_torch_dtype(infer_conf)

debug_mode = infer_conf.debug_mode

if debug_mode:
print("DEBUG:hpu_predictor:print inference config:", infer_conf)

self.use_lazy_mode = not infer_conf.hpu_model_config.torch_compile
self.use_hpu_graphs = infer_conf.hpu_model_config.use_hpu_graphs

# optimize transformers for gaudi
from optimum.habana.transformers.modeling_utils import adapt_transformers_to_gaudi

adapt_transformers_to_gaudi()

Comment on lines +90 to +94
Contributor: Why move this function here?

Contributor (Author): Moved this function out of the if block: it is executed both with and without DeepSpeed.

As I understand from the PR title, this PR is meant to add a debug mode, so why touch other code? Could you submit a separate PR to address the other issues?

if infer_conf.deepspeed:
# DeepSpeed is enabled, start worker group
# Prepare placement group
@@ -105,13 +115,6 @@

htcore.hpu_set_env()

# Tweak transformer to optimize performance on Gaudi
from optimum.habana.transformers.modeling_utils import (
adapt_transformers_to_gaudi,
)

adapt_transformers_to_gaudi()

self.device = torch.device("hpu")
model = AutoModelForCausalLM.from_pretrained(
model_desc.model_id_or_path, **model_desc.config.dict()
@@ -181,6 +184,7 @@ def _process_config(self, config):

def get_streamer(self):
if self.infer_conf.deepspeed:
# Q2: Why always use the first worker?
return ray.get(self.deepspeed_workers[0].get_streamer.remote())
Contributor: Hi @xwu99, could you please help explain this question? I think it is the same idea as in deepspeed_predictor.py.

@xwu-intel (Jul 2, 2024): This is distributed inference involving a group of worker processes. Worker 0 is assigned rank 0 (following the standard MPI model), which is the main rank that returns the result; the other ranks only take part in the computation and do not return results. In fact, all ranks hold the same result in this case. You can think of rank 0 as the head process of the distributed worker group, which is conventionally used to return the result.

else:
return TextIteratorStreamer(
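To make the rank-0 pattern the reviewer describes above concrete, here is a tiny illustrative Ray sketch (not this project's code): every worker runs the same call, and only the first worker's result is consumed.

# Illustrative sketch of "all ranks compute, rank 0 returns the result".
import ray

ray.init(ignore_reinit_error=True)


@ray.remote
class Worker:
    def __init__(self, rank: int):
        self.rank = rank

    def generate(self, prompt: str) -> str:
        # With tensor parallelism every rank ends up holding the same output.
        return f"output for {prompt!r}"


workers = [Worker.remote(rank) for rank in range(4)]
results = ray.get([w.generate.remote("hello") for w in workers])  # all ranks participate
print(results[0])  # rank 0 (the head worker) is the one whose result is returned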
@@ -196,6 +200,8 @@ def generate(self, input: GenerateInput, **config) -> GenerateOutput:

self._process_config(config)

# TODO: Maybe we should get real-time load info for all cards, set a healthy usage ratio, and pick the usable cards for serving.
# That way errors like OOM can be prevented and the server will be more robust.
if self.infer_conf.deepspeed:
return ray.get(
[worker.generate.remote(prompt, **config) for worker in self.deepspeed_workers]
Contributor (Author): Instead of using a fixed worker, maybe we should spread the load across all cards when DeepSpeed is enabled.

@xwu-intel (Jul 2, 2024): Currently tensor parallelism is used to process a single request; one process per card is the industry best practice, given that the load is balanced. You might think each request is sent to a different card, but that is not the case.

@@ -219,7 +225,9 @@ def generate(self, input: GenerateInput, **config) -> GenerateOutput:

def streaming_generate(self, prompt, streamer, **config):
self._process_config(config)
# Q1: Why is this handled here when using both deepspeed and hpu?
if self.infer_conf.deepspeed:
Contributor (Author): Here in hpu_predictor.py it is a little confusing, since we have another predictor called deepspeed_predictor. The two predictors are for HPU and CPU; maybe we can rename deepspeed_predictor to something like a cpu or base predictor.

There is a TODO comment to consolidate these two predictors.

# Q2: Why always use the first worker?
self.deepspeed_workers[0].streaming_generate.remote(prompt, streamer, **config)
Contributor: @xwu99 Same here.

for worker in self.deepspeed_workers[1:]:
worker.streaming_generate.remote(prompt, self._create_dummy_streamer(), **config)
@@ -284,10 +292,6 @@ def load_model_and_tokenizer(self):
self.world_size = int(os.environ["WORLD_SIZE"])
self.local_rank = int(os.environ["LOCAL_RANK"])
self.device = torch.device("hpu")
# optimize transformers for gaudi
from optimum.habana.transformers.modeling_utils import adapt_transformers_to_gaudi

adapt_transformers_to_gaudi()
Comment on lines -287 to -290
Contributor: If this function is not executed in every worker, will it work as expected?

Contributor (Author): Same as above; this function will be executed earlier.

self.load_model()
model_desc = self.infer_conf.model_description
self.tokenizer = load_tokenizer(self.model, model_desc.tokenizer_name_or_path)
29 changes: 22 additions & 7 deletions llm_on_ray/inference/serve.py
@@ -41,14 +41,23 @@ def get_deployed_models(args):
set(all_models_name)
), f"models must be a subset of {all_models_name} predefined by inference/models/*.yaml, but found {models}."
model_list = {model: all_models[model] for model in models}
if args.debug_mode:
print(
"DEBUG:serve.py: --config_file is not set while --models is set, serving model(s):",
model_list,
)
else:
model_list = all_models
if args.debug_mode:
print(
"DEBUG:serve.py: --config_file and --models is not set, serving all models:",
model_list,
)
else:
# config_file has precedence over others
if args.config_file:
print("Reading from config file, " + args.config_file)
with open(args.config_file, "r") as f:
infer_conf = parse_yaml_raw_as(InferenceConfig, f)
if args.debug_mode:
print("DEBUG:serve.py: Reading from config file, " + args.config_file)
with open(args.config_file, "r") as f:
infer_conf = parse_yaml_raw_as(InferenceConfig, f)
model_list = {}
model_list[infer_conf.name] = infer_conf

Expand Down Expand Up @@ -131,6 +140,11 @@ def main(argv=None):
parser.add_argument(
"--max_batch_size", default=None, type=int, help="The max batch size for dynamic batching."
)
parser.add_argument(
"--debug_mode",
action="store_true",
help="If debug mode is enabled, debug logs will be printed",
)

# Print help if no arguments were provided
if len(sys.argv) == 1:
Expand All @@ -147,6 +161,9 @@ def main(argv=None):

ray.init(address="auto")
deployments, model_list = get_deployed_models(args)
if args.debug_mode:
print("DEBUG:serve.py: Service is running with deployments:" + str(deployments))
print("DEBUG:serve.py: Service is running models:" + str(model_list))
if args.simple:
# provide simple model endpoint
# models can be served at custom URLs according to the configuration files.
Expand All @@ -156,8 +173,6 @@ def main(argv=None):
# all models are served under the same URL and then accessed
# through model_id, so a unified URL needs to be passed in.
host = "127.0.0.1" if args.serve_local_only else "0.0.0.0"
print("Service is running with deployments:" + str(deployments))
print("Service is running models:" + str(model_list))
openai_serve_run(deployments, model_list, host, "/", args.port, args.max_ongoing_requests)

msg = "Service is deployed successfully."
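Finally, a hedged end-to-end sketch of exercising the new flag. It assumes serve.py is runnable as a module and that a model YAML exists under llm_on_ray/inference/models/; the path below is a placeholder to substitute, not a file added by this PR.

import subprocess
import sys

# Placeholder YAML path: substitute one of the files under llm_on_ray/inference/models/.
subprocess.run(
    [
        sys.executable,
        "-m",
        "llm_on_ray.inference.serve",
        "--config_file",
        "llm_on_ray/inference/models/<your_model>.yaml",
        "--debug_mode",
    ],
    check=True,
)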