Skip to content

Commit 59b9d09

Browse files
authored
refactor(loader): clarify qdrant retry metrics (#158)
1 parent d712b2b commit 59b9d09

File tree

7 files changed

+81
-16
lines changed

7 files changed

+81
-16
lines changed

docker/pyproject.deps.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "mcp-plex"
3-
version = "2.0.18"
3+
version = "2.0.19"
44
requires-python = ">=3.11,<3.13"
55
dependencies = [
66
"fastmcp>=2.11.2",

docs/loader.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Loader Operations
2+
3+
The loader emits structured log records throughout ingestion so operators can
4+
monitor progress without scraping stdout dumps.
5+
6+
## Qdrant retry summary
7+
8+
After every run the loader processes the in-memory Qdrant retry queue and
9+
reports the outcome via a structured ``INFO`` log:
10+
11+
```
12+
Qdrant retry summary
13+
```
14+
15+
The log record is tagged with ``event="qdrant_retry_summary"`` and includes two
16+
integer attributes:
17+
18+
- ``succeeded_points`` – number of points that were reindexed successfully after
19+
retrying.
20+
- ``failed_points`` – number of points that still failed after exhausting all
21+
retry attempts and therefore remain missing from the collection.
22+
23+
Use your logging aggregator or ``caplog`` when testing to filter on the
24+
``qdrant_retry_summary`` event and confirm ingestion health.

mcp_plex/loader/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,12 +437,20 @@ async def run(
437437
if not items:
438438
logger.info("No points to upsert")
439439

440-
await _process_qdrant_retry_queue(
440+
succeeded_points, failed_points = await _process_qdrant_retry_queue(
441441
client,
442442
collection_name,
443443
qdrant_retry_queue,
444444
config=qdrant_config,
445445
)
446+
logger.info(
447+
"Qdrant retry summary",
448+
extra={
449+
"event": "qdrant_retry_summary",
450+
"succeeded_points": succeeded_points,
451+
"failed_points": failed_points,
452+
},
453+
)
446454

447455
if imdb_queue_path:
448456
_persist_imdb_retry_queue(imdb_queue_path, imdb_config.retry_queue)

mcp_plex/loader/qdrant.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -309,18 +309,24 @@ async def _process_qdrant_retry_queue(
309309
retry_queue: asyncio.Queue[list[models.PointStruct]],
310310
*,
311311
config: "QdrantRuntimeConfig",
312-
) -> None:
313-
"""Retry failed Qdrant batches with exponential backoff."""
312+
) -> tuple[int, int]:
313+
"""Retry failed Qdrant batches with exponential backoff.
314+
315+
Returns a tuple containing the number of points that were retried successfully
316+
and the number that still failed after exhausting ``config.retry_attempts``.
317+
"""
314318

315319
if retry_queue.empty():
316-
return
320+
return 0, 0
317321

318322
pending = retry_queue.qsize()
319323
logger.info("Retrying %d failed Qdrant batches", pending)
324+
succeeded_points = 0
325+
failed_points = 0
320326
while not retry_queue.empty():
321327
batch = await retry_queue.get()
322-
attempt = 1
323-
while attempt <= config.retry_attempts:
328+
batch_size = len(batch)
329+
for attempt in range(1, config.retry_attempts + 1):
324330
try:
325331
await client.upsert(
326332
collection_name=collection_name,
@@ -331,26 +337,29 @@ async def _process_qdrant_retry_queue(
331337
"Retry %d/%d failed for Qdrant batch of %d points",
332338
attempt,
333339
config.retry_attempts,
334-
len(batch),
340+
batch_size,
335341
)
336-
attempt += 1
337-
if attempt > config.retry_attempts:
342+
if attempt == config.retry_attempts:
338343
logger.error(
339344
"Giving up on Qdrant batch after %d attempts; %d points were not indexed",
340345
config.retry_attempts,
341-
len(batch),
346+
batch_size,
342347
)
348+
failed_points += batch_size
343349
break
344-
await asyncio.sleep(config.retry_backoff * attempt)
345-
continue
350+
351+
next_attempt = attempt + 1
352+
await asyncio.sleep(config.retry_backoff * next_attempt)
346353
else:
347354
logger.info(
348355
"Successfully retried Qdrant batch of %d points on attempt %d",
349-
len(batch),
356+
batch_size,
350357
attempt,
351358
)
359+
succeeded_points += batch_size
352360
break
353361

362+
return succeeded_points, failed_points
354363

355364
__all__ = [
356365
"_DENSE_MODEL_PARAMS",

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "mcp-plex"
7-
version = "2.0.18"
7+
version = "2.0.19"
88

99
description = "Plex-Oriented Model Context Protocol Server"
1010
requires-python = ">=3.11,<3.13"

tests/test_loader_logging.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,30 @@ def test_run_logs_no_points(monkeypatch, caplog):
6262
assert "Ingestion stage finished" in caplog.text
6363

6464

65+
def test_run_logs_qdrant_retry_summary(monkeypatch, caplog):
66+
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
67+
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
68+
69+
async def fake_process_retry_queue(*args, **kwargs):
70+
return 7, 3
71+
72+
monkeypatch.setattr(loader, "_process_qdrant_retry_queue", fake_process_retry_queue)
73+
74+
with caplog.at_level(logging.INFO):
75+
asyncio.run(loader.run(None, None, None, sample_dir, None, None))
76+
77+
summary_records = [
78+
record
79+
for record in caplog.records
80+
if record.levelno == logging.INFO
81+
and getattr(record, "event", None) == "qdrant_retry_summary"
82+
]
83+
assert summary_records, "Expected a qdrant retry summary log record"
84+
record = summary_records[-1]
85+
assert getattr(record, "succeeded_points", None) == 7
86+
assert getattr(record, "failed_points", None) == 3
87+
88+
6589
def test_run_rejects_invalid_upsert_buffer_size(monkeypatch):
6690
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
6791
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)