8 changes: 3 additions & 5 deletions tensorrt_llm/_torch/pyexecutor/py_executor_creator.py
@@ -17,13 +17,11 @@
ContextChunkingPolicy,
GuidedDecodingConfig)
from tensorrt_llm.bindings.internal.batch_manager import ContextChunkingConfig
from tensorrt_llm.llmapi.llm_args import (KvCacheConnectorConfig, LoadFormat,
PybindMirror, TorchLlmArgs)
from tensorrt_llm.llmapi.llm_args import LoadFormat, PybindMirror, TorchLlmArgs
from tensorrt_llm.llmapi.tokenizer import (TokenizerBase,
_llguidance_tokenizer_info,
_xgrammar_tokenizer_info)
from tensorrt_llm.logger import logger
from tensorrt_llm.lora_helper import LoraConfig
from tensorrt_llm.mapping import Mapping
from tensorrt_llm.quantization import QuantAlgo

@@ -205,12 +203,12 @@ def create_py_executor(
llm_args: TorchLlmArgs,
checkpoint_dir: str = None,
tokenizer: Optional[TokenizerBase] = None,
lora_config: Optional[LoraConfig] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
profiling_stage_data: Optional[dict] = None,
) -> PyExecutor:

garbage_collection_gen0_threshold = llm_args.garbage_collection_gen0_threshold
lora_config = llm_args.lora_config
kv_connector_config = llm_args.kv_connector_config

pytorch_backend_config = llm_args.get_pytorch_backend_config()
if pytorch_backend_config is None:
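The net effect in create_py_executor is that lora_config and kv_connector_config are no longer parameters at all; they are read off llm_args. A minimal caller sketch under that assumption (the TorchLlmArgs attribute names come from the reads above, but the constructor arguments and default-constructed configs shown here are illustrative, not taken from this PR):

# Hypothetical caller of the new signature; both configs travel on llm_args.
from tensorrt_llm._torch.pyexecutor.py_executor_creator import create_py_executor
from tensorrt_llm.llmapi.llm_args import KvCacheConnectorConfig, TorchLlmArgs
from tensorrt_llm.lora_helper import LoraConfig

llm_args = TorchLlmArgs(
    model="/path/to/model",                        # illustrative field
    lora_config=LoraConfig(),                      # formerly create_py_executor(lora_config=...)
    kv_connector_config=KvCacheConnectorConfig(),  # formerly a kwarg as well
)
executor = create_py_executor(llm_args=llm_args, checkpoint_dir="/path/to/model")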
14 changes: 2 additions & 12 deletions tensorrt_llm/executor/base_worker.py
@@ -16,11 +16,10 @@
nvtx_range_debug)
from ..bindings import executor as tllm
from ..builder import ConfigEncoder, Engine, EngineConfig
from ..llmapi.llm_args import BaseLlmArgs, KvCacheConnectorConfig, PybindMirror
from ..llmapi.llm_args import BaseLlmArgs, PybindMirror
from ..llmapi.tokenizer import TokenizerBase
from ..llmapi.tracer import global_tracer
from ..llmapi.utils import _SyncQueue, print_colored_debug
from ..lora_helper import LoraConfig
from ..lora_manager import LoraManager
from ..metrics import RequestEventTiming
from ..prompt_adapter_manager import PromptAdapterManager
@@ -54,8 +53,6 @@ def __init__(
batched_logits_processor: Optional[BatchedLogitsProcessor] = None,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
lora_config: Optional[LoraConfig] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
hf_model_dir: Optional[Path] = None,
tokenizer: Optional[TokenizerBase] = None,
llm_args: Optional[BaseLlmArgs] = None,
@@ -73,8 +70,6 @@
self._batched_logits_processor = batched_logits_processor
self._postproc_worker_config = postproc_worker_config
self._is_llm_executor = is_llm_executor
self._lora_config = lora_config
self._kv_connector_config = kv_connector_config
self._hf_model_dir = hf_model_dir
self._tokenizer = tokenizer
self.llm_args = llm_args
@@ -92,10 +87,7 @@ def __init__(
self._is_pytorch_backend = llm_args is not None and llm_args.backend in [
"pytorch", "_autodeploy"
]

if not self._is_pytorch_backend and kv_connector_config is not None:
raise ValueError(
"KV connector config is only supported for PyTorch backend")
self._lora_config = llm_args.lora_config if self._is_pytorch_backend else None

if global_mpi_size() > 1:
logger.set_rank(self.global_rank)
@@ -130,8 +122,6 @@ def _create_py_executor():
args["llm_args"] = self.llm_args
args["checkpoint_dir"] = self._hf_model_dir
args["tokenizer"] = self._tokenizer
args["lora_config"] = self._lora_config
args["kv_connector_config"] = self._kv_connector_config
elif self.llm_args.backend == "_autodeploy":
from tensorrt_llm._torch.auto_deploy.llm_args import \
LlmArgs as ADLlmArgs
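Two behavioral details in this file are worth a reviewer's attention. First, the explicit ValueError for a KV-connector config on a non-PyTorch backend is deleted; since the config now rides on llm_args and is only consumed on the PyTorch path, a value set there for a TRT-engine run appears to be silently ignored rather than rejected, which is worth confirming as intended. Second, self._lora_config is now derived instead of passed in. A condensed restatement of the new logic:

# Condensed from the new BaseWorker.__init__ above. When
# self._is_pytorch_backend is True, llm_args is necessarily non-None,
# so the llm_args.lora_config attribute access is safe.
self._is_pytorch_backend = llm_args is not None and llm_args.backend in [
    "pytorch", "_autodeploy"
]
self._lora_config = llm_args.lora_config if self._is_pytorch_backend else None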
85 changes: 33 additions & 52 deletions tensorrt_llm/executor/executor.py
@@ -15,13 +15,12 @@

from tensorrt_llm.inputs.multimodal import MultimodalParams
from tensorrt_llm.logger import logger, set_level
from tensorrt_llm.lora_helper import LoraConfig

from .._utils import mpi_world_size
from ..bindings import executor as tllm
from ..builder import Engine
from ..disaggregated_params import DisaggregatedParams
from ..llmapi.llm_args import BaseLlmArgs, KvCacheConnectorConfig, TorchLlmArgs
from ..llmapi.llm_args import BaseLlmArgs, TorchLlmArgs
from ..llmapi.llm_utils import KvCacheRetentionConfig
from ..llmapi.mpi_session import (MpiSession, external_mpi_comm_available,
need_spawn_mpi_workers)
@@ -360,48 +359,46 @@ def aget_kv_events(self, timeout=None) -> IterationResult:

@staticmethod
def _create_ray_executor(
worker_kwargs: Dict,
model_world_size: int,
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
tp_size: int,
kv_connector_config: Optional[KvCacheConnectorConfig] = None):
worker_kwargs: Dict,
model_world_size: int,
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
tp_size: int,
):
from .ray_executor import RayExecutor

return RayExecutor(worker_kwargs,
model_world_size=model_world_size,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
tp_size=tp_size,
kv_connector_config=kv_connector_config)
tp_size=tp_size)

@staticmethod
def _create_rpc_executor(
worker_kwargs: Dict,
model_world_size: int,
mpi_session: Optional[MpiSession],
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
kv_connector_config: Optional[KvCacheConnectorConfig] = None):
worker_kwargs: Dict,
model_world_size: int,
mpi_session: Optional[MpiSession],
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
):
"""Create RPC-based executor (GenerationExecutorRpcProxy)."""
from .rpc_proxy import GenerationExecutorRpcProxy
return GenerationExecutorRpcProxy(
worker_kwargs,
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

@staticmethod
def _create_ipc_executor(
worker_kwargs: Dict,
model_world_size: int,
mpi_session: Optional[MpiSession],
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
use_worker: bool = False,
kv_connector_config: Optional[KvCacheConnectorConfig] = None):
worker_kwargs: Dict,
model_world_size: int,
mpi_session: Optional[MpiSession],
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
use_worker: bool = False,
):
"""Create IPC-based executor (GenerationExecutorProxy or GenerationExecutorWorker).

Args:
@@ -410,19 +407,16 @@
"""
if use_worker:
from .worker import GenerationExecutorWorker
return GenerationExecutorWorker(
**worker_kwargs,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
return GenerationExecutorWorker(**worker_kwargs,
is_llm_executor=is_llm_executor)
else:
from .proxy import GenerationExecutorProxy
return GenerationExecutorProxy(
worker_kwargs,
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

@staticmethod
def create(
@@ -436,8 +430,6 @@ def create(
return_logits: bool = False,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
lora_config: Optional[LoraConfig] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
hf_model_dir: Optional[Path] = None,
tokenizer: Optional[TokenizerBase] = None,
llm_args: Optional[BaseLlmArgs] = None,
@@ -469,9 +461,6 @@
"llm_args": llm_args,
}

if lora_config:
worker_kwargs["lora_config"] = lora_config

orchestrator_type = None if not isinstance(
llm_args, TorchLlmArgs) else llm_args.orchestrator_type
if orchestrator_type == "ray":
@@ -480,8 +469,7 @@
model_world_size,
postproc_worker_config,
is_llm_executor=is_llm_executor,
tp_size=args.get("tp_size", 1),
kv_connector_config=kv_connector_config)
tp_size=args.get("tp_size", 1))
elif orchestrator_type is not None and orchestrator_type != "rpc":
raise ValueError(
f"Unsupported orchestrator_type: {orchestrator_type}")
@@ -502,17 +490,15 @@
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

return GenerationExecutor._create_ipc_executor(
worker_kwargs,
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
use_worker=False,
kv_connector_config=kv_connector_config)
use_worker=False)

# WAR: For the performance of gathering logits, we use single process worker
# for TP1 to avoid the large overhead of IPC.
@@ -528,17 +514,15 @@
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

return GenerationExecutor._create_ipc_executor(
worker_kwargs,
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
use_worker=True,
kv_connector_config=kv_connector_config)
use_worker=True)

# For single-gpu case:
# Partition the workload to multiple process for streaming performance.
@@ -551,17 +535,15 @@
model_world_size=model_world_size,
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

return GenerationExecutor._create_ipc_executor(
worker_kwargs,
model_world_size=model_world_size,
mpi_session=None, # use mpi4py
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
use_worker=False,
kv_connector_config=kv_connector_config)
use_worker=False)
else:
ctx = multiprocessing.get_context("spawn")
# The ProcessPoolExecutorSession is used to support Windows, as mpi4py cannot.
@@ -574,8 +556,7 @@
mpi_session=mpi_session,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
use_worker=False,
kv_connector_config=kv_connector_config)
use_worker=False)

def wait_first_completed(
self, futures: List[GenerationResult]
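With the two kwargs gone, GenerationExecutor.create selects an executor from llm_args and the world size alone, and all three _create_*_executor helpers forward the same trimmed argument set. A condensed sketch of the dispatch as the visible hunks leave it (branches hidden by the collapsed regions are summarized in comments, not quoted):

# Sketch of GenerationExecutor.create dispatch after this PR.
orchestrator_type = None if not isinstance(
    llm_args, TorchLlmArgs) else llm_args.orchestrator_type

if orchestrator_type == "ray":
    return GenerationExecutor._create_ray_executor(
        worker_kwargs,
        model_world_size,
        postproc_worker_config,
        is_llm_executor=is_llm_executor,
        tp_size=args.get("tp_size", 1))
if orchestrator_type is not None and orchestrator_type != "rpc":
    raise ValueError(f"Unsupported orchestrator_type: {orchestrator_type}")

# The remaining paths (RPC, multi-process IPC, single-process worker for the
# TP1 logits case, spawn-based fallback) differ only in session plumbing;
# none of them receives lora_config or kv_connector_config any longer, e.g.:
return GenerationExecutor._create_ipc_executor(
    worker_kwargs,
    model_world_size=model_world_size,
    mpi_session=mpi_session,
    postproc_worker_config=postproc_worker_config,
    is_llm_executor=is_llm_executor,
    use_worker=False)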
5 changes: 1 addition & 4 deletions tensorrt_llm/executor/proxy.py
@@ -12,7 +12,6 @@
from tensorrt_llm.logger import logger

from .._utils import customized_gc_thresholds, mpi_rank, nvtx_range_debug
from ..llmapi.llm_args import KvCacheConnectorConfig
from ..llmapi.mpi_session import (MpiCommSession, MpiPoolSession, MpiSession,
RemoteMpiCommSessionClient)
from ..llmapi.tracer import enable_llm_tracer, get_tracer, global_tracer
@@ -46,7 +45,6 @@ def __init__(
worker_cls: type = GenerationExecutorWorker,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
) -> None:
postproc_worker_config = postproc_worker_config or PostprocWorkerConfig(
)
@@ -95,8 +93,7 @@ def __init__(
worker_kwargs = dict(**worker_kwargs,
worker_queues=self._setup_queues(),
postproc_worker_config=postproc_worker_config,
is_llm_executor=False,
kv_connector_config=kv_connector_config)
is_llm_executor=False)

if "log_level" not in worker_kwargs:
worker_kwargs["log_level"] = logger.level
7 changes: 2 additions & 5 deletions tensorrt_llm/executor/ray_executor.py
@@ -17,7 +17,6 @@
from tensorrt_llm.logger import logger

from .._utils import nvtx_range_debug
from ..llmapi.llm_args import KvCacheConnectorConfig
from .executor import GenerationExecutor
from .postproc_worker import PostprocWorkerConfig
from .ray_gpu_worker import RayGPUWorker, RayWorkerWrapper
@@ -36,8 +35,7 @@ def __init__(self,
model_world_size: int,
postproc_worker_config: PostprocWorkerConfig,
is_llm_executor: bool,
tp_size=1,
kv_connector_config: Optional[KvCacheConnectorConfig] = None):
tp_size=1):
os.environ['RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES'] = '1'
os.environ["RAY_DEDUP_LOGS"] = "0" # for debug

@@ -97,8 +95,7 @@ def __init__(self,

worker_kwargs = dict(**worker_kwargs,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
kv_connector_config=kv_connector_config)
is_llm_executor=is_llm_executor)

self.create_workers(RayGPUWorker, worker_kwargs)
except Exception as e:
7 changes: 1 addition & 6 deletions tensorrt_llm/executor/ray_gpu_worker.py
@@ -8,9 +8,8 @@

from ..bindings import executor as tllm
from ..builder import Engine
from ..llmapi.llm_args import BaseLlmArgs, KvCacheConnectorConfig
from ..llmapi.llm_args import BaseLlmArgs
from ..llmapi.tokenizer import TokenizerBase
from ..lora_helper import LoraConfig
from ..sampling_params import BatchedLogitsProcessor
from .base_worker import BaseWorker
from .postproc_worker import PostprocWorkerConfig
@@ -116,8 +115,6 @@ def __init__(
batched_logits_processor: Optional[BatchedLogitsProcessor] = None,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
lora_config: Optional[LoraConfig] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
hf_model_dir: Optional[Path] = None,
tokenizer: Optional[TokenizerBase] = None,
llm_args: Optional[BaseLlmArgs] = None,
@@ -131,8 +128,6 @@
batched_logits_processor=batched_logits_processor,
postproc_worker_config=postproc_worker_config,
is_llm_executor=is_llm_executor,
lora_config=lora_config,
kv_connector_config=kv_connector_config,
hf_model_dir=hf_model_dir,
tokenizer=tokenizer,
llm_args=llm_args,
3 changes: 0 additions & 3 deletions tensorrt_llm/executor/rpc_proxy.py
@@ -5,7 +5,6 @@
import threading
from typing import Optional

from ..llmapi.llm_args import KvCacheConnectorConfig
from ..llmapi.mpi_session import MpiPoolSession, MpiSession
from ..llmapi.tracer import global_tracer
from ..llmapi.utils import (AsyncQueue, _SyncQueue, logger_debug,
@@ -33,7 +32,6 @@ def __init__(
*,
postproc_worker_config: Optional[PostprocWorkerConfig] = None,
is_llm_executor: Optional[bool] = None,
kv_connector_config: Optional[KvCacheConnectorConfig] = None,
):
"""
Args:
@@ -42,7 +40,6 @@
mpi_session: the mpi session to use
postproc_worker_config: the postproc worker config
is_llm_executor: whether this is an llm executor
kv_connector_config: the kv cache connector config
"""
GenerationExecutorRpcProxy.INSTANCE_COUNTER += 1
self.rpc_addr = self.gen_uniq_rpc_addr()
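Taken together, the seven touched files remove the same two pass-through kwargs at every layer (creator, base worker, factory, proxy, Ray executor, Ray GPU worker, RPC proxy). For downstream callers the migration is mechanical; a hypothetical before/after, with argument names assumed and unrelated arguments elided:

# Before this PR (hypothetical caller):
#   executor = GenerationExecutor.create(
#       lora_config=lora_config,
#       kv_connector_config=kv_connector_config,
#       llm_args=llm_args)
#
# After this PR, both settings travel on llm_args. If the llm_args fields are
# not assignable after construction (e.g. a frozen pydantic model), set them
# in the constructor instead.
llm_args.lora_config = lora_config
llm_args.kv_connector_config = kv_connector_config
executor = GenerationExecutor.create(llm_args=llm_args)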