From 5a691ab525127dbd041dc3226c7df9215c1f24e3 Mon Sep 17 00:00:00 2001 From: irexyc Date: Fri, 17 Oct 2025 20:08:25 +0800 Subject: [PATCH 01/10] better format for logprobs serialization with ray mp engine backend --- lmdeploy/pytorch/engine/engine.py | 10 +++++++-- .../pytorch/engine/mp_engine/ray_engine.py | 21 ++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/lmdeploy/pytorch/engine/engine.py b/lmdeploy/pytorch/engine/engine.py index 4bea7e5a64..aa11744d3c 100644 --- a/lmdeploy/pytorch/engine/engine.py +++ b/lmdeploy/pytorch/engine/engine.py @@ -959,9 +959,15 @@ def __send_resp(out: InferOutput): # logprobs to dict vals = cur_logprobs[0].tolist() indices = cur_logprobs[1].tolist() - cur_logprobs = dict(zip(indices, vals)) logprobs = [] if out.resp.data is None else out.resp.data.get('logprobs', []) - logprobs = logprobs + [cur_logprobs] + if self.engine_config.mp_engine_backend == 'ray': # how to check enable_mp_engine? + logprobs = logprobs or [[], []] + logprobs[0].append(len(vals)) + logprobs[1].extend(indices) + logprobs[1].extend(vals) + else: + cur_logprobs = dict(zip(indices, vals)) + logprobs = logprobs + [cur_logprobs] self._response(out.resp, resp_type, data=dict(token_ids=out.token_ids, diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 0d133abfad..d4556be08a 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -5,7 +5,7 @@ import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from lmdeploy.messages import PytorchEngineConfig +from lmdeploy.messages import EngineOutput, PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger @@ -142,9 +142,28 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. 
stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) + def __handle_logprobs(result): + if not isinstance(result, EngineOutput) or not result.logprobs: + result + + offset = 0 + logprobs = [] + ns, indice_vals = result.logprobs + for n in ns: + indices = indice_vals[offset:offset + n] + vals = indice_vals[offset + n:offset + 2 * n] + offset += 2 * n + logprobs.append(dict(zip(indices, vals))) + + result.logprobs = logprobs + return result + stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) + if isinstance(func, str) and func == 'instance_async_stream_infer': + result = __handle_logprobs(result) + yield result def close(self) -> None: From 7cebf0a732af9598202e4140f0aca8b413d3977a Mon Sep 17 00:00:00 2001 From: irexyc Date: Fri, 17 Oct 2025 20:16:09 +0800 Subject: [PATCH 02/10] update --- lmdeploy/pytorch/engine/mp_engine/ray_engine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index d4556be08a..093e29d560 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -161,8 +161,7 @@ def __handle_logprobs(result): stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - if isinstance(func, str) and func == 'instance_async_stream_infer': - result = __handle_logprobs(result) + result = __handle_logprobs(result) yield result From da7c7bcefe27f5fa501a843775f632cd72d9211e Mon Sep 17 00:00:00 2001 From: irexyc Date: Fri, 17 Oct 2025 21:30:50 +0800 Subject: [PATCH 03/10] update --- lmdeploy/pytorch/engine/engine.py | 2 +- lmdeploy/pytorch/engine/mp_engine/__init__.py | 1 + .../pytorch/engine/mp_engine/base_worker.py | 19 ++++++++++++++++++ .../pytorch/engine/mp_engine/ray_engine.py | 20 +------------------ 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/lmdeploy/pytorch/engine/engine.py b/lmdeploy/pytorch/engine/engine.py index aa11744d3c..f444f7a27a 100644 --- a/lmdeploy/pytorch/engine/engine.py +++ b/lmdeploy/pytorch/engine/engine.py @@ -960,7 +960,7 @@ def __send_resp(out: InferOutput): vals = cur_logprobs[0].tolist() indices = cur_logprobs[1].tolist() logprobs = [] if out.resp.data is None else out.resp.data.get('logprobs', []) - if self.engine_config.mp_engine_backend == 'ray': # how to check enable_mp_engine? 
+ if self.engine_config.mp_engine_backend.startswith('_'): # indicate using mp engine logprobs = logprobs or [[], []] logprobs[0].append(len(vals)) logprobs[1].extend(indices) diff --git a/lmdeploy/pytorch/engine/mp_engine/__init__.py b/lmdeploy/pytorch/engine/mp_engine/__init__.py index d7611ea2dd..9820f189ab 100644 --- a/lmdeploy/pytorch/engine/mp_engine/__init__.py +++ b/lmdeploy/pytorch/engine/mp_engine/__init__.py @@ -4,6 +4,7 @@ def build_mp_engine(backend: str, model_path: str, engine_config: PytorchEngineConfig = None, **kwargs): """Build mp engine.""" + engine_config.mp_engine_backend = f'_{backend}' if backend == 'mp': from .zmq_engine import ZMQMPEngine return ZMQMPEngine(model_path, engine_config=engine_config, **kwargs) diff --git a/lmdeploy/pytorch/engine/mp_engine/base_worker.py b/lmdeploy/pytorch/engine/mp_engine/base_worker.py index b194cdbfc4..9b479f146e 100644 --- a/lmdeploy/pytorch/engine/mp_engine/base_worker.py +++ b/lmdeploy/pytorch/engine/mp_engine/base_worker.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, List, Optional +from lmdeploy.messages import EngineOutput from lmdeploy.pytorch.disagg.conn.protocol import (DistServeConnectionRequest, DistServeDropConnectionRequest, DistServeInitRequest) from lmdeploy.utils import get_logger @@ -125,5 +126,23 @@ async def instance_async_cancel(self, session_id: int): async def instance_async_stream_infer(self, *args, **kwargs): """Send stream inference request.""" + + def __handle_logprobs(result): + if not isinstance(result, EngineOutput) or not result.logprobs: + return result + + offset = 0 + logprobs = [] + ns, indice_vals = result.logprobs + for n in ns: + indices = indice_vals[offset:offset + n] + vals = indice_vals[offset + n:offset + 2 * n] + offset += 2 * n + logprobs.append(dict(zip(indices, vals))) + + result.logprobs = logprobs + return result + async for result in self.instance_pool.async_stream_infer(*args, **kwargs): + result = __handle_logprobs(result) yield result diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 093e29d560..0d133abfad 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -5,7 +5,7 @@ import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from lmdeploy.messages import EngineOutput, PytorchEngineConfig +from lmdeploy.messages import PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger @@ -142,27 +142,9 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. 
stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) - def __handle_logprobs(result): - if not isinstance(result, EngineOutput) or not result.logprobs: - result - - offset = 0 - logprobs = [] - ns, indice_vals = result.logprobs - for n in ns: - indices = indice_vals[offset:offset + n] - vals = indice_vals[offset + n:offset + 2 * n] - offset += 2 * n - logprobs.append(dict(zip(indices, vals))) - - result.logprobs = logprobs - return result - stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - result = __handle_logprobs(result) - yield result def close(self) -> None: From d4b82a824e2b29033f00b82b77446798143e2c6c Mon Sep 17 00:00:00 2001 From: irexyc Date: Sat, 18 Oct 2025 05:56:08 +0800 Subject: [PATCH 04/10] Revert "update" This reverts commit da7c7bcefe27f5fa501a843775f632cd72d9211e. --- lmdeploy/pytorch/engine/engine.py | 2 +- lmdeploy/pytorch/engine/mp_engine/__init__.py | 1 - .../pytorch/engine/mp_engine/base_worker.py | 19 ------------------ .../pytorch/engine/mp_engine/ray_engine.py | 20 ++++++++++++++++++- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/lmdeploy/pytorch/engine/engine.py b/lmdeploy/pytorch/engine/engine.py index f444f7a27a..aa11744d3c 100644 --- a/lmdeploy/pytorch/engine/engine.py +++ b/lmdeploy/pytorch/engine/engine.py @@ -960,7 +960,7 @@ def __send_resp(out: InferOutput): vals = cur_logprobs[0].tolist() indices = cur_logprobs[1].tolist() logprobs = [] if out.resp.data is None else out.resp.data.get('logprobs', []) - if self.engine_config.mp_engine_backend.startswith('_'): # indicate using mp engine + if self.engine_config.mp_engine_backend == 'ray': # how to check enable_mp_engine? logprobs = logprobs or [[], []] logprobs[0].append(len(vals)) logprobs[1].extend(indices) diff --git a/lmdeploy/pytorch/engine/mp_engine/__init__.py b/lmdeploy/pytorch/engine/mp_engine/__init__.py index 9820f189ab..d7611ea2dd 100644 --- a/lmdeploy/pytorch/engine/mp_engine/__init__.py +++ b/lmdeploy/pytorch/engine/mp_engine/__init__.py @@ -4,7 +4,6 @@ def build_mp_engine(backend: str, model_path: str, engine_config: PytorchEngineConfig = None, **kwargs): """Build mp engine.""" - engine_config.mp_engine_backend = f'_{backend}' if backend == 'mp': from .zmq_engine import ZMQMPEngine return ZMQMPEngine(model_path, engine_config=engine_config, **kwargs) diff --git a/lmdeploy/pytorch/engine/mp_engine/base_worker.py b/lmdeploy/pytorch/engine/mp_engine/base_worker.py index 9b479f146e..b194cdbfc4 100644 --- a/lmdeploy/pytorch/engine/mp_engine/base_worker.py +++ b/lmdeploy/pytorch/engine/mp_engine/base_worker.py @@ -3,7 +3,6 @@ from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, List, Optional -from lmdeploy.messages import EngineOutput from lmdeploy.pytorch.disagg.conn.protocol import (DistServeConnectionRequest, DistServeDropConnectionRequest, DistServeInitRequest) from lmdeploy.utils import get_logger @@ -126,23 +125,5 @@ async def instance_async_cancel(self, session_id: int): async def instance_async_stream_infer(self, *args, **kwargs): """Send stream inference request.""" - - def __handle_logprobs(result): - if not isinstance(result, EngineOutput) or not result.logprobs: - return result - - offset = 0 - logprobs = [] - ns, indice_vals = result.logprobs - for n in ns: - indices = indice_vals[offset:offset + n] - vals = indice_vals[offset + n:offset + 2 * n] - offset += 2 * n - logprobs.append(dict(zip(indices, vals))) - - result.logprobs = 
logprobs - return result - async for result in self.instance_pool.async_stream_infer(*args, **kwargs): - result = __handle_logprobs(result) yield result diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 0d133abfad..093e29d560 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -5,7 +5,7 @@ import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from lmdeploy.messages import PytorchEngineConfig +from lmdeploy.messages import EngineOutput, PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger @@ -142,9 +142,27 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) + def __handle_logprobs(result): + if not isinstance(result, EngineOutput) or not result.logprobs: + result + + offset = 0 + logprobs = [] + ns, indice_vals = result.logprobs + for n in ns: + indices = indice_vals[offset:offset + n] + vals = indice_vals[offset + n:offset + 2 * n] + offset += 2 * n + logprobs.append(dict(zip(indices, vals))) + + result.logprobs = logprobs + return result + stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) + result = __handle_logprobs(result) + yield result def close(self) -> None: From 4a0b287e993cd36b1e02efcef74640093e9e6692 Mon Sep 17 00:00:00 2001 From: irexyc Date: Sat, 18 Oct 2025 05:56:16 +0800 Subject: [PATCH 05/10] Revert "update" This reverts commit 7cebf0a732af9598202e4140f0aca8b413d3977a. --- lmdeploy/pytorch/engine/mp_engine/ray_engine.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 093e29d560..d4556be08a 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -161,7 +161,8 @@ def __handle_logprobs(result): stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - result = __handle_logprobs(result) + if isinstance(func, str) and func == 'instance_async_stream_infer': + result = __handle_logprobs(result) yield result From e2d51242cee779de45b8cfe7705a71dd7ac9bfb2 Mon Sep 17 00:00:00 2001 From: irexyc Date: Sat, 18 Oct 2025 05:56:17 +0800 Subject: [PATCH 06/10] Revert "better format for logprobs serialization with ray mp engine backend" This reverts commit 5a691ab525127dbd041dc3226c7df9215c1f24e3. --- lmdeploy/pytorch/engine/engine.py | 10 ++------- .../pytorch/engine/mp_engine/ray_engine.py | 21 +------------------ 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/lmdeploy/pytorch/engine/engine.py b/lmdeploy/pytorch/engine/engine.py index aa11744d3c..4bea7e5a64 100644 --- a/lmdeploy/pytorch/engine/engine.py +++ b/lmdeploy/pytorch/engine/engine.py @@ -959,15 +959,9 @@ def __send_resp(out: InferOutput): # logprobs to dict vals = cur_logprobs[0].tolist() indices = cur_logprobs[1].tolist() + cur_logprobs = dict(zip(indices, vals)) logprobs = [] if out.resp.data is None else out.resp.data.get('logprobs', []) - if self.engine_config.mp_engine_backend == 'ray': # how to check enable_mp_engine? 
- logprobs = logprobs or [[], []] - logprobs[0].append(len(vals)) - logprobs[1].extend(indices) - logprobs[1].extend(vals) - else: - cur_logprobs = dict(zip(indices, vals)) - logprobs = logprobs + [cur_logprobs] + logprobs = logprobs + [cur_logprobs] self._response(out.resp, resp_type, data=dict(token_ids=out.token_ids, diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index d4556be08a..0d133abfad 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -5,7 +5,7 @@ import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from lmdeploy.messages import EngineOutput, PytorchEngineConfig +from lmdeploy.messages import PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger @@ -142,28 +142,9 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) - def __handle_logprobs(result): - if not isinstance(result, EngineOutput) or not result.logprobs: - result - - offset = 0 - logprobs = [] - ns, indice_vals = result.logprobs - for n in ns: - indices = indice_vals[offset:offset + n] - vals = indice_vals[offset + n:offset + 2 * n] - offset += 2 * n - logprobs.append(dict(zip(indices, vals))) - - result.logprobs = logprobs - return result - stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - if isinstance(func, str) and func == 'instance_async_stream_infer': - result = __handle_logprobs(result) - yield result def close(self) -> None: From 256d03637de3a48f1567121baee4fc28673a8166 Mon Sep 17 00:00:00 2001 From: irexyc Date: Sat, 18 Oct 2025 06:05:54 +0800 Subject: [PATCH 07/10] increament send / recv EngineOutput in ray mp engine --- .../pytorch/engine/mp_engine/ray_engine.py | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 0d133abfad..69a8afd8f8 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -1,14 +1,14 @@ # Copyright (c) OpenMMLab. All rights reserved. 
import asyncio +from collections import defaultdict from typing import Dict import ray -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy - -from lmdeploy.messages import PytorchEngineConfig +from lmdeploy.messages import EngineOutput, PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from .base import MPEngine from .base_worker import EngineWorkerBase @@ -35,6 +35,7 @@ def __init__(self, self._stream_id = 0 self._stream_aiter = dict() self._stream_task = dict() + self._engine_output_offset = defaultdict(int) async def _stream_task_wrapper(self, stream_id: int, func: str, *args, **kwargs): """Create a stream task.""" @@ -67,6 +68,22 @@ async def get_stream_task_result(self, stream_id: int): result, stopped = self._stream_aiter[stream_id][1] event.clear() + def __return_engine_output_increamental(result): + if not isinstance(result, EngineOutput): + return + old_offset = self._engine_output_offset[stream_id] + new_offset = len(result.logprobs) + if old_offset: + if result.token_ids: + result.token_ids = result.token_ids[old_offset:] + if result.logprobs: + result.logprobs = result.logprobs[old_offset:] + self._engine_output_offset[stream_id] = new_offset + if stopped: + self._engine_output_offset.pop(stream_id, None) + + __return_engine_output_increamental(result) + if stopped: self._stream_aiter.pop(stream_id, None) self._stream_task.pop(stream_id, None) @@ -92,6 +109,7 @@ def __init__(self, model_path: str, engine_config: PytorchEngineConfig = None, * self.worker = self._create_worker(model_path, engine_config, log_level=logger.level, **kwargs) super().__init__() + self._engine_output = defaultdict(lambda: EngineOutput(status=None, token_ids=[], num_token=0, logprobs=[])) def _init_ray(self, engine_config: PytorchEngineConfig = None): """Initialize Ray.""" @@ -142,9 +160,22 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. 
stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) + def __merge_engine_output(result): + if not isinstance(result, EngineOutput): + return + + output = self._engine_output[stream_id] + output.token_ids.extend(result.token_ids or []) + output.logprobs.extend(result.logprobs or []) + result.token_ids = output.token_ids or [] + result.logprobs = output.logprobs or None + if stopped: + self._engine_output.pop(stream_id, None) + stopped = False while not stopped: result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) + __merge_engine_output(result) yield result def close(self) -> None: From 324232e719ed55029d2ce754d3141353ad717fe0 Mon Sep 17 00:00:00 2001 From: irexyc Date: Fri, 17 Oct 2025 22:28:38 +0000 Subject: [PATCH 08/10] fix lint --- lmdeploy/pytorch/engine/mp_engine/ray_engine.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 69a8afd8f8..68362588e9 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -4,11 +4,12 @@ from typing import Dict import ray +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + from lmdeploy.messages import EngineOutput, PytorchEngineConfig from lmdeploy.pytorch import envs as _envs from lmdeploy.pytorch.ray import RayContext, get_device_str, get_resource_kwargs from lmdeploy.utils import get_logger -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from .base import MPEngine from .base_worker import EngineWorkerBase From c24991f3b3a233d1372fc8a597d9c572b203ebc3 Mon Sep 17 00:00:00 2001 From: irexyc Date: Sun, 19 Oct 2025 17:42:40 +0800 Subject: [PATCH 09/10] fix offset when not return logprobs --- lmdeploy/pytorch/engine/mp_engine/ray_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 68362588e9..7b83d3fac3 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -73,7 +73,7 @@ def __return_engine_output_increamental(result): if not isinstance(result, EngineOutput): return old_offset = self._engine_output_offset[stream_id] - new_offset = len(result.logprobs) + new_offset = len(result.token_ids) if old_offset: if result.token_ids: result.token_ids = result.token_ids[old_offset:] From ff459d93c63997cd51eacdb3842abc0c6384b8fe Mon Sep 17 00:00:00 2001 From: irexyc Date: Mon, 20 Oct 2025 10:58:54 +0800 Subject: [PATCH 10/10] update --- lmdeploy/pytorch/engine/mp_engine/ray_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 7b83d3fac3..cfc7a53a37 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -69,7 +69,7 @@ async def get_stream_task_result(self, stream_id: int): result, stopped = self._stream_aiter[stream_id][1] event.clear() - def __return_engine_output_increamental(result): + def __return_engine_output_incrementally(result): if not isinstance(result, EngineOutput): return old_offset = self._engine_output_offset[stream_id] @@ -83,7 +83,7 @@ def __return_engine_output_increamental(result): if stopped: self._engine_output_offset.pop(stream_id, None) - 
__return_engine_output_increamental(result) + __return_engine_output_incrementally(result) if stopped: self._stream_aiter.pop(stream_id, None)
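
The serialization format that patches 01-06 try out (and later revert) packs each generated token's {token_id: logprob} dict into a pair of flat lists -- a per-step count list plus one interleaved indices/values list -- so that the payload pickles more compactly across the Ray actor boundary. The sketch below only illustrates that round-trip and is not code from the PR; flatten_logprobs and unflatten_logprobs are hypothetical helper names.

from typing import Dict, List, Optional, Tuple

Packed = Tuple[List[int], list]  # ([n0, n1, ...], [ids0..., vals0..., ids1..., vals1..., ...])


def flatten_logprobs(step_logprobs: Dict[int, float], packed: Optional[Packed] = None) -> Packed:
    # Append one step's {token_id: logprob} dict to the flat, pickle-friendly form.
    counts, flat = packed if packed is not None else ([], [])
    counts.append(len(step_logprobs))
    flat.extend(step_logprobs.keys())    # n token ids ...
    flat.extend(step_logprobs.values())  # ... followed by their n logprob values
    return counts, flat


def unflatten_logprobs(packed: Packed) -> List[Dict[int, float]]:
    # Rebuild the per-step dicts on the receiving side.
    counts, flat = packed
    out, offset = [], 0
    for n in counts:
        indices = flat[offset:offset + n]
        vals = flat[offset + n:offset + 2 * n]
        offset += 2 * n
        out.append(dict(zip(indices, vals)))
    return out


steps = [{5: -0.2, 9: -1.7}, {3: -0.4}]
packed = None
for step in steps:
    packed = flatten_logprobs(step, packed)
assert unflatten_logprobs(packed) == steps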
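
Patches 07-10 drop that format and instead make the Ray worker send each EngineOutput incrementally: the worker actor remembers, per stream id, how many token_ids have already been shipped (patch 09 keys the offset on token_ids rather than logprobs, since logprobs may be absent) and slices the cumulative output down to the new tail, while the client side re-accumulates the increments so callers still observe cumulative outputs. The stand-alone sketch below mirrors that idea under simplified assumptions: EngineOutput is reduced to token_ids/logprobs, and WorkerSide/ClientSide are hypothetical names, not classes from the PR.

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class EngineOutput:
    # Simplified stand-in for lmdeploy.messages.EngineOutput.
    token_ids: List[int] = field(default_factory=list)
    logprobs: Optional[List[dict]] = None


class WorkerSide:
    # Runs inside the Ray actor: ship only the tokens the client has not seen yet.

    def __init__(self):
        self._offset: Dict[int, int] = defaultdict(int)

    def to_incremental(self, stream_id: int, out: EngineOutput, stopped: bool) -> EngineOutput:
        old = self._offset[stream_id]
        # Track the offset by token count, not logprob count (logprobs may be None).
        self._offset[stream_id] = len(out.token_ids)
        inc = EngineOutput(token_ids=out.token_ids[old:],
                           logprobs=out.logprobs[old:] if out.logprobs else None)
        if stopped:
            self._offset.pop(stream_id, None)
        return inc


class ClientSide:
    # Runs in the caller process: rebuild cumulative outputs from the increments.

    def __init__(self):
        self._acc: Dict[int, EngineOutput] = defaultdict(EngineOutput)

    def merge(self, stream_id: int, inc: EngineOutput, stopped: bool) -> EngineOutput:
        acc = self._acc[stream_id]
        acc.token_ids.extend(inc.token_ids)
        if inc.logprobs:
            acc.logprobs = (acc.logprobs or []) + inc.logprobs
        merged = EngineOutput(token_ids=list(acc.token_ids),
                              logprobs=list(acc.logprobs) if acc.logprobs else None)
        if stopped:
            self._acc.pop(stream_id, None)
        return merged


# Round-trip check: the cumulative outputs survive the incremental hop unchanged.
worker, client = WorkerSide(), ClientSide()
cumulative = [
    EngineOutput(token_ids=[1], logprobs=[{1: -0.1}]),
    EngineOutput(token_ids=[1, 2], logprobs=[{1: -0.1}, {2: -0.5}]),
    EngineOutput(token_ids=[1, 2, 3], logprobs=[{1: -0.1}, {2: -0.5}, {3: -1.2}]),
]
for step, out in enumerate(cumulative):
    stopped = step == len(cumulative) - 1
    inc = worker.to_incremental(0, out, stopped)
    merged = client.merge(0, inc, stopped)
    assert merged.token_ids == out.token_ids and merged.logprobs == out.logprobs

The practical effect is that the per-step cross-process payload stays proportional to the newly generated tokens instead of growing with the whole sequence, which is why the earlier flattened-dict format became unnecessary and could be reverted.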