From 5d0802b68075991d67ad5fccf91ed849b51e6b1f Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 10 Nov 2025 18:17:55 +0000 Subject: [PATCH 01/11] add buffer tables --- .../storage_clients/_sql/_client_mixin.py | 190 ++++++++++++++- .../storage_clients/_sql/_dataset_client.py | 95 ++++++-- .../storage_clients/_sql/_db_models.py | 114 ++++++++- .../_sql/_key_value_store_client.py | 98 +++++--- .../_sql/_request_queue_client.py | 222 +++++++++++++----- 5 files changed, 604 insertions(+), 115 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index c681e3a220..1d98873595 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -2,11 +2,12 @@ from abc import ABC, abstractmethod from contextlib import asynccontextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from logging import getLogger from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast, overload -from sqlalchemy import delete, select, text, update +from sqlalchemy import CursorResult, delete, select, text, update +from sqlalchemy import func as sql_func from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as lite_insert from sqlalchemy.exc import SQLAlchemyError @@ -25,10 +26,13 @@ from ._db_models import ( DatasetItemDb, + DatasetMetadataBufferDb, DatasetMetadataDb, + KeyValueStoreMetadataBufferDb, KeyValueStoreMetadataDb, KeyValueStoreRecordDb, RequestDb, + RequestQueueMetadataBufferDb, RequestQueueMetadataDb, ) from ._storage_client import SqlStorageClient @@ -57,12 +61,20 @@ class SqlClientMixin(ABC): _METADATA_TABLE: ClassVar[type[DatasetMetadataDb | KeyValueStoreMetadataDb | RequestQueueMetadataDb]] """SQLAlchemy model for metadata.""" + _BUFFER_TABLE: ClassVar[ + type[KeyValueStoreMetadataBufferDb | DatasetMetadataBufferDb | RequestQueueMetadataBufferDb] + ] + """SQLAlchemy model for metadata buffer.""" + _ITEM_TABLE: ClassVar[type[DatasetItemDb | KeyValueStoreRecordDb | RequestDb]] """SQLAlchemy model for items.""" _CLIENT_TYPE: ClassVar[str] """Human-readable client type for error messages.""" + _BLOCK_BUFFER_TIME = timedelta(milliseconds=200) + """Time interval that blocks buffer reading to update metadata.""" + def __init__(self, *, id: str, storage_client: SqlStorageClient) -> None: self._id = id self._storage_client = storage_client @@ -109,7 +121,9 @@ async def _open( if orm_metadata: client = cls(id=orm_metadata.id, storage_client=storage_client) - await client._update_metadata(session, update_accessed_at=True) + await client._add_buffer_record(session) + # Ensure any pending buffer updates are processed + await client._process_buffers() else: now = datetime.now(timezone.utc) metadata = metadata_model( @@ -262,6 +276,9 @@ async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None: Args: metadata_kwargs: Arguments to pass to _update_metadata. 
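+
+        Pending metadata buffer updates are processed first (see _process_buffers below) so that
+        counters are consistent before the items are removed.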
""" + # Force process buffers to ensure metadata is up to date before purging + await self._process_buffers(force=True) + stmt = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.storage_id == self._id) async with self.get_session(with_simple_commit=True) as session: await session.execute(stmt) @@ -290,6 +307,9 @@ async def _get_metadata( self, metadata_model: type[DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata] ) -> DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata: """Retrieve client metadata.""" + # Process any pending buffer updates first + await self._process_buffers() + async with self.get_session() as session: orm_metadata = await session.get(self._METADATA_TABLE, self._id) if not orm_metadata: @@ -383,3 +403,167 @@ async def _update_metadata( return True return False + + @abstractmethod + def _prepare_buffer_data(self, **kwargs: Any) -> dict[str, Any]: + """Prepare storage-specific buffer data. Must be implemented by concrete classes.""" + + @abstractmethod + async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: + """Apply aggregated buffer updates to metadata. Must be implemented by concrete classes. + + Args: + session: Active database session. + max_buffer_id: Maximum buffer record ID to process. + """ + + async def _add_buffer_record( + self, + session: AsyncSession, + *, + update_modified_at: bool = False, + **kwargs: Any, + ) -> None: + """Add a record to the buffer table and update metadata. + + Args: + session: Active database session. + update_modified_at: Whether to update modified_at timestamp. + **kwargs: Additional arguments for _prepare_buffer_data. + """ + now = datetime.now(timezone.utc) + values_to_set = { + 'storage_id': self._id, + 'accessed_at': now, # All entries in the buffer require updating `accessed_at` + 'modified_at': now if update_modified_at else None, + } + values_to_set.update(self._prepare_buffer_data(**kwargs)) + + session.add(self._BUFFER_TABLE(**values_to_set)) + + async def _try_acquire_buffer_lock(self, session: AsyncSession, *, force: bool = False) -> bool: + """Try to acquire buffer processing lock for 200ms. + + Args: + session: Active database session. + force: If True, forcefully acquire lock regardless of current state. + + Returns: + True if lock was acquired, False if already locked by another process. 
+ """ + now = datetime.now(timezone.utc) + lock_until = now + self._BLOCK_BUFFER_TIME + dialect = self._storage_client.get_dialect_name() + + if dialect == 'postgresql': + select_stmt = ( + select(self._METADATA_TABLE) + .where( + self._METADATA_TABLE.id == self._id, + (self._METADATA_TABLE.buffer_locked_until.is_(None)) + | (self._METADATA_TABLE.buffer_locked_until < now), + ) + .with_for_update(skip_locked=True) + ) + result = await session.execute(select_stmt) + metadata_row = result.scalar_one_or_none() + + if metadata_row is None: + # Either conditions not met OR row is locked by another process + return False + + if force: + # Force acquire lock regardless of current state + update_stmt = ( + update(self._METADATA_TABLE) + .where(self._METADATA_TABLE.id == self._id) + .values(buffer_locked_until=lock_until) + ) + else: + # Acquire lock only if not currently locked or lock has expired + update_stmt = ( + update(self._METADATA_TABLE) + .where( + self._METADATA_TABLE.id == self._id, + (self._METADATA_TABLE.buffer_locked_until.is_(None)) + | (self._METADATA_TABLE.buffer_locked_until < now), + ) + .values(buffer_locked_until=lock_until) + ) + + result = await session.execute(update_stmt) + result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result + + if result.rowcount > 0: + await session.flush() + return True + + return False + + async def _release_buffer_lock(self, session: AsyncSession) -> None: + """Release buffer processing lock by setting buffer_locked_until to NULL. + + Args: + session: Active database session. + """ + stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(buffer_locked_until=None) + + await session.execute(stmt) + + await session.flush() + + async def _has_pending_buffer_updates(self, session: AsyncSession) -> bool: + """Check if there are pending buffer updates not yet applied to metadata. + + Returns False only when buffer_locked_until is NULL (metadata is consistent). + + Returns: + True if metadata might be inconsistent due to pending buffer updates. + """ + result = await session.execute( + select(self._METADATA_TABLE.buffer_locked_until).where(self._METADATA_TABLE.id == self._id) + ) + + locked_until = result.scalar() + + # Any non-NULL value means there are pending updates + return locked_until is not None + + async def _process_buffers(self, *, force: bool = False) -> None: + """Process pending buffer updates and apply them to metadata. + + Args: + force: If True, forcefully acquire lock and process buffers. 
+ """ + async with self.get_session(with_simple_commit=True) as session: + # Try to acquire buffer processing lock + if not await self._try_acquire_buffer_lock(session, force=force): + # Another process is currently processing buffers or lock acquisition failed + return + + # Get the maximum buffer ID at this moment + # This creates a consistent snapshot - records added during processing won't be included + max_buffer_id_stmt = select(sql_func.max(self._BUFFER_TABLE.id)).where( + self._BUFFER_TABLE.storage_id == self._id + ) + + result = await session.execute(max_buffer_id_stmt) + max_buffer_id = result.scalar() + + if max_buffer_id is None: + # No buffer records to process + return + + # Apply aggregated buffer updates to metadata using only records <= max_buffer_id + # This method is implemented by concrete storage classes + await self._apply_buffer_updates(session, max_buffer_id=max_buffer_id) + + # Clean up only the processed buffer records (those <= max_buffer_id) + delete_stmt = delete(self._BUFFER_TABLE).where( + self._BUFFER_TABLE.storage_id == self._id, self._BUFFER_TABLE.id <= max_buffer_id + ) + + await session.execute(delete_stmt) + + # Release the lock after successful processing + await self._release_buffer_lock(session) diff --git a/src/crawlee/storage_clients/_sql/_dataset_client.py b/src/crawlee/storage_clients/_sql/_dataset_client.py index 61873975c8..b2d4feb6cf 100644 --- a/src/crawlee/storage_clients/_sql/_dataset_client.py +++ b/src/crawlee/storage_clients/_sql/_dataset_client.py @@ -3,19 +3,21 @@ from logging import getLogger from typing import TYPE_CHECKING, Any -from sqlalchemy import Select, insert, select +from sqlalchemy import Select, insert, select, update +from sqlalchemy import func as sql_func from typing_extensions import Self, override from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata from ._client_mixin import MetadataUpdateParams, SqlClientMixin -from ._db_models import DatasetItemDb, DatasetMetadataDb +from ._db_models import DatasetItemDb, DatasetMetadataBufferDb, DatasetMetadataDb if TYPE_CHECKING: from collections.abc import AsyncIterator from sqlalchemy import Select + from sqlalchemy.ext.asyncio import AsyncSession from typing_extensions import NotRequired from ._storage_client import SqlStorageClient @@ -58,6 +60,9 @@ class SqlDatasetClient(DatasetClient, SqlClientMixin): _CLIENT_TYPE = 'Dataset' """Human-readable client type for error messages.""" + _BUFFER_TABLE = DatasetMetadataBufferDb + """SQLAlchemy model for metadata buffer.""" + def __init__( self, *, @@ -126,7 +131,6 @@ async def purge(self) -> None: new_item_count=0, update_accessed_at=True, update_modified_at=True, - force=True, ) ) @@ -135,23 +139,13 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: if not isinstance(data, list): data = [data] - db_items: list[dict[str, Any]] = [] db_items = [{'dataset_id': self._id, 'data': item} for item in data] stmt = insert(self._ITEM_TABLE).values(db_items) async with self.get_session(with_simple_commit=True) as session: await session.execute(stmt) - await self._update_metadata( - session, - **_DatasetMetadataUpdateParams( - update_accessed_at=True, - update_modified_at=True, - delta_item_count=len(data), - new_item_count=len(data), - force=True, - ), - ) + await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data)) @override async def get_data( @@ -183,15 +177,11 @@ async def get_data( view=view, 
) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: result = await session.execute(stmt) db_items = result.scalars().all() - updated = await self._update_metadata(session, **_DatasetMetadataUpdateParams(update_accessed_at=True)) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._add_buffer_record(session) items = [db_item.data for db_item in db_items] metadata = await self.get_metadata() @@ -230,17 +220,13 @@ async def iterate_items( skip_hidden=skip_hidden, ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: db_items = await session.stream_scalars(stmt) async for db_item in db_items: yield db_item.data - updated = await self._update_metadata(session, **_DatasetMetadataUpdateParams(update_accessed_at=True)) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._add_buffer_record(session) def _prepare_get_stmt( self, @@ -308,3 +294,60 @@ def _specific_update_metadata( values_to_set['item_count'] = self._METADATA_TABLE.item_count + delta_item_count return values_to_set + + def _prepare_buffer_data(self, delta_item_count: int | None = None, **_kwargs: Any) -> dict[str, Any]: + """Prepare key-value store specific buffer data. + + For KeyValueStore, we don't have specific metadata fields to track in buffer, + so we just return empty dict. The base buffer will handle accessed_at/modified_at. + + Args: + delta_item_count: If provided, add this value to the current item count. + **kwargs: Additional arguments (unused for key-value store). + + Returns: + Empty dict as key-value stores don't have specific metadata fields. + """ + buffer_data = {} + if delta_item_count is not None: + buffer_data['delta_item_count'] = delta_item_count + + return buffer_data + + async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: + """Apply aggregated buffer updates to key-value store metadata. + + For KeyValueStore, we aggregate accessed_at and modified_at timestamps + from buffer records and apply them to the metadata. + + Args: + session: Active database session. + max_buffer_id: Maximum buffer record ID to process (inclusive). 
+ """ + # Get aggregated timestamps from buffer records + aggregation_stmt = select( + sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), + sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), + sql_func.sum(self._BUFFER_TABLE.delta_item_count).label('delta_item_count'), + ).where(self._BUFFER_TABLE.storage_id == self._id, self._BUFFER_TABLE.id <= max_buffer_id) + + result = await session.execute(aggregation_stmt) + row = result.first() + + if not row: + return + + # Prepare updates for metadata + values_to_update = { + 'accessed_at': row.max_accessed_at, + } + + if row.max_modified_at: + values_to_update['modified_at'] = row.max_modified_at + + if row.delta_item_count: + values_to_update['item_count'] = self._METADATA_TABLE.item_count + row.delta_item_count + + update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) + + await session.execute(update_stmt) diff --git a/src/crawlee/storage_clients/_sql/_db_models.py b/src/crawlee/storage_clients/_sql/_db_models.py index 2a8f8b565b..cabdb80e7c 100644 --- a/src/crawlee/storage_clients/_sql/_db_models.py +++ b/src/crawlee/storage_clients/_sql/_db_models.py @@ -67,6 +67,9 @@ class StorageMetadataDb: modified_at: Mapped[datetime] = mapped_column(AwareDateTime, nullable=False) """Last modification datetime.""" + buffer_locked_until: Mapped[datetime | None] = mapped_column(AwareDateTime, nullable=True) + """Timestamp until which buffer processing is locked for this storage. NULL = unlocked.""" + class DatasetMetadataDb(StorageMetadataDb, Base): """Metadata table for datasets.""" @@ -84,6 +87,11 @@ class DatasetMetadataDb(StorageMetadataDb, Base): back_populates='dataset', cascade='all, delete-orphan', lazy='noload' ) + # Relationship to metadata buffer updates + buffer: Mapped[list[DatasetMetadataBufferDb]] = relationship( + back_populates='dataset', cascade='all, delete-orphan', lazy='noload' + ) + id = synonym('dataset_id') """Alias for dataset_id to match Pydantic expectations.""" @@ -92,6 +100,13 @@ class RequestQueueMetadataDb(StorageMetadataDb, Base): """Metadata table for request queues.""" __tablename__ = 'request_queues' + __table_args__ = ( + Index( + 'idx_buffer_lock', + 'request_queue_id', + 'buffer_locked_until', + ), + ) request_queue_id: Mapped[str] = mapped_column(String(20), nullable=False, primary_key=True) """Unique identifier for the request queue.""" @@ -117,6 +132,11 @@ class RequestQueueMetadataDb(StorageMetadataDb, Base): back_populates='queue', cascade='all, delete-orphan', lazy='noload' ) + # Relationship to metadata buffer updates + buffer: Mapped[list[RequestQueueMetadataBufferDb]] = relationship( + back_populates='queue', cascade='all, delete-orphan', lazy='noload' + ) + id = synonym('request_queue_id') """Alias for request_queue_id to match Pydantic expectations.""" @@ -134,6 +154,11 @@ class KeyValueStoreMetadataDb(StorageMetadataDb, Base): back_populates='kvs', cascade='all, delete-orphan', lazy='noload' ) + # Relationship to metadata buffer updates + buffer: Mapped[list[KeyValueStoreMetadataBufferDb]] = relationship( + back_populates='kvs', cascade='all, delete-orphan', lazy='noload' + ) + id = synonym('key_value_store_id') """Alias for key_value_store_id to match Pydantic expectations.""" @@ -206,7 +231,7 @@ class RequestDb(Base): 'request_queue_id', 'is_handled', 'sequence_number', - postgresql_where=text('is_handled is false'), + postgresql_where=text('is_handled = false'), ), ) @@ -266,3 +291,90 @@ class 
VersionDb(Base): __tablename__ = 'version' version: Mapped[str] = mapped_column(String(10), nullable=False, primary_key=True) + + +class MetadataBufferDb: + """Base model for metadata update buffer tables.""" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + """Auto-increment primary key for ordering.""" + + # Timestamp fields - use max value when aggregating + accessed_at: Mapped[datetime | None] = mapped_column(AwareDateTime, nullable=False) + """New accessed_at timestamp, if being updated.""" + + modified_at: Mapped[datetime | None] = mapped_column(AwareDateTime, nullable=True) + """New modified_at timestamp, if being updated.""" + + +class KeyValueStoreMetadataBufferDb(MetadataBufferDb, Base): + """Buffer table for deferred key-value store metadata updates to reduce concurrent access issues.""" + + __tablename__ = 'key_value_store_metadata_buffer' + + key_value_store_id: Mapped[str] = mapped_column( + String(20), ForeignKey('key_value_stores.key_value_store_id', ondelete='CASCADE'), nullable=False, index=True + ) + """ID of the key-value store being updated.""" + + # Relationship back to key-value store metadata + kvs: Mapped[KeyValueStoreMetadataDb] = relationship(back_populates='buffer') + + storage_id = synonym('key_value_store_id') + """Alias for key_value_store_id to match SqlClientMixin expectations.""" + + +class DatasetMetadataBufferDb(MetadataBufferDb, Base): + """Buffer table for deferred dataset metadata updates to reduce concurrent access issues.""" + + __tablename__ = 'dataset_metadata_buffer' + + dataset_id: Mapped[str] = mapped_column( + String(20), ForeignKey('datasets.dataset_id', ondelete='CASCADE'), nullable=False, index=True + ) + """ID of the dataset being updated.""" + + # Counter deltas - use SUM when aggregating + delta_item_count: Mapped[int | None] = mapped_column(Integer, nullable=True) + """Delta for dataset item_count.""" + + # Relationship back to dataset metadata + dataset: Mapped[DatasetMetadataDb] = relationship(back_populates='buffer') + + storage_id = synonym('dataset_id') + """Alias for dataset_id to match SqlClientMixin expectations.""" + + +class RequestQueueMetadataBufferDb(MetadataBufferDb, Base): + """Buffer table for deferred request queue metadata updates to reduce concurrent access issues.""" + + __tablename__ = 'request_queue_metadata_buffer' + + request_queue_id: Mapped[str] = mapped_column( + String(20), ForeignKey('request_queues.request_queue_id', ondelete='CASCADE'), nullable=False, index=True + ) + """ID of the request queue being updated.""" + + client_id: Mapped[str] = mapped_column(String(32), nullable=False) + """Identifier of the client making this update.""" + + # Counter deltas - use SUM when aggregating + delta_handled_count: Mapped[int | None] = mapped_column(Integer, nullable=True) + """Delta for handled_request_count.""" + + delta_pending_count: Mapped[int | None] = mapped_column(Integer, nullable=True) + """Delta for pending_request_count.""" + + delta_total_count: Mapped[int | None] = mapped_column(Integer, nullable=True) + """Delta for total_request_count.""" + + need_recalc: Mapped[bool | None] = mapped_column(Boolean, nullable=False, default=False) + """Flag indicating that counters need recalculation from actual data.""" + + # Relationship back to request queue metadata + queue: Mapped[RequestQueueMetadataDb] = relationship(back_populates='buffer') + + __table_args__ = (Index('idx_rq_client', 'request_queue_id', 'client_id'),) + + storage_id = synonym('request_queue_id') + """Alias 
for request_queue_id to match SqlClientMixin expectations.""" diff --git a/src/crawlee/storage_clients/_sql/_key_value_store_client.py b/src/crawlee/storage_clients/_sql/_key_value_store_client.py index dfa02d8014..d9b3ca04ea 100644 --- a/src/crawlee/storage_clients/_sql/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_sql/_key_value_store_client.py @@ -4,19 +4,26 @@ from logging import getLogger from typing import TYPE_CHECKING, Any, cast -from sqlalchemy import CursorResult, delete, select +from sqlalchemy import CursorResult, delete, select, update +from sqlalchemy import func as sql_func from typing_extensions import Self, override from crawlee._utils.file import infer_mime_type from crawlee.storage_clients._base import KeyValueStoreClient -from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata +from crawlee.storage_clients.models import ( + KeyValueStoreMetadata, + KeyValueStoreRecord, + KeyValueStoreRecordMetadata, +) from ._client_mixin import MetadataUpdateParams, SqlClientMixin -from ._db_models import KeyValueStoreMetadataDb, KeyValueStoreRecordDb +from ._db_models import KeyValueStoreMetadataBufferDb, KeyValueStoreMetadataDb, KeyValueStoreRecordDb if TYPE_CHECKING: from collections.abc import AsyncIterator + from sqlalchemy.ext.asyncio import AsyncSession + from ._storage_client import SqlStorageClient @@ -57,6 +64,9 @@ class SqlKeyValueStoreClient(KeyValueStoreClient, SqlClientMixin): _CLIENT_TYPE = 'Key-value store' """Human-readable client type for error messages.""" + _BUFFER_TABLE = KeyValueStoreMetadataBufferDb + """SQLAlchemy model for metadata buffer.""" + def __init__( self, *, @@ -165,9 +175,7 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No async with self.get_session(with_simple_commit=True) as session: await session.execute(upsert_stmt) - await self._update_metadata( - session, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True) - ) + await self._add_buffer_record(session, update_modified_at=True) @override async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: @@ -175,15 +183,11 @@ async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: stmt = select(self._ITEM_TABLE).where( self._ITEM_TABLE.key_value_store_id == self._id, self._ITEM_TABLE.key == key ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: result = await session.execute(stmt) record_db = result.scalar_one_or_none() - updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._add_buffer_record(session) if not record_db: return None @@ -231,11 +235,7 @@ async def delete_value(self, *, key: str) -> None: # Update metadata if we actually deleted something if result.rowcount > 0: - await self._update_metadata( - session, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True) - ) - - await session.commit() + await self._add_buffer_record(session, update_accessed_at=True, update_modified_at=True) @override async def iterate_keys( @@ -259,7 +259,7 @@ async def iterate_keys( if limit is not None: stmt = stmt.limit(limit) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: result = await session.stream(stmt.execution_options(stream_results=True)) async for row in result: @@ -269,26 +269,18 @@ async 
def iterate_keys( size=row.size, ) - updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) @override async def record_exists(self, *, key: str) -> bool: stmt = select(self._ITEM_TABLE.key).where( self._ITEM_TABLE.key_value_store_id == self._id, self._ITEM_TABLE.key == key ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: # Check if record exists result = await session.execute(stmt) - updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._add_buffer_record(session) return result.scalar_one_or_none() is not None @@ -298,3 +290,51 @@ async def get_public_url(self, *, key: str) -> str: def _specific_update_metadata(self, **_kwargs: dict[str, Any]) -> dict[str, Any]: return {} + + def _prepare_buffer_data(self, **_kwargs: Any) -> dict[str, Any]: + """Prepare key-value store specific buffer data. + + For KeyValueStore, we don't have specific metadata fields to track in buffer, + so we just return empty dict. The base buffer will handle accessed_at/modified_at. + + Args: + **kwargs: Additional arguments (unused for key-value store). + + Returns: + Empty dict as key-value stores don't have specific metadata fields. + """ + return {} + + async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: + """Apply aggregated buffer updates to key-value store metadata. + + For KeyValueStore, we aggregate accessed_at and modified_at timestamps + from buffer records and apply them to the metadata. + + Args: + session: Active database session. + max_buffer_id: Maximum buffer record ID to process (inclusive). 
+ """ + # Get aggregated timestamps from buffer records + aggregation_stmt = select( + sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), + sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), + ).where(self._BUFFER_TABLE.storage_id == self._id, self._BUFFER_TABLE.id <= max_buffer_id) + + result = await session.execute(aggregation_stmt) + row = result.first() + + if not row: + return + + # Prepare updates for metadata + values_to_update = { + 'accessed_at': row.max_accessed_at, + } + + if row.max_modified_at: + values_to_update['modified_at'] = row.max_modified_at + + update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) + + await session.execute(update_stmt) diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index f5a320bb21..35fa59e9df 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -7,7 +7,8 @@ from logging import getLogger from typing import TYPE_CHECKING, Any, cast -from sqlalchemy import CursorResult, func, or_, select, update +from sqlalchemy import CursorResult, exists, func, or_, select, update +from sqlalchemy import func as sql_func from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import load_only from typing_extensions import NotRequired, Self, override @@ -23,12 +24,13 @@ ) from ._client_mixin import MetadataUpdateParams, SqlClientMixin -from ._db_models import RequestDb, RequestQueueMetadataDb, RequestQueueStateDb +from ._db_models import RequestDb, RequestQueueMetadataBufferDb, RequestQueueMetadataDb, RequestQueueStateDb if TYPE_CHECKING: from collections.abc import Sequence from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.sql import ColumnElement from ._storage_client import SqlStorageClient @@ -93,6 +95,9 @@ class SqlRequestQueueClient(RequestQueueClient, SqlClientMixin): """Number of seconds for which a request is considered blocked in the database after being fetched for processing. 
""" + _BUFFER_TABLE = RequestQueueMetadataBufferDb + """SQLAlchemy model for metadata buffer.""" + def __init__( self, *, @@ -111,6 +116,9 @@ def __init__( self.client_key = crypto_random_object_id(length=32)[:32] """Unique identifier for this client instance.""" + self._had_multiple_clients = False + """Indicates whether the queue has been accessed by multiple clients.""" + @classmethod async def open( cls, @@ -155,7 +163,9 @@ async def open( @override async def get_metadata(self) -> RequestQueueMetadata: # The database is a single place of truth - return await self._get_metadata(RequestQueueMetadata) + metadata = await self._get_metadata(RequestQueueMetadata) + self._had_multiple_clients = metadata.had_multiple_clients + return metadata @override async def drop(self) -> None: @@ -338,14 +348,10 @@ async def add_batch_of_requests( insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values) await session.execute(insert_stmt_with_ignore) - await self._update_metadata( + await self._add_buffer_record( session, - **_QueueMetadataUpdateParams( - recalculate=metadata_recalculate, - update_modified_at=True, - update_accessed_at=True, - force=metadata_recalculate, - ), + recalculate=metadata_recalculate, + update_modified_at=True, ) try: @@ -383,7 +389,7 @@ async def get_request(self, unique_key: str) -> Request | None: stmt = select(self._ITEM_TABLE).where( self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.request_id == request_id ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: result = await session.execute(stmt) request_db = result.scalar_one_or_none() @@ -391,11 +397,7 @@ async def get_request(self, unique_key: str) -> Request | None: logger.warning(f'Request with ID "{unique_key}" not found in the queue.') return None - updated = await self._update_metadata(session, update_accessed_at=True) - - # Commit updates to the metadata - if updated: - await session.commit() + await self._add_buffer_record(session) return Request.model_validate_json(request_db.data) @@ -413,14 +415,14 @@ async def fetch_next_request(self) -> Request | None: select(self._ITEM_TABLE) .where( self._ITEM_TABLE.request_queue_id == self._id, - self._ITEM_TABLE.is_handled.is_(False), + self._ITEM_TABLE.is_handled == False, # noqa: E712 or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), ) .order_by(self._ITEM_TABLE.sequence_number.asc()) .limit(self._MAX_BATCH_FETCH_SIZE) ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: # We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client if dialect == 'postgresql': stmt = stmt.with_for_update(skip_locked=True) @@ -456,7 +458,7 @@ async def fetch_next_request(self) -> Request | None: .where( self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.request_id.in_(request_ids), - self._ITEM_TABLE.is_handled.is_(False), + self._ITEM_TABLE.is_handled == False, # noqa: E712 or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), ) .values(time_blocked_until=block_until, client_key=self.client_key) @@ -470,9 +472,7 @@ async def fetch_next_request(self) -> Request | None: await session.rollback() return None - await self._update_metadata(session, **_QueueMetadataUpdateParams(update_accessed_at=True)) - - await session.commit() + await self._add_buffer_record(session) requests = 
[Request.model_validate_json(r.data) for r in requests_db if r.request_id in blocked_ids] @@ -497,7 +497,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.request_id == request_id) .values(is_handled=True, time_blocked_until=None, client_key=None, data=request.model_dump_json()) ) - async with self.get_session() as session: + async with self.get_session(with_simple_commit=True) as session: result = await session.execute(stmt) result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result @@ -505,17 +505,9 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | logger.warning(f'Request {request.unique_key} not found in database.') return None - await self._update_metadata( - session, - **_QueueMetadataUpdateParams( - delta_handled_request_count=1, - delta_pending_request_count=-1, - update_modified_at=True, - update_accessed_at=True, - force=True, - ), + await self._add_buffer_record( + session, update_modified_at=True, delta_pending_request_count=-1, delta_handled_request_count=1 ) - await session.commit() return ProcessedRequest( unique_key=request.unique_key, was_already_present=True, @@ -567,9 +559,7 @@ async def reclaim_request( if result.rowcount == 0: logger.warning(f'Request {request.unique_key} not found in database.') return None - await self._update_metadata( - session, **_QueueMetadataUpdateParams(update_modified_at=True, update_accessed_at=True) - ) + await self._add_buffer_record(session, update_modified_at=True) # put the forefront request at the beginning of the cache if forefront: @@ -587,30 +577,30 @@ async def is_empty(self) -> bool: if self._pending_fetch_cache: return False - # Check database for unhandled requests - async with self.get_session() as session: - metadata_orm = await session.get(self._METADATA_TABLE, self._id) - if not metadata_orm: - raise ValueError(f'Request queue with ID "{self._id}" not found.') + metadata = await self.get_metadata() - empty = metadata_orm.pending_request_count == 0 + async with self.get_session(with_simple_commit=True) as session: + # If there are no pending requests, check if there are any buffered updates + if metadata.pending_request_count == 0: + buffer_check_stmt = select( + exists().where( + (self._BUFFER_TABLE.storage_id == self._id) + & ( + (self._BUFFER_TABLE.delta_pending_count != 0) | (self._BUFFER_TABLE.need_recalc == True) # noqa: E712 + ) + ) + ) + buffer_result = await session.execute(buffer_check_stmt) + has_pending_buffer_updates = buffer_result.scalar() - updated = await self._update_metadata( - session, - **_QueueMetadataUpdateParams( - update_accessed_at=True, - # With multi-client access, counters may become out of sync. - # If the queue is not empty, we perform a recalculation to synchronize the counters in the metadata. 
- recalculate=not empty, - update_modified_at=not empty, - ), - ) + await self._add_buffer_record(session) + # If there are no pending requests and no buffered updates, the queue is empty + return not has_pending_buffer_updates - # Commit updates to the metadata - if updated: - await session.commit() + # There are pending requests (may be inaccurate), ensure recalculated metadata + await self._add_buffer_record(session, update_modified_at=True, recalculate=True) - return empty + return False async def _get_state(self, session: AsyncSession) -> RequestQueueStateDb: """Get the current state of the request queue.""" @@ -718,3 +708,123 @@ def _get_int_id_from_unique_key(unique_key: str) -> int: hashed_key = sha256(unique_key.encode('utf-8')).hexdigest() name_length = 15 return int(hashed_key[:name_length], 16) + + def _prepare_buffer_data( + self, + delta_handled_request_count: int | None = None, + delta_pending_request_count: int | None = None, + delta_total_request_count: int | None = None, + *, + recalculate: bool = False, + **_kwargs: Any, + ) -> dict[str, Any]: + """Prepare key-value store specific buffer data. + + For KeyValueStore, we don't have specific metadata fields to track in buffer, + so we just return empty dict. The base buffer will handle accessed_at/modified_at. + + Args: + delta_handled_request_count: If provided, add this value to the handled_request_count. + delta_pending_request_count: If provided, add this value to the pending_request_count. + delta_total_request_count: If provided, add this value to the total_request_count. + recalculate: If True, recalculate the pending_request_count, and total_request_count on request table. + **kwargs: Additional arguments (unused for key-value store). + + Returns: + Empty dict as key-value stores don't have specific metadata fields. + """ + buffer_data: dict[str, Any] = { + 'client_id': self.client_key, + } + + if delta_handled_request_count: + buffer_data['delta_handled_count'] = delta_handled_request_count + + if delta_pending_request_count: + buffer_data['delta_pending_count'] = delta_pending_request_count + + if delta_total_request_count: + buffer_data['delta_total_count'] = delta_total_request_count + + if recalculate: + buffer_data['need_recalc'] = True + + return buffer_data + + async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: + """Apply aggregated buffer updates to key-value store metadata. + + For KeyValueStore, we aggregate accessed_at and modified_at timestamps + from buffer records and apply them to the metadata. + + Args: + session: Active database session. + max_buffer_id: Maximum buffer record ID to process (inclusive). 
+ """ + # Get aggregated timestamps from buffer records + + aggregations: list[ColumnElement[Any]] = [ + sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), + sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), + sql_func.sum(self._BUFFER_TABLE.delta_handled_count).label('delta_handled_count'), + sql_func.sum(self._BUFFER_TABLE.delta_pending_count).label('delta_pending_count'), + sql_func.sum(self._BUFFER_TABLE.delta_total_count).label('delta_total_count'), + ] + + if not self._had_multiple_clients: + aggregations.append( + sql_func.count(sql_func.distinct(self._BUFFER_TABLE.client_id)).label('unique_clients_count') + ) + + if self._storage_client.get_dialect_name() == 'postgresql': + aggregations.append(sql_func.bool_or(self._BUFFER_TABLE.need_recalc).label('need_recalc')) + else: + aggregations.append(sql_func.max(self._BUFFER_TABLE.need_recalc).label('need_recalc')) + + aggregation_stmt = select(*aggregations).where( + self._BUFFER_TABLE.storage_id == self._id, self._BUFFER_TABLE.id <= max_buffer_id + ) + + result = await session.execute(aggregation_stmt) + row = result.first() + + if not row: + return + + # Prepare updates for metadata + values_to_update = { + 'accessed_at': row.max_accessed_at, + } + + if row.max_modified_at: + values_to_update['modified_at'] = row.max_modified_at + + if not self._had_multiple_clients and row.unique_clients_count > 1: + values_to_update['had_multiple_clients'] = True + + if row.need_recalc: + values_to_update['pending_request_count'] = ( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled == False) # noqa: E712 + .scalar_subquery() + ) + else: + if row.delta_handled_count: + values_to_update['handled_request_count'] = ( + self._METADATA_TABLE.handled_request_count + row.delta_handled_count + ) + + if row.delta_pending_count: + values_to_update['pending_request_count'] = ( + self._METADATA_TABLE.pending_request_count + row.delta_pending_count + ) + + if row.delta_total_count: + values_to_update['total_request_count'] = ( + self._METADATA_TABLE.total_request_count + row.delta_total_count + ) + + update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) + + await session.execute(update_stmt) From 4b9386fbaca2eda8b147bcddeae94084e5a8253f Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 11 Nov 2025 03:01:19 +0000 Subject: [PATCH 02/11] some optimization --- .../storage_clients/_sql/_client_mixin.py | 147 +++++------------- .../storage_clients/_sql/_dataset_client.py | 24 +-- .../_sql/_key_value_store_client.py | 19 +-- .../_sql/_request_queue_client.py | 76 ++------- .../storage_clients/_sql/_storage_client.py | 8 - .../_sql/test_sql_dataset_client.py | 3 - .../_sql/test_sql_kvs_client.py | 4 - .../_sql/test_sql_rq_client.py | 4 - 8 files changed, 67 insertions(+), 218 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index 1d98873595..e71ddeee7d 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -46,7 +46,6 @@ class MetadataUpdateParams(TypedDict, total=False): update_accessed_at: NotRequired[bool] update_modified_at: NotRequired[bool] - force: NotRequired[bool] class SqlClientMixin(ABC): @@ -79,11 +78,6 @@ def __init__(self, *, id: str, storage_client: SqlStorageClient) -> None: self._id = id self._storage_client = 
storage_client - # Time tracking to reduce database writes during frequent operation - self._accessed_at_allow_update_after: datetime | None = None - self._modified_at_allow_update_after: datetime | None = None - self._accessed_modified_update_interval = storage_client.get_accessed_modified_update_interval() - @classmethod async def _open( cls, @@ -135,8 +129,6 @@ async def _open( **extra_metadata_fields, ) client = cls(id=metadata.id, storage_client=storage_client) - client._accessed_at_allow_update_after = now + client._accessed_modified_update_interval - client._modified_at_allow_update_after = now + client._accessed_modified_update_interval session.add(cls._METADATA_TABLE(**metadata.model_dump(), internal_name=internal_name)) return client @@ -276,8 +268,8 @@ async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None: Args: metadata_kwargs: Arguments to pass to _update_metadata. """ - # Force process buffers to ensure metadata is up to date before purging - await self._process_buffers(force=True) + # Process buffers to ensure metadata is up to date before purging + await self._process_buffers() stmt = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.storage_id == self._id) async with self.get_session(with_simple_commit=True) as session: @@ -317,46 +309,6 @@ async def _get_metadata( return metadata_model.model_validate(orm_metadata) - def _default_update_metadata( - self, *, update_accessed_at: bool = False, update_modified_at: bool = False, force: bool = False - ) -> dict[str, Any]: - """Prepare common metadata updates with rate limiting. - - Args: - update_accessed_at: Whether to update accessed_at timestamp. - update_modified_at: Whether to update modified_at timestamp. - force: Whether to force the update regardless of rate limiting. - """ - values_to_set: dict[str, Any] = {} - now = datetime.now(timezone.utc) - - # If the record must be updated (for example, when updating counters), we update timestamps and shift the time. - if force: - if update_modified_at: - values_to_set['modified_at'] = now - self._modified_at_allow_update_after = now + self._accessed_modified_update_interval - if update_accessed_at: - values_to_set['accessed_at'] = now - self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval - - elif update_modified_at and ( - self._modified_at_allow_update_after is None or now >= self._modified_at_allow_update_after - ): - values_to_set['modified_at'] = now - self._modified_at_allow_update_after = now + self._accessed_modified_update_interval - # The record will be updated, we can update `accessed_at` and shift the time. - if update_accessed_at: - values_to_set['accessed_at'] = now - self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval - - elif update_accessed_at and ( - self._accessed_at_allow_update_after is None or now >= self._accessed_at_allow_update_after - ): - values_to_set['accessed_at'] = now - self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval - - return values_to_set - @abstractmethod def _specific_update_metadata(self, **kwargs: Any) -> dict[str, Any]: """Prepare storage-specific metadata updates. @@ -367,55 +319,51 @@ def _specific_update_metadata(self, **kwargs: Any) -> dict[str, Any]: **kwargs: Storage-specific update parameters. """ + @abstractmethod + def _prepare_buffer_data(self, **kwargs: Any) -> dict[str, Any]: + """Prepare storage-specific buffer data. 
Must be implemented by concrete classes.""" + + @abstractmethod + async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: + """Apply aggregated buffer updates to metadata. Must be implemented by concrete classes. + + Args: + session: Active database session. + max_buffer_id: Maximum buffer record ID to process. + """ + async def _update_metadata( self, session: AsyncSession, *, update_accessed_at: bool = False, update_modified_at: bool = False, - force: bool = False, **kwargs: Any, - ) -> bool: - """Update storage metadata combining common and specific fields. + ) -> None: + """Directly update storage metadata combining common and specific fields. Args: session: Active database session. update_accessed_at: Whether to update accessed_at timestamp. update_modified_at: Whether to update modified_at timestamp. - force: Whether to force the update timestamps regardless of rate limiting. **kwargs: Additional arguments for _specific_update_metadata. - - Returns: - True if any updates were made, False otherwise """ - values_to_set = self._default_update_metadata( - update_accessed_at=update_accessed_at, update_modified_at=update_modified_at, force=force - ) + values_to_set: dict[str, Any] = {} + now = datetime.now(timezone.utc) + + if update_accessed_at: + values_to_set['accessed_at'] = now + + if update_modified_at: + values_to_set['modified_at'] = now values_to_set.update(self._specific_update_metadata(**kwargs)) if values_to_set: - if (stmt := values_to_set.pop('custom_stmt', None)) is None: - stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) + stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) stmt = stmt.values(**values_to_set) await session.execute(stmt) - return True - - return False - - @abstractmethod - def _prepare_buffer_data(self, **kwargs: Any) -> dict[str, Any]: - """Prepare storage-specific buffer data. Must be implemented by concrete classes.""" - - @abstractmethod - async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: - """Apply aggregated buffer updates to metadata. Must be implemented by concrete classes. - - Args: - session: Active database session. - max_buffer_id: Maximum buffer record ID to process. - """ async def _add_buffer_record( self, @@ -441,12 +389,11 @@ async def _add_buffer_record( session.add(self._BUFFER_TABLE(**values_to_set)) - async def _try_acquire_buffer_lock(self, session: AsyncSession, *, force: bool = False) -> bool: + async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool: """Try to acquire buffer processing lock for 200ms. Args: session: Active database session. - force: If True, forcefully acquire lock regardless of current state. Returns: True if lock was acquired, False if already locked by another process. 
@@ -462,6 +409,7 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession, *, force: bool = self._METADATA_TABLE.id == self._id, (self._METADATA_TABLE.buffer_locked_until.is_(None)) | (self._METADATA_TABLE.buffer_locked_until < now), + select(self._BUFFER_TABLE.id).where(self._BUFFER_TABLE.storage_id == self._id).exists(), ) .with_for_update(skip_locked=True) ) @@ -472,24 +420,16 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession, *, force: bool = # Either conditions not met OR row is locked by another process return False - if force: - # Force acquire lock regardless of current state - update_stmt = ( - update(self._METADATA_TABLE) - .where(self._METADATA_TABLE.id == self._id) - .values(buffer_locked_until=lock_until) - ) - else: - # Acquire lock only if not currently locked or lock has expired - update_stmt = ( - update(self._METADATA_TABLE) - .where( - self._METADATA_TABLE.id == self._id, - (self._METADATA_TABLE.buffer_locked_until.is_(None)) - | (self._METADATA_TABLE.buffer_locked_until < now), - ) - .values(buffer_locked_until=lock_until) + # Acquire lock only if not currently locked or lock has expired + update_stmt = ( + update(self._METADATA_TABLE) + .where( + self._METADATA_TABLE.id == self._id, + (self._METADATA_TABLE.buffer_locked_until.is_(None)) | (self._METADATA_TABLE.buffer_locked_until < now), + select(self._BUFFER_TABLE.id).where(self._BUFFER_TABLE.storage_id == self._id).exists(), ) + .values(buffer_locked_until=lock_until) + ) result = await session.execute(update_stmt) result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result @@ -529,15 +469,11 @@ async def _has_pending_buffer_updates(self, session: AsyncSession) -> bool: # Any non-NULL value means there are pending updates return locked_until is not None - async def _process_buffers(self, *, force: bool = False) -> None: - """Process pending buffer updates and apply them to metadata. - - Args: - force: If True, forcefully acquire lock and process buffers. - """ + async def _process_buffers(self) -> None: + """Process pending buffer updates and apply them to metadata.""" async with self.get_session(with_simple_commit=True) as session: # Try to acquire buffer processing lock - if not await self._try_acquire_buffer_lock(session, force=force): + if not await self._try_acquire_buffer_lock(session): # Another process is currently processing buffers or lock acquisition failed return @@ -551,7 +487,8 @@ async def _process_buffers(self, *, force: bool = False) -> None: max_buffer_id = result.scalar() if max_buffer_id is None: - # No buffer records to process + # No buffer records to process. Release the lock and exit. + await self._release_buffer_lock(session) return # Apply aggregated buffer updates to metadata using only records <= max_buffer_id diff --git a/src/crawlee/storage_clients/_sql/_dataset_client.py b/src/crawlee/storage_clients/_sql/_dataset_client.py index b2d4feb6cf..1552190556 100644 --- a/src/crawlee/storage_clients/_sql/_dataset_client.py +++ b/src/crawlee/storage_clients/_sql/_dataset_client.py @@ -272,13 +272,14 @@ def _prepare_get_stmt( return stmt.offset(offset).limit(limit) + @override def _specific_update_metadata( self, new_item_count: int | None = None, delta_item_count: int | None = None, **_kwargs: dict[str, Any], ) -> dict[str, Any]: - """Update the dataset metadata in the database. + """Directly update the dataset metadata in the database. Args: session: The SQLAlchemy AsyncSession to use for the update. 
@@ -295,18 +296,12 @@ def _specific_update_metadata( return values_to_set + @override def _prepare_buffer_data(self, delta_item_count: int | None = None, **_kwargs: Any) -> dict[str, Any]: - """Prepare key-value store specific buffer data. - - For KeyValueStore, we don't have specific metadata fields to track in buffer, - so we just return empty dict. The base buffer will handle accessed_at/modified_at. + """Prepare dataset specific buffer data. Args: delta_item_count: If provided, add this value to the current item count. - **kwargs: Additional arguments (unused for key-value store). - - Returns: - Empty dict as key-value stores don't have specific metadata fields. """ buffer_data = {} if delta_item_count is not None: @@ -314,17 +309,8 @@ def _prepare_buffer_data(self, delta_item_count: int | None = None, **_kwargs: A return buffer_data + @override async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: - """Apply aggregated buffer updates to key-value store metadata. - - For KeyValueStore, we aggregate accessed_at and modified_at timestamps - from buffer records and apply them to the metadata. - - Args: - session: Active database session. - max_buffer_id: Maximum buffer record ID to process (inclusive). - """ - # Get aggregated timestamps from buffer records aggregation_stmt = select( sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), diff --git a/src/crawlee/storage_clients/_sql/_key_value_store_client.py b/src/crawlee/storage_clients/_sql/_key_value_store_client.py index d9b3ca04ea..c63ed418a2 100644 --- a/src/crawlee/storage_clients/_sql/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_sql/_key_value_store_client.py @@ -288,34 +288,21 @@ async def record_exists(self, *, key: str) -> bool: async def get_public_url(self, *, key: str) -> str: raise NotImplementedError('Public URLs are not supported for SQL key-value stores.') + @override def _specific_update_metadata(self, **_kwargs: dict[str, Any]) -> dict[str, Any]: return {} + @override def _prepare_buffer_data(self, **_kwargs: Any) -> dict[str, Any]: """Prepare key-value store specific buffer data. For KeyValueStore, we don't have specific metadata fields to track in buffer, so we just return empty dict. The base buffer will handle accessed_at/modified_at. - - Args: - **kwargs: Additional arguments (unused for key-value store). - - Returns: - Empty dict as key-value stores don't have specific metadata fields. """ return {} + @override async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: - """Apply aggregated buffer updates to key-value store metadata. - - For KeyValueStore, we aggregate accessed_at and modified_at timestamps - from buffer records and apply them to the metadata. - - Args: - session: Active database session. - max_buffer_id: Maximum buffer record ID to process (inclusive). 
- """ - # Get aggregated timestamps from buffer records aggregation_stmt = select( sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index 35fa59e9df..2208403f48 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -189,7 +189,6 @@ async def purge(self) -> None: update_accessed_at=True, update_modified_at=True, new_pending_request_count=0, - force=True, ) ) @@ -618,6 +617,7 @@ async def _get_state(self, session: AsyncSession) -> RequestQueueStateDb: raise RuntimeError(f'Failed to create or retrieve state for queue {self._id}') return orm_state + @override def _specific_update_metadata( self, new_handled_request_count: int | None = None, @@ -626,11 +626,10 @@ def _specific_update_metadata( delta_handled_request_count: int | None = None, delta_pending_request_count: int | None = None, *, - recalculate: bool = False, update_had_multiple_clients: bool = False, **_kwargs: dict[str, Any], ) -> dict[str, Any]: - """Update the request queue metadata in the database. + """Directly update the request queue metadata in the database. Args: session: The SQLAlchemy session to use for database operations. @@ -639,7 +638,6 @@ def _specific_update_metadata( new_total_request_count: If provided, update the total_request_count to this value. delta_handled_request_count: If provided, add this value to the handled_request_count. delta_pending_request_count: If provided, add this value to the pending_request_count. - recalculate: If True, recalculate the pending_request_count, and total_request_count on request table. update_had_multiple_clients: If True, set had_multiple_clients to True. 
""" values_to_set: dict[str, Any] = {} @@ -649,49 +647,13 @@ def _specific_update_metadata( if new_handled_request_count is not None: values_to_set['handled_request_count'] = new_handled_request_count - elif delta_handled_request_count is not None: - values_to_set['handled_request_count'] = ( - self._METADATA_TABLE.handled_request_count + delta_handled_request_count - ) if new_pending_request_count is not None: values_to_set['pending_request_count'] = new_pending_request_count - elif delta_pending_request_count is not None: - values_to_set['pending_request_count'] = ( - self._METADATA_TABLE.pending_request_count + delta_pending_request_count - ) if new_total_request_count is not None: values_to_set['total_request_count'] = new_total_request_count - if recalculate: - stmt = ( - update(self._METADATA_TABLE) - .where(self._METADATA_TABLE.request_queue_id == self._id) - .values( - pending_request_count=( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled.is_(False)) - .scalar_subquery() - ), - total_request_count=( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id) - .scalar_subquery() - ), - handled_request_count=( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled.is_(True)) - .scalar_subquery() - ), - ) - ) - - values_to_set['custom_stmt'] = stmt - return values_to_set @staticmethod @@ -709,6 +671,7 @@ def _get_int_id_from_unique_key(unique_key: str) -> int: name_length = 15 return int(hashed_key[:name_length], 16) + @override def _prepare_buffer_data( self, delta_handled_request_count: int | None = None, @@ -718,20 +681,13 @@ def _prepare_buffer_data( recalculate: bool = False, **_kwargs: Any, ) -> dict[str, Any]: - """Prepare key-value store specific buffer data. - - For KeyValueStore, we don't have specific metadata fields to track in buffer, - so we just return empty dict. The base buffer will handle accessed_at/modified_at. + """Prepare request queue specific buffer data. Args: delta_handled_request_count: If provided, add this value to the handled_request_count. delta_pending_request_count: If provided, add this value to the pending_request_count. delta_total_request_count: If provided, add this value to the total_request_count. recalculate: If True, recalculate the pending_request_count, and total_request_count on request table. - **kwargs: Additional arguments (unused for key-value store). - - Returns: - Empty dict as key-value stores don't have specific metadata fields. """ buffer_data: dict[str, Any] = { 'client_id': self.client_key, @@ -751,18 +707,8 @@ def _prepare_buffer_data( return buffer_data + @override async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) -> None: - """Apply aggregated buffer updates to key-value store metadata. - - For KeyValueStore, we aggregate accessed_at and modified_at timestamps - from buffer records and apply them to the metadata. - - Args: - session: Active database session. - max_buffer_id: Maximum buffer record ID to process (inclusive). 
- """ - # Get aggregated timestamps from buffer records - aggregations: list[ColumnElement[Any]] = [ sql_func.max(self._BUFFER_TABLE.accessed_at).label('max_accessed_at'), sql_func.max(self._BUFFER_TABLE.modified_at).label('max_modified_at'), @@ -809,6 +755,18 @@ async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled == False) # noqa: E712 .scalar_subquery() ) + values_to_update['total_request_count'] = ( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id) + .scalar_subquery() + ) + values_to_update['handled_request_count'] = ( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled == True) # noqa: E712 + .scalar_subquery() + ) else: if row.delta_handled_count: values_to_update['handled_request_count'] = ( diff --git a/src/crawlee/storage_clients/_sql/_storage_client.py b/src/crawlee/storage_clients/_sql/_storage_client.py index 57607d1f74..36435bea98 100644 --- a/src/crawlee/storage_clients/_sql/_storage_client.py +++ b/src/crawlee/storage_clients/_sql/_storage_client.py @@ -1,7 +1,6 @@ from __future__ import annotations import warnings -from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING @@ -67,9 +66,6 @@ def __init__( self._initialized = False self.session_maker: None | async_sessionmaker[AsyncSession] = None - # Minimum interval to reduce database load from frequent concurrent metadata updates - self._accessed_modified_update_interval = timedelta(seconds=1) - # Flag needed to apply optimizations only for default database self._default_flag = self._engine is None and self._connection_string is None self._dialect_name: str | None = None @@ -105,10 +101,6 @@ def get_dialect_name(self) -> str | None: """Get the database dialect name.""" return self._dialect_name - def get_accessed_modified_update_interval(self) -> timedelta: - """Get the interval for accessed and modified updates.""" - return self._accessed_modified_update_interval - async def initialize(self, configuration: Configuration) -> None: """Initialize the database schema. 
diff --git a/tests/unit/storage_clients/_sql/test_sql_dataset_client.py b/tests/unit/storage_clients/_sql/test_sql_dataset_client.py index 6b94e146d3..2cb1aec7d1 100644 --- a/tests/unit/storage_clients/_sql/test_sql_dataset_client.py +++ b/tests/unit/storage_clients/_sql/test_sql_dataset_client.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -from datetime import timedelta from typing import TYPE_CHECKING import pytest @@ -38,11 +37,9 @@ def get_tables(sync_conn: Connection) -> list[str]: @pytest.fixture async def dataset_client( configuration: Configuration, - monkeypatch: pytest.MonkeyPatch, ) -> AsyncGenerator[SqlDatasetClient, None]: """A fixture for a SQL dataset client.""" async with SqlStorageClient() as storage_client: - monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) client = await storage_client.create_dataset_client( name='test-dataset', configuration=configuration, diff --git a/tests/unit/storage_clients/_sql/test_sql_kvs_client.py b/tests/unit/storage_clients/_sql/test_sql_kvs_client.py index 6bd02df750..ac32b78ee3 100644 --- a/tests/unit/storage_clients/_sql/test_sql_kvs_client.py +++ b/tests/unit/storage_clients/_sql/test_sql_kvs_client.py @@ -2,7 +2,6 @@ import asyncio import json -from datetime import timedelta from typing import TYPE_CHECKING import pytest @@ -34,16 +33,13 @@ def configuration(tmp_path: Path) -> Configuration: @pytest.fixture async def kvs_client( configuration: Configuration, - monkeypatch: pytest.MonkeyPatch, ) -> AsyncGenerator[SqlKeyValueStoreClient, None]: """A fixture for a SQL key-value store client.""" async with SqlStorageClient() as storage_client: - monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) client = await storage_client.create_kvs_client( name='test-kvs', configuration=configuration, ) - monkeypatch.setattr(client, '_accessed_modified_update_interval', timedelta(seconds=0)) yield client await client.drop() diff --git a/tests/unit/storage_clients/_sql/test_sql_rq_client.py b/tests/unit/storage_clients/_sql/test_sql_rq_client.py index 8885f3cf88..d80a92db41 100644 --- a/tests/unit/storage_clients/_sql/test_sql_rq_client.py +++ b/tests/unit/storage_clients/_sql/test_sql_rq_client.py @@ -2,7 +2,6 @@ import asyncio import json -from datetime import timedelta from typing import TYPE_CHECKING import pytest @@ -35,16 +34,13 @@ def configuration(tmp_path: Path) -> Configuration: @pytest.fixture async def rq_client( configuration: Configuration, - monkeypatch: pytest.MonkeyPatch, ) -> AsyncGenerator[SqlRequestQueueClient, None]: """A fixture for a SQL request queue client.""" async with SqlStorageClient() as storage_client: - monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) client = await storage_client.create_rq_client( name='test-request-queue', configuration=configuration, ) - monkeypatch.setattr(client, '_accessed_modified_update_interval', timedelta(seconds=0)) yield client await client.drop() From 7848a21b765cd5ba1d607ffe7aa23bb8a7596dc6 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 11 Nov 2025 11:56:42 +0000 Subject: [PATCH 03/11] index optimization --- src/crawlee/storage_clients/_sql/_db_models.py | 9 +++++++-- .../storage_clients/_sql/_request_queue_client.py | 13 +++++++++++++ src/crawlee/storage_clients/_sql/_storage_client.py | 10 ++++++---- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_db_models.py 
b/src/crawlee/storage_clients/_sql/_db_models.py index cabdb80e7c..2ada223ab4 100644 --- a/src/crawlee/storage_clients/_sql/_db_models.py +++ b/src/crawlee/storage_clients/_sql/_db_models.py @@ -233,6 +233,11 @@ class RequestDb(Base): 'sequence_number', postgresql_where=text('is_handled = false'), ), + Index( + 'idx_count_aggregate', + 'request_queue_id', + 'is_handled', + ), ) request_id: Mapped[int] = mapped_column(BigInteger, primary_key=True) @@ -350,6 +355,8 @@ class RequestQueueMetadataBufferDb(MetadataBufferDb, Base): __tablename__ = 'request_queue_metadata_buffer' + __table_args__ = (Index('idx_rq_client', 'request_queue_id', 'client_id'),) + request_queue_id: Mapped[str] = mapped_column( String(20), ForeignKey('request_queues.request_queue_id', ondelete='CASCADE'), nullable=False, index=True ) @@ -374,7 +381,5 @@ class RequestQueueMetadataBufferDb(MetadataBufferDb, Base): # Relationship back to request queue metadata queue: Mapped[RequestQueueMetadataDb] = relationship(back_populates='buffer') - __table_args__ = (Index('idx_rq_client', 'request_queue_id', 'client_id'),) - storage_id = synonym('request_queue_id') """Alias for request_queue_id to match SqlClientMixin expectations.""" diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index 2208403f48..fae100f20f 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -581,6 +581,19 @@ async def is_empty(self) -> bool: async with self.get_session(with_simple_commit=True) as session: # If there are no pending requests, check if there are any buffered updates if metadata.pending_request_count == 0: + # Check for active buffer lock (indicates pending buffer processing) + buffer_lock_stmt = select(self._METADATA_TABLE.buffer_locked_until).where( + self._METADATA_TABLE.id == self._id + ) + buffer_lock_result = await session.execute(buffer_lock_stmt) + buffer_locked_until = buffer_lock_result.scalar() + + # If buffer is locked, there are pending updates being processed + if buffer_locked_until is not None: + await self._add_buffer_record(session) + return False + + # Check if there are any buffered updates that might change the pending count buffer_check_stmt = select( exists().where( (self._BUFFER_TABLE.storage_id == self._id) diff --git a/src/crawlee/storage_clients/_sql/_storage_client.py b/src/crawlee/storage_clients/_sql/_storage_client.py index 36435bea98..77a6813242 100644 --- a/src/crawlee/storage_clients/_sql/_storage_client.py +++ b/src/crawlee/storage_clients/_sql/_storage_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import warnings +from logging import getLogger from pathlib import Path from typing import TYPE_CHECKING @@ -24,6 +25,9 @@ from sqlalchemy.ext.asyncio import AsyncSession +logger = getLogger(__name__) + + @docs_group('Storage clients') class SqlStorageClient(StorageClient): """SQL implementation of the storage client. 
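
The new `idx_count_aggregate` index on `(request_queue_id, is_handled)` keeps the `COUNT`-based recalculation cheap, and the extended `is_empty` check above only trusts `pending_request_count == 0` once the buffer has nothing left to contribute. A rough sketch of that decision follows; the table and column names are approximations of the schema in this PR, and plain synchronous raw SQL stands in for the client's async session and ORM models.

```python
# Hedged sketch: approximated table/column names, not the client's real code.
import sqlalchemy as sa


def queue_is_empty(conn: sa.Connection, queue_id: str) -> bool:
    """True only when the metadata row *and* the un-flushed buffer agree the queue is empty."""
    row = conn.execute(
        sa.text(
            'SELECT pending_request_count, buffer_locked_until '
            'FROM request_queues WHERE request_queue_id = :id'
        ),
        {'id': queue_id},
    ).one()

    if row.pending_request_count > 0:
        return False

    # A non-NULL lock timestamp means another client is currently folding buffer
    # rows into the metadata, so the counter read above may already be stale.
    if row.buffer_locked_until is not None:
        return False

    # Any buffered update that could still change the pending count means "not empty yet".
    has_updates = conn.execute(
        sa.text(
            'SELECT EXISTS ('
            '    SELECT 1 FROM request_queue_metadata_buffer'
            '    WHERE request_queue_id = :id'
            '      AND (need_recalc OR delta_pending_count != 0)'
            ')'
        ),
        {'id': queue_id},
    ).scalar_one()

    return not has_updates
```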
@@ -131,9 +135,7 @@ async def initialize(self, configuration: Configuration) -> None: await conn.execute(text('PRAGMA mmap_size=268435456')) # 256MB memory mapping await conn.execute(text('PRAGMA foreign_keys=ON')) # Enforce constraints await conn.execute(text('PRAGMA busy_timeout=30000')) # 30s busy timeout - await conn.run_sync(Base.metadata.create_all, checkfirst=True) - from crawlee import __version__ # Noqa: PLC0415 db_version = (await conn.execute(select(VersionDb))).scalar_one_or_none() @@ -149,9 +151,9 @@ async def initialize(self, configuration: Configuration) -> None: ) elif not db_version: await conn.execute(insert(VersionDb).values(version=__version__)) - - except (IntegrityError, OperationalError): + except (IntegrityError, OperationalError) as e: await conn.rollback() + raise RuntimeError('Error initializing database') from e self._initialized = True From 3665a4c7b6e8a8007f83859bc7a17f8e3c67a05f Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 11 Nov 2025 13:47:46 +0000 Subject: [PATCH 04/11] fix --- src/crawlee/storage_clients/_sql/_storage_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_storage_client.py b/src/crawlee/storage_clients/_sql/_storage_client.py index 77a6813242..8242082a1e 100644 --- a/src/crawlee/storage_clients/_sql/_storage_client.py +++ b/src/crawlee/storage_clients/_sql/_storage_client.py @@ -151,9 +151,8 @@ async def initialize(self, configuration: Configuration) -> None: ) elif not db_version: await conn.execute(insert(VersionDb).values(version=__version__)) - except (IntegrityError, OperationalError) as e: + except (IntegrityError, OperationalError): await conn.rollback() - raise RuntimeError('Error initializing database') from e self._initialized = True From 4e74b7064c239cde700ee0d1fee1cdb2c2fa40bb Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 11 Nov 2025 21:26:26 +0000 Subject: [PATCH 05/11] recalculate only for is_empty --- .../storage_clients/_sql/_request_queue_client.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index fae100f20f..a5f79378b3 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -211,7 +211,7 @@ async def add_batch_of_requests( transaction_processed_requests = [] transaction_processed_requests_unique_keys = set() - metadata_recalculate = False + approximate_new_request = 0 # Deduplicate requests by unique_key upfront unique_requests = {} @@ -263,7 +263,6 @@ async def add_batch_of_requests( state.sequence_counter += 1 insert_values.append(value) - metadata_recalculate = True transaction_processed_requests.append( ProcessedRequest( unique_key=request.unique_key, @@ -341,16 +340,20 @@ async def add_batch_of_requests( update_columns=['sequence_number'], conflict_cols=['request_id', 'request_queue_id'], ) - await session.execute(upsert_stmt) + result = await session.execute(upsert_stmt) else: # If the request already exists in the database, we ignore this request when inserting. 
insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values) - await session.execute(insert_stmt_with_ignore) + result = await session.execute(insert_stmt_with_ignore) + + result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result + approximate_new_request += result.rowcount await self._add_buffer_record( session, - recalculate=metadata_recalculate, update_modified_at=True, + delta_pending_request_count=approximate_new_request, + delta_total_request_count=approximate_new_request, ) try: From 251253c2d4308a419bb29d8ae4f056776843c9e5 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Fri, 14 Nov 2025 19:47:08 +0000 Subject: [PATCH 06/11] add length for all String fields --- src/crawlee/storage_clients/_sql/_db_models.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_db_models.py b/src/crawlee/storage_clients/_sql/_db_models.py index 2ada223ab4..71ce6ddad3 100644 --- a/src/crawlee/storage_clients/_sql/_db_models.py +++ b/src/crawlee/storage_clients/_sql/_db_models.py @@ -52,10 +52,10 @@ class Base(DeclarativeBase): class StorageMetadataDb: """Base database model for storage metadata.""" - internal_name: Mapped[str] = mapped_column(String, nullable=False, index=True, unique=True) + internal_name: Mapped[str] = mapped_column(String(255), nullable=False, index=True, unique=True) """Internal unique name for a storage instance based on a name or alias.""" - name: Mapped[str | None] = mapped_column(String, nullable=True, unique=True) + name: Mapped[str | None] = mapped_column(String(255), nullable=True, unique=True) """Human-readable name. None becomes 'default' in database to enforce uniqueness.""" accessed_at: Mapped[datetime] = mapped_column(AwareDateTime, nullable=False) @@ -248,7 +248,7 @@ class RequestDb(Base): ) """Foreign key to metadata request queue record.""" - data: Mapped[str] = mapped_column(String, nullable=False) + data: Mapped[str] = mapped_column(String(1000), nullable=False) """JSON-serialized Request object.""" sequence_number: Mapped[int] = mapped_column(Integer, nullable=False) From c8054b9538f076eea96ec66c1374fd04274d3c54 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 16 Nov 2025 22:10:05 +0000 Subject: [PATCH 07/11] consistent use `_update_metadata` --- .../storage_clients/_sql/_client_mixin.py | 30 ++-- .../storage_clients/_sql/_dataset_client.py | 31 ++-- .../_sql/_key_value_store_client.py | 27 ++-- .../_sql/_request_queue_client.py | 134 ++++++++++-------- 4 files changed, 117 insertions(+), 105 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index e71ddeee7d..a599e72f66 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -44,8 +44,8 @@ class MetadataUpdateParams(TypedDict, total=False): """Parameters for updating metadata.""" - update_accessed_at: NotRequired[bool] - update_modified_at: NotRequired[bool] + accessed_at: NotRequired[datetime] + modified_at: NotRequired[datetime] class SqlClientMixin(ABC): @@ -271,9 +271,11 @@ async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None: # Process buffers to ensure metadata is up to date before purging await self._process_buffers() - stmt = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.storage_id == self._id) + stmt_records = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.storage_id == self._id) + stmt_buffers =
delete(self._BUFFER_TABLE).where(self._BUFFER_TABLE.storage_id == self._id) async with self.get_session(with_simple_commit=True) as session: - await session.execute(stmt) + await session.execute(stmt_records) + await session.execute(stmt_buffers) await self._update_metadata(session, **metadata_kwargs) async def _drop(self) -> None: @@ -336,31 +338,31 @@ async def _update_metadata( self, session: AsyncSession, *, - update_accessed_at: bool = False, - update_modified_at: bool = False, + accessed_at: datetime | None = None, + modified_at: datetime | None = None, **kwargs: Any, ) -> None: """Directly update storage metadata combining common and specific fields. Args: session: Active database session. - update_accessed_at: Whether to update accessed_at timestamp. - update_modified_at: Whether to update modified_at timestamp. + accessed_at: Datetime to set as accessed_at timestamp. + modified_at: Datetime to set as modified_at timestamp. **kwargs: Additional arguments for _specific_update_metadata. """ values_to_set: dict[str, Any] = {} - now = datetime.now(timezone.utc) - if update_accessed_at: - values_to_set['accessed_at'] = now + if accessed_at is not None: + values_to_set['accessed_at'] = accessed_at - if update_modified_at: - values_to_set['modified_at'] = now + if modified_at is not None: + values_to_set['modified_at'] = modified_at values_to_set.update(self._specific_update_metadata(**kwargs)) if values_to_set: - stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) + if (stmt := values_to_set.pop('custom_stmt', None)) is None: + stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) stmt = stmt.values(**values_to_set) await session.execute(stmt) diff --git a/src/crawlee/storage_clients/_sql/_dataset_client.py b/src/crawlee/storage_clients/_sql/_dataset_client.py index 1552190556..1ea9d6b7cb 100644 --- a/src/crawlee/storage_clients/_sql/_dataset_client.py +++ b/src/crawlee/storage_clients/_sql/_dataset_client.py @@ -1,9 +1,10 @@ from __future__ import annotations +from datetime import datetime, timezone from logging import getLogger from typing import TYPE_CHECKING, Any -from sqlalchemy import Select, insert, select, update +from sqlalchemy import Select, insert, select from sqlalchemy import func as sql_func from typing_extensions import Self, override @@ -42,6 +43,7 @@ class SqlDatasetClient(DatasetClient, SqlClientMixin): The dataset data is stored in SQL database tables following the pattern: - `datasets` table: Contains dataset metadata (id, name, timestamps, item_count) - `dataset_records` table: Contains individual items with JSON data and auto-increment ordering + - `dataset_metadata_buffer` table: Buffers metadata updates for performance optimization Items are stored as a JSON object in SQLite and as JSONB in PostgreSQL. These objects must be JSON-serializable. The `item_id` auto-increment primary key ensures insertion order is preserved. @@ -126,11 +128,12 @@ async def purge(self) -> None: Resets item_count to 0 and deletes all records from dataset_records table. 
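
The `custom_stmt` key introduced above deserves a note: the shared `_update_metadata` helper normally builds a plain `UPDATE ... SET` from the collected values, but a storage-specific override can hand it a fully built statement (for example one whose counters are correlated `COUNT` subqueries) and the generic code simply executes whatever it received. A minimal sketch of that dispatch, under an assumed, simplified signature and a lightweight table clause instead of the real ORM model:

```python
# Illustrative only: `metadata_table` is a stand-in, and the real helper works on
# async sessions and mapped classes rather than a table clause.
from typing import Any

import sqlalchemy as sa

metadata_table = sa.table(
    'request_queues',
    sa.column('request_queue_id'),
    sa.column('accessed_at'),
    sa.column('pending_request_count'),
)


def build_metadata_update(storage_id: str, values_to_set: dict[str, Any]) -> sa.Update:
    """Return the UPDATE that _update_metadata-style code would execute."""
    # A specific client may smuggle in its own statement; otherwise build the default one.
    stmt = values_to_set.pop('custom_stmt', None)
    if stmt is None:
        stmt = sa.update(metadata_table).where(metadata_table.c.request_queue_id == storage_id)
    if values_to_set:
        # Remaining plain values (timestamps, absolute counters) ride along either way.
        stmt = stmt.values(**values_to_set)
    return stmt
```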
""" + now = datetime.now(timezone.utc) await self._purge( metadata_kwargs=_DatasetMetadataUpdateParams( new_item_count=0, - update_accessed_at=True, - update_modified_at=True, + accessed_at=now, + modified_at=now, ) ) @@ -323,17 +326,11 @@ async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) if not row: return - # Prepare updates for metadata - values_to_update = { - 'accessed_at': row.max_accessed_at, - } - - if row.max_modified_at: - values_to_update['modified_at'] = row.max_modified_at - - if row.delta_item_count: - values_to_update['item_count'] = self._METADATA_TABLE.item_count + row.delta_item_count - - update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) - - await session.execute(update_stmt) + await self._update_metadata( + session, + **_DatasetMetadataUpdateParams( + accessed_at=row.max_accessed_at, + modified_at=row.max_modified_at, + delta_item_count=row.delta_item_count, + ), + ) diff --git a/src/crawlee/storage_clients/_sql/_key_value_store_client.py b/src/crawlee/storage_clients/_sql/_key_value_store_client.py index c63ed418a2..3a06a98cc2 100644 --- a/src/crawlee/storage_clients/_sql/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_sql/_key_value_store_client.py @@ -1,10 +1,11 @@ from __future__ import annotations import json +from datetime import datetime, timezone from logging import getLogger from typing import TYPE_CHECKING, Any, cast -from sqlalchemy import CursorResult, delete, select, update +from sqlalchemy import CursorResult, delete, select from sqlalchemy import func as sql_func from typing_extensions import Self, override @@ -41,6 +42,7 @@ class SqlKeyValueStoreClient(KeyValueStoreClient, SqlClientMixin): - `key_value_stores` table: Contains store metadata (id, name, timestamps) - `key_value_store_records` table: Contains individual key-value pairs with binary value storage, content type, and size information + - `key_value_store_metadata_buffer` table: Buffers metadata updates for performance optimization Values are serialized based on their type: JSON objects are stored as formatted JSON, text values as UTF-8 encoded strings, and binary data as-is in the `LargeBinary` column. @@ -134,7 +136,8 @@ async def purge(self) -> None: Remove all records from key_value_store_records table. 
""" - await self._purge(metadata_kwargs=MetadataUpdateParams(update_accessed_at=True, update_modified_at=True)) + now = datetime.now(timezone.utc) + await self._purge(metadata_kwargs=MetadataUpdateParams(accessed_at=now, modified_at=now)) @override async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: @@ -269,7 +272,7 @@ async def iterate_keys( size=row.size, ) - await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) + await self._add_buffer_record(session) @override async def record_exists(self, *, key: str) -> bool: @@ -314,14 +317,10 @@ async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) if not row: return - # Prepare updates for metadata - values_to_update = { - 'accessed_at': row.max_accessed_at, - } - - if row.max_modified_at: - values_to_update['modified_at'] = row.max_modified_at - - update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) - - await session.execute(update_stmt) + await self._update_metadata( + session, + **MetadataUpdateParams( + accessed_at=row.max_accessed_at, + modified_at=row.max_modified_at, + ), + ) diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index a5f79378b3..9930ea9e33 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -46,6 +46,7 @@ class _QueueMetadataUpdateParams(MetadataUpdateParams): new_total_request_count: NotRequired[int] delta_handled_request_count: NotRequired[int] delta_pending_request_count: NotRequired[int] + delta_total_request_count: NotRequired[int] recalculate: NotRequired[bool] update_had_multiple_clients: NotRequired[bool] @@ -66,6 +67,7 @@ class SqlRequestQueueClient(RequestQueueClient, SqlClientMixin): - `request_queue_records` table: Contains individual requests with JSON data, unique keys for deduplication, sequence numbers for ordering, and processing status flags - `request_queue_state` table: Maintains counters for sequence numbers to ensure proper ordering of requests. + - `request_queue_metadata_buffer` table: Buffers metadata updates for performance optimization Requests are serialized to JSON for storage and maintain proper ordering through sequence numbers. The implementation provides concurrent access safety through transaction @@ -184,10 +186,11 @@ async def purge(self) -> None: Resets pending_request_count and handled_request_count to 0 and deletes all records from request_queue_records table. 
""" + now = datetime.now(timezone.utc) await self._purge( metadata_kwargs=_QueueMetadataUpdateParams( - update_accessed_at=True, - update_modified_at=True, + accessed_at=now, + modified_at=now, new_pending_request_count=0, ) ) @@ -362,8 +365,11 @@ async def add_batch_of_requests( except SQLAlchemyError as e: await session.rollback() logger.warning(f'Failed to commit session: {e}') - await self._update_metadata( - session, recalculate=True, update_modified_at=True, update_accessed_at=True, force=True + await self._add_buffer_record( + session, + update_modified_at=True, + delta_pending_request_count=approximate_new_request, + delta_total_request_count=approximate_new_request, ) await session.commit() transaction_processed_requests.clear() @@ -641,11 +647,13 @@ def _specific_update_metadata( new_total_request_count: int | None = None, delta_handled_request_count: int | None = None, delta_pending_request_count: int | None = None, + delta_total_request_count: int | None = None, *, + recalculate: bool = False, update_had_multiple_clients: bool = False, **_kwargs: dict[str, Any], ) -> dict[str, Any]: - """Directly update the request queue metadata in the database. + """Update the request queue metadata in the database. Args: session: The SQLAlchemy session to use for database operations. @@ -654,6 +662,8 @@ def _specific_update_metadata( new_total_request_count: If provided, update the total_request_count to this value. delta_handled_request_count: If provided, add this value to the handled_request_count. delta_pending_request_count: If provided, add this value to the pending_request_count. + delta_total_request_count: If provided, add this value to the total_request_count. + recalculate: If True, recalculate the pending_request_count, and total_request_count on request table. update_had_multiple_clients: If True, set had_multiple_clients to True. 
""" values_to_set: dict[str, Any] = {} @@ -661,14 +671,55 @@ def _specific_update_metadata( if update_had_multiple_clients: values_to_set['had_multiple_clients'] = True - if new_handled_request_count is not None: - values_to_set['handled_request_count'] = new_handled_request_count + if recalculate: + stmt = ( + update(self._METADATA_TABLE) + .where(self._METADATA_TABLE.request_queue_id == self._id) + .values( + pending_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled.is_(False)) + .scalar_subquery() + ), + total_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id) + .scalar_subquery() + ), + handled_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled.is_(True)) + .scalar_subquery() + ), + ) + ) + + values_to_set['custom_stmt'] = stmt + + else: + if new_handled_request_count is not None: + values_to_set['handled_request_count'] = new_handled_request_count + elif delta_handled_request_count is not None: + values_to_set['handled_request_count'] = ( + self._METADATA_TABLE.handled_request_count + delta_handled_request_count + ) - if new_pending_request_count is not None: - values_to_set['pending_request_count'] = new_pending_request_count + if new_pending_request_count is not None: + values_to_set['pending_request_count'] = new_pending_request_count + elif delta_pending_request_count is not None: + values_to_set['pending_request_count'] = ( + self._METADATA_TABLE.pending_request_count + delta_pending_request_count + ) - if new_total_request_count is not None: - values_to_set['total_request_count'] = new_total_request_count + if new_total_request_count is not None: + values_to_set['total_request_count'] = new_total_request_count + elif delta_total_request_count is not None: + values_to_set['total_request_count'] = ( + self._METADATA_TABLE.total_request_count + delta_total_request_count + ) return values_to_set @@ -753,52 +804,15 @@ async def _apply_buffer_updates(self, session: AsyncSession, max_buffer_id: int) if not row: return - # Prepare updates for metadata - values_to_update = { - 'accessed_at': row.max_accessed_at, - } - - if row.max_modified_at: - values_to_update['modified_at'] = row.max_modified_at - - if not self._had_multiple_clients and row.unique_clients_count > 1: - values_to_update['had_multiple_clients'] = True - - if row.need_recalc: - values_to_update['pending_request_count'] = ( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled == False) # noqa: E712 - .scalar_subquery() - ) - values_to_update['total_request_count'] = ( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id) - .scalar_subquery() - ) - values_to_update['handled_request_count'] = ( - select(func.count()) - .select_from(self._ITEM_TABLE) - .where(self._ITEM_TABLE.request_queue_id == self._id, self._ITEM_TABLE.is_handled == True) # noqa: E712 - .scalar_subquery() - ) - else: - if row.delta_handled_count: - values_to_update['handled_request_count'] = ( - self._METADATA_TABLE.handled_request_count + row.delta_handled_count - ) - - if row.delta_pending_count: - values_to_update['pending_request_count'] = ( - self._METADATA_TABLE.pending_request_count + row.delta_pending_count - ) - - if 
row.delta_total_count: - values_to_update['total_request_count'] = ( - self._METADATA_TABLE.total_request_count + row.delta_total_count - ) - - update_stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id).values(**values_to_update) - - await session.execute(update_stmt) + await self._update_metadata( + session, + **_QueueMetadataUpdateParams( + accessed_at=row.max_accessed_at, + modified_at=row.max_modified_at, + update_had_multiple_clients=not self._had_multiple_clients and row.unique_clients_count > 1, + delta_handled_request_count=row.delta_handled_count, + delta_pending_request_count=row.delta_pending_count, + delta_total_request_count=row.delta_total_count, + recalculate=row.need_recalc, + ), + ) From 83209ce067ed7f556cfd2a29f973e4147881484a Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 16 Nov 2025 22:12:41 +0000 Subject: [PATCH 08/11] up block time interval --- src/crawlee/storage_clients/_sql/_client_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index a599e72f66..eb1f043c38 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -71,7 +71,7 @@ class SqlClientMixin(ABC): _CLIENT_TYPE: ClassVar[str] """Human-readable client type for error messages.""" - _BLOCK_BUFFER_TIME = timedelta(milliseconds=200) + _BLOCK_BUFFER_TIME = timedelta(seconds=1) """Time interval that blocks buffer reading to update metadata.""" def __init__(self, *, id: str, storage_client: SqlStorageClient) -> None: From 1ee46da28ec2f5124be40c9d421c6e795dfb71e8 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 16 Nov 2025 22:43:48 +0000 Subject: [PATCH 09/11] fix --- src/crawlee/storage_clients/_sql/_client_mixin.py | 2 -- src/crawlee/storage_clients/_sql/_request_queue_client.py | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index eb1f043c38..516e8818d3 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -452,8 +452,6 @@ async def _release_buffer_lock(self, session: AsyncSession) -> None: await session.execute(stmt) - await session.flush() - async def _has_pending_buffer_updates(self, session: AsyncSession) -> bool: """Check if there are pending buffer updates not yet applied to metadata. 
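
Patch 08 widens `_BLOCK_BUFFER_TIME` to one second; the lock itself is just a timestamp column claimed with a conditional `UPDATE`, so only one client drains the buffer at a time. A rough sketch of that claim/release cycle under assumed names and a synchronous connection (the real `_try_acquire_buffer_lock` works on the ORM metadata model inside an async session and may differ in detail):

```python
# Hedged sketch: approximated schema; ISO-string timestamps keep it driver-agnostic,
# which is a simplification compared to the real timezone-aware datetime columns.
from datetime import datetime, timedelta, timezone

import sqlalchemy as sa

BLOCK_BUFFER_TIME = timedelta(seconds=1)


def try_acquire_buffer_lock(conn: sa.Connection, queue_id: str, *, force: bool = False) -> bool:
    """Claim the buffer for roughly one second; return False if someone else holds it."""
    now = datetime.now(timezone.utc)
    condition = '' if force else 'AND (buffer_locked_until IS NULL OR buffer_locked_until < :now)'
    params: dict[str, object] = {'id': queue_id, 'until': (now + BLOCK_BUFFER_TIME).isoformat()}
    if not force:
        params['now'] = now.isoformat()
    result = conn.execute(
        sa.text(
            'UPDATE request_queues SET buffer_locked_until = :until '
            f'WHERE request_queue_id = :id {condition}'
        ),
        params,
    )
    # rowcount == 1 means our conditional UPDATE won the race; 0 means the lock is held.
    return result.rowcount == 1


def release_buffer_lock(conn: sa.Connection, queue_id: str) -> None:
    conn.execute(
        sa.text('UPDATE request_queues SET buffer_locked_until = NULL WHERE request_queue_id = :id'),
        {'id': queue_id},
    )
```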
diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py index 9930ea9e33..cba4f18702 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -368,8 +368,7 @@ async def add_batch_of_requests( await self._add_buffer_record( session, update_modified_at=True, - delta_pending_request_count=approximate_new_request, - delta_total_request_count=approximate_new_request, + recalculate=True, ) await session.commit() transaction_processed_requests.clear() From d4cd262130ed49dbefe51a371be8b331060118fa Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 16 Nov 2025 22:50:11 +0000 Subject: [PATCH 10/11] up length for data --- src/crawlee/storage_clients/_sql/_db_models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_sql/_db_models.py b/src/crawlee/storage_clients/_sql/_db_models.py index 71ce6ddad3..00f38f200b 100644 --- a/src/crawlee/storage_clients/_sql/_db_models.py +++ b/src/crawlee/storage_clients/_sql/_db_models.py @@ -248,7 +248,7 @@ class RequestDb(Base): ) """Foreign key to metadata request queue record.""" - data: Mapped[str] = mapped_column(String(1000), nullable=False) + data: Mapped[str] = mapped_column(String(5000), nullable=False) """JSON-serialized Request object.""" sequence_number: Mapped[int] = mapped_column(Integer, nullable=False) From eae231daefefcf4b47a2c5a17314c9c76c63b08d Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 17 Nov 2025 13:34:43 +0000 Subject: [PATCH 11/11] up docs --- docs/guides/storage_clients.mdx | 33 +++++++++++++++++++ .../storage_clients/_sql/_client_mixin.py | 2 -- .../storage_clients/_sql/_storage_client.py | 7 ++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/docs/guides/storage_clients.mdx b/docs/guides/storage_clients.mdx index 70c4964192..9e777782f4 100644 --- a/docs/guides/storage_clients.mdx +++ b/docs/guides/storage_clients.mdx @@ -208,6 +208,15 @@ class dataset_records { + data } +class dataset_metadata_buffer { + <> + + id (PK) + + accessed_at + + modified_at + + dataset_id (FK) + + delta_item_count +} + %% ======================== %% Key-Value Store Tables %% ======================== @@ -231,15 +240,25 @@ class key_value_store_records { + size } +class key_value_store_metadata_buffer { + <
> + + id (PK) + + accessed_at + + modified_at + + key_value_store_id (FK) +} + %% ======================== %% Client to Table arrows %% ======================== SqlDatasetClient --> datasets SqlDatasetClient --> dataset_records +SqlDatasetClient --> dataset_metadata_buffer SqlKeyValueStoreClient --> key_value_stores SqlKeyValueStoreClient --> key_value_store_records +SqlKeyValueStoreClient --> key_value_store_metadata_buffer ``` ```mermaid --- @@ -294,6 +313,19 @@ class request_queue_state { + forefront_sequence_counter } +class request_queue_metadata_buffer { + <
> + + id (PK) + + accessed_at + + modified_at + + request_queue_id (FK) + + client_id + + delta_handled_count + + delta_pending_count + + delta_total_count + + need_recalc +} + %% ======================== %% Client to Table arrows %% ======================== @@ -301,6 +333,7 @@ class request_queue_state { SqlRequestQueueClient --> request_queues SqlRequestQueueClient --> request_queue_records SqlRequestQueueClient --> request_queue_state +SqlRequestQueueClient --> request_queue_metadata_buffer ``` Configuration options for the `SqlStorageClient` can be set through environment variables or the `Configuration` class: diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py index 516e8818d3..234a0abdad 100644 --- a/src/crawlee/storage_clients/_sql/_client_mixin.py +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -272,10 +272,8 @@ async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None: await self._process_buffers() stmt_records = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.storage_id == self._id) - stmt_buffers = delete(self._BUFFER_TABLE).where(self._BUFFER_TABLE.storage_id == self._id) async with self.get_session(with_simple_commit=True) as session: await session.execute(stmt_records) - await session.execute(stmt_buffers) await self._update_metadata(session, **metadata_kwargs) async def _drop(self) -> None: diff --git a/src/crawlee/storage_clients/_sql/_storage_client.py b/src/crawlee/storage_clients/_sql/_storage_client.py index ee923ee462..54209d4302 100644 --- a/src/crawlee/storage_clients/_sql/_storage_client.py +++ b/src/crawlee/storage_clients/_sql/_storage_client.py @@ -273,10 +273,9 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine: self._engine = create_async_engine( connection_string, future=True, - pool_size=5, - max_overflow=10, - pool_timeout=30, - pool_recycle=600, + pool_size=10, + max_overflow=50, + pool_timeout=60, pool_pre_ping=True, echo=False, connect_args={'timeout': 30},