Skip to content

Commit f5f5b8b

Browse files
frank-wei authored and facebook-github-bot committed
add scoping for better trace (vllm-project#28329)
Summary: Add more scoping in the hotspot area which can greatly help us to the find the cycle heavy area Reviewed By: henryoier Differential Revision: D86436375
1 parent 608bb14 commit f5f5b8b

File tree

4 files changed

+169
-147
lines changed

4 files changed

+169
-147
lines changed

vllm/v1/core/sched/scheduler.py

Lines changed: 61 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from vllm.v1.request import Request, RequestStatus
3939
from vllm.v1.spec_decode.metrics import SpecDecodingStats
4040
from vllm.v1.structured_output import StructuredOutputManager
41+
from vllm.v1.utils import record_function_or_nullcontext
4142

4243
logger = init_logger(__name__)
4344

@@ -259,49 +260,52 @@ def schedule(self) -> SchedulerOutput:
259260
continue
260261

261262
# Schedule newly needed KV blocks for the request.
262-
while True:
263-
new_blocks = self.kv_cache_manager.allocate_slots(
264-
request,
265-
num_new_tokens,
266-
num_lookahead_tokens=self.num_lookahead_tokens,
267-
)
268-
269-
if new_blocks is not None:
270-
# The request can be scheduled.
271-
break
272-
273-
# The request cannot be scheduled.
274-
# Preempt the lowest-priority request.
275-
if self.policy == SchedulingPolicy.PRIORITY:
276-
preempted_req = max(
277-
self.running,
278-
key=lambda r: (r.priority, r.arrival_time),
263+
with record_function_or_nullcontext("schedule: allocate_slots"):
264+
while True:
265+
new_blocks = self.kv_cache_manager.allocate_slots(
266+
request,
267+
num_new_tokens,
268+
num_lookahead_tokens=self.num_lookahead_tokens,
279269
)
280-
self.running.remove(preempted_req)
281-
if preempted_req in scheduled_running_reqs:
282-
scheduled_running_reqs.remove(preempted_req)
283-
token_budget += num_scheduled_tokens[preempted_req.request_id]
284-
req_to_new_blocks.pop(preempted_req.request_id)
285-
num_scheduled_tokens.pop(preempted_req.request_id)
286-
req_index -= 1
287-
else:
288-
preempted_req = self.running.pop()
289270

290-
self.kv_cache_manager.free(preempted_req)
291-
self.encoder_cache_manager.free(preempted_req)
292-
preempted_req.status = RequestStatus.PREEMPTED
293-
preempted_req.num_computed_tokens = 0
294-
preempted_req.num_preemptions += 1
295-
if self.log_stats:
296-
preempted_req.record_event(
297-
EngineCoreEventType.PREEMPTED, scheduled_timestamp
298-
)
271+
if new_blocks is not None:
272+
# The request can be scheduled.
273+
break
299274

300-
self.waiting.prepend_request(preempted_req)
301-
preempted_reqs.append(preempted_req)
302-
if preempted_req == request:
303-
# No more request to preempt. Cannot schedule this request.
304-
break
275+
# The request cannot be scheduled.
276+
# Preempt the lowest-priority request.
277+
if self.policy == SchedulingPolicy.PRIORITY:
278+
preempted_req = max(
279+
self.running,
280+
key=lambda r: (r.priority, r.arrival_time),
281+
)
282+
self.running.remove(preempted_req)
283+
if preempted_req in scheduled_running_reqs:
284+
scheduled_running_reqs.remove(preempted_req)
285+
token_budget += num_scheduled_tokens[
286+
preempted_req.request_id
287+
]
288+
req_to_new_blocks.pop(preempted_req.request_id)
289+
num_scheduled_tokens.pop(preempted_req.request_id)
290+
req_index -= 1
291+
else:
292+
preempted_req = self.running.pop()
293+
294+
self.kv_cache_manager.free(preempted_req)
295+
self.encoder_cache_manager.free(preempted_req)
296+
preempted_req.status = RequestStatus.PREEMPTED
297+
preempted_req.num_computed_tokens = 0
298+
preempted_req.num_preemptions += 1
299+
if self.log_stats:
300+
preempted_req.record_event(
301+
EngineCoreEventType.PREEMPTED, scheduled_timestamp
302+
)
303+
304+
self.waiting.prepend_request(preempted_req)
305+
preempted_reqs.append(preempted_req)
306+
if preempted_req == request:
307+
# No more request to preempt. Cannot schedule this request.
308+
break
305309

306310
if new_blocks is None:
307311
# Cannot schedule this request.
@@ -596,13 +600,14 @@ def schedule(self) -> SchedulerOutput:
596600
# Get the longest common prefix among all requests in the running queue.
597601
# This can be potentially used for cascade attention.
598602
num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
599-
if self.running:
600-
any_request = self.running[0]
601-
num_common_prefix_blocks = (
602-
self.kv_cache_manager.get_num_common_prefix_blocks(
603-
any_request.request_id
603+
with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"):
604+
if self.running:
605+
any_request = self.running[0]
606+
num_common_prefix_blocks = (
607+
self.kv_cache_manager.get_num_common_prefix_blocks(
608+
any_request.request_id
609+
)
604610
)
605-
)
606611

607612
# Construct the scheduler output.
608613
new_reqs_data = [
@@ -611,13 +616,14 @@ def schedule(self) -> SchedulerOutput:
611616
)
612617
for req in scheduled_new_reqs
613618
]
614-
cached_reqs_data = self._make_cached_request_data(
615-
scheduled_running_reqs,
616-
scheduled_resumed_reqs,
617-
num_scheduled_tokens,
618-
scheduled_spec_decode_tokens,
619-
req_to_new_blocks,
620-
)
619+
with record_function_or_nullcontext("schedule: make_cached_request_data"):
620+
cached_reqs_data = self._make_cached_request_data(
621+
scheduled_running_reqs,
622+
scheduled_resumed_reqs,
623+
num_scheduled_tokens,
624+
scheduled_spec_decode_tokens,
625+
req_to_new_blocks,
626+
)
621627

622628
# Record the request ids that were scheduled in this step.
623629
self.prev_step_scheduled_req_ids.clear()
@@ -646,8 +652,8 @@ def schedule(self) -> SchedulerOutput:
646652
if self.connector is not None:
647653
meta = self.connector.build_connector_meta(scheduler_output)
648654
scheduler_output.kv_connector_metadata = meta
649-
650-
self._update_after_schedule(scheduler_output)
655+
with record_function_or_nullcontext("schedule: update_after_schedule"):
656+
self._update_after_schedule(scheduler_output)
651657
return scheduler_output
652658

653659
def _update_after_schedule(

vllm/v1/engine/core.py

Lines changed: 55 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
from vllm.v1.request import Request, RequestStatus
6262
from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder
6363
from vllm.v1.structured_output import StructuredOutputManager
64+
from vllm.v1.utils import record_function_or_nullcontext
6465
from vllm.version import __version__ as VLLM_VERSION
6566

6667
logger = init_logger(__name__)
@@ -315,17 +316,21 @@ def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
315316
# or finished and not yet removed from the batch.
316317
if not self.scheduler.has_requests():
317318
return {}, False
318-
scheduler_output = self.scheduler.schedule()
319-
future = self.model_executor.execute_model(scheduler_output, non_block=True)
320-
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
321-
with self.log_error_detail(scheduler_output):
322-
model_output = future.result()
323-
if model_output is None:
324-
model_output = self.model_executor.sample_tokens(grammar_output)
325-
326-
engine_core_outputs = self.scheduler.update_from_output(
327-
scheduler_output, model_output
328-
)
319+
with record_function_or_nullcontext("core step: schedule"):
320+
scheduler_output = self.scheduler.schedule()
321+
322+
with record_function_or_nullcontext("core step: execute_model"):
323+
future = self.model_executor.execute_model(scheduler_output, non_block=True)
324+
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
325+
with self.log_error_detail(scheduler_output):
326+
model_output = future.result()
327+
if model_output is None:
328+
model_output = self.model_executor.sample_tokens(grammar_output)
329+
330+
with record_function_or_nullcontext("core step: update_from_output"):
331+
engine_core_outputs = self.scheduler.update_from_output(
332+
scheduler_output, model_output
333+
)
329334

330335
return engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0
331336

@@ -363,32 +368,37 @@ def step_with_batch_queue(
363368
model_executed = False
364369
deferred_scheduler_output = None
365370
if self.scheduler.has_requests():
366-
scheduler_output = self.scheduler.schedule()
367-
exec_future = self.model_executor.execute_model(
368-
scheduler_output, non_block=True
369-
)
371+
with record_function_or_nullcontext("core step_with_batch_queue: schedule"):
372+
scheduler_output = self.scheduler.schedule()
373+
with record_function_or_nullcontext("core step_with_batch_queue: execute_model"):
374+
exec_future = self.model_executor.execute_model(
375+
scheduler_output, non_block=True
376+
)
370377
model_executed = scheduler_output.total_num_scheduled_tokens > 0
371378

372379
if scheduler_output.pending_structured_output_tokens:
373-
# We need to defer sampling until we have processed the model output
374-
# from the prior step.
375-
deferred_scheduler_output = scheduler_output
376-
# Block-wait for execute to return (continues running async on the GPU).
377-
with self.log_error_detail(scheduler_output):
378-
exec_result = exec_future.result()
379-
assert exec_result is None
380+
with record_function_or_nullcontext("core step_with_batch_queue: pending_structured_output_tokens"):
381+
# We need to defer sampling until we have processed the model output
382+
# from the prior step.
383+
deferred_scheduler_output = scheduler_output
384+
# Block-wait for execute to return (continues running async on the GPU).
385+
with self.log_error_detail(scheduler_output):
386+
exec_result = exec_future.result()
387+
assert exec_result is None
380388
else:
381-
# We aren't waiting for any tokens, get any grammar output immediately.
382-
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
389+
with record_function_or_nullcontext("core step_with_batch_queue: get_grammar_bitmask"):
390+
# We aren't waiting for any tokens, get any grammar output immediately.
391+
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
383392
# Block-wait for execute to return (continues running async on the GPU).
384393
with self.log_error_detail(scheduler_output):
385394
exec_result = exec_future.result()
386395

387396
if exec_result is None:
388-
# Call sample tokens.
389-
future = self.model_executor.sample_tokens(
390-
grammar_output, non_block=True
391-
)
397+
with record_function_or_nullcontext("core step_with_batch_queue: sample_tokens"):
398+
# Call sample tokens.
399+
future = self.model_executor.sample_tokens(
400+
grammar_output, non_block=True
401+
)
392402
else:
393403
# No sampling required (e.g. all requests finished).
394404
future = cast(Future[ModelRunnerOutput], exec_future)
@@ -408,27 +418,28 @@ def step_with_batch_queue(
408418
# only be called when the scheduler contains requests or the queue
409419
# is non-empty.
410420
return None, False
411-
412-
# Block until the next result is available.
413-
future, scheduler_output = batch_queue.pop()
414-
with self.log_error_detail(scheduler_output):
415-
model_output = future.result()
416-
417-
engine_core_outputs = self.scheduler.update_from_output(
418-
scheduler_output, model_output
419-
)
421+
with record_function_or_nullcontext("core step_with_batch_queue: model_output"):
422+
# Block until the next result is available.
423+
future, scheduler_output = batch_queue.pop()
424+
with self.log_error_detail(scheduler_output):
425+
model_output = future.result()
426+
with record_function_or_nullcontext("core step_with_batch_queue: update_from_output"):
427+
engine_core_outputs = self.scheduler.update_from_output(
428+
scheduler_output, model_output
429+
)
420430

421431
# NOTE(nick): We can either handle the deferred tasks here or save
422432
# in a field and do it immediately once step_with_batch_queue is
423433
# re-called. The latter slightly favors TTFT over TPOT/throughput.
424434
if deferred_scheduler_output:
425-
# We now have the tokens needed to compute the bitmask for the
426-
# deferred request. Get the bitmask and call sample tokens.
427-
grammar_output = self.scheduler.get_grammar_bitmask(
428-
deferred_scheduler_output
429-
)
430-
future = self.model_executor.sample_tokens(grammar_output, non_block=True)
431-
batch_queue.appendleft((future, deferred_scheduler_output))
435+
with record_function_or_nullcontext("core step_with_batch_queue: deferred_scheduler_output"):
436+
# We now have the tokens needed to compute the bitmask for the
437+
# deferred request. Get the bitmask and call sample tokens.
438+
grammar_output = self.scheduler.get_grammar_bitmask(
439+
deferred_scheduler_output
440+
)
441+
future = self.model_executor.sample_tokens(grammar_output, non_block=True)
442+
batch_queue.appendleft((future, deferred_scheduler_output))
432443

433444
return engine_core_outputs, model_executed
434445

vllm/v1/engine/llm_engine.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager
3737
from vllm.v1.metrics.reader import Metric, get_metrics_snapshot
3838
from vllm.v1.metrics.stats import IterationStats
39+
from vllm.v1.utils import record_function_or_nullcontext
3940
from vllm.v1.worker.worker_base import WorkerBase
4041

4142
logger = init_logger(__name__)
@@ -280,27 +281,31 @@ def step(self) -> list[RequestOutput | PoolingRequestOutput]:
280281
return []
281282

282283
# 1) Get EngineCoreOutput from the EngineCore.
283-
outputs = self.engine_core.get_output()
284+
with record_function_or_nullcontext("llm_genine step: get_output"):
285+
outputs = self.engine_core.get_output()
284286

285287
# 2) Process EngineCoreOutputs.
286-
iteration_stats = IterationStats() if self.log_stats else None
287-
processed_outputs = self.output_processor.process_outputs(
288-
outputs.outputs,
289-
engine_core_timestamp=outputs.timestamp,
290-
iteration_stats=iteration_stats,
291-
)
288+
with record_function_or_nullcontext("llm_genine step: process_outputs"):
289+
iteration_stats = IterationStats() if self.log_stats else None
290+
processed_outputs = self.output_processor.process_outputs(
291+
outputs.outputs,
292+
engine_core_timestamp=outputs.timestamp,
293+
iteration_stats=iteration_stats,
294+
)
292295

293296
# 3) Abort any reqs that finished due to stop strings.
294-
self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
297+
with record_function_or_nullcontext("llm_genine step: abort_requests"):
298+
self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
295299

296300
# 4) Record stats
297-
if self.logger_manager is not None and outputs.scheduler_stats is not None:
298-
self.logger_manager.record(
299-
scheduler_stats=outputs.scheduler_stats,
300-
iteration_stats=iteration_stats,
301-
mm_cache_stats=self.processor.stat_mm_cache(),
302-
)
303-
self.do_log_stats_with_interval()
301+
with record_function_or_nullcontext("llm_genine step: record_stats"):
302+
if self.logger_manager is not None and outputs.scheduler_stats is not None:
303+
self.logger_manager.record(
304+
scheduler_stats=outputs.scheduler_stats,
305+
iteration_stats=iteration_stats,
306+
mm_cache_stats=self.processor.stat_mm_cache(),
307+
)
308+
self.do_log_stats_with_interval()
304309

305310
return processed_outputs.request_outputs
306311

0 commit comments

Comments
 (0)