Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4fd17d4
MSC4140: finalised delayed events, and more
AndrewFerr Oct 10, 2025
bc0e1a3
Add changelog
AndrewFerr Oct 10, 2025
4a7e78c
Work around Postgres-only error
AndrewFerr Oct 10, 2025
99f8501
Rename experimental config per suggestion
AndrewFerr Oct 31, 2025
96ed87a
Add description comment to new table
AndrewFerr Oct 31, 2025
2ec54e4
Merge finalised_delayed_events into delayed_events
AndrewFerr Nov 1, 2025
048508c
Merge with 'develop': PEP585 and always RETURNING
AndrewFerr Nov 1, 2025
095eda4
Merge with 'develop': Python 3.14 support
AndrewFerr Nov 3, 2025
5cd80ed
Add finalised_ts index in background
AndrewFerr Nov 3, 2025
11c79c2
Include DelayedEventsStore in migrated DB stores
AndrewFerr Nov 3, 2025
d2aba30
Fix error code string
AndrewFerr Nov 3, 2025
e5bef01
Run UPDATE in transaction, and check rowcount
AndrewFerr Nov 3, 2025
6a5369b
Merge with 'develop': PEP604
AndrewFerr Nov 6, 2025
9a345ce
Don't fetch after UPDATE queries
AndrewFerr Nov 7, 2025
98fa5aa
Add missing return type annotation
AndrewFerr Nov 7, 2025
25ca333
Update schema change comment
AndrewFerr Nov 7, 2025
e0c412d
Remove mistaken WHERE condition
AndrewFerr Nov 12, 2025
60549ab
Add ordering info to background update
AndrewFerr Nov 13, 2025
f1383cc
Merge with 'develop' before #19152
AndrewFerr Nov 14, 2025
274d1cf
Merge with #19152
AndrewFerr Nov 14, 2025
9444a71
Bump schema change number/order
AndrewFerr Nov 14, 2025
4de1de6
Fix mistake in DELETE query
AndrewFerr Nov 14, 2025
151e820
Merge with 'develop'
AndrewFerr Nov 14, 2025
b36becc
Return most recent finalised events first
AndrewFerr Nov 14, 2025
c9eb467
Removed leading "finalised_" from object fields
AndrewFerr Nov 14, 2025
7497cb5
Raise errors on manual send of delayed event
AndrewFerr Nov 24, 2025
ded80f4
Merge with 'develop'
AndrewFerr Nov 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19038.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add more support for MSC4140, namely the ability to inspect sent, cancelled, or failed delayed events, aka "finalised" delayed events.
Copy link
Contributor

@MadLittleMods MadLittleMods Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own reference, what is delayed events being used for?

I thought this was related to VoIP stuff (calls) and the new meta is with sticky events.

Are delayed events going to be deprecated in favor of sticky events?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sticky events are replacing not delayed events, but "owned state" i.e. MSC3757 / MSC3779.

Delayed events are still going to be used by MatrixRTC for scheduling cancellable "leave" events for disconnected clients.

https://github.com/matrix-org/matrix-spec-proposals/blob/toger5/matrixRTC/proposals/4143-matrix-rtc.md#dependencies

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there Complement changes to go along with this?

I see some TestDelayedEvents Complement tests that are failing: https://github.com/element-hq/synapse/actions/runs/18411628889/job/52465384686?pr=19038

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there Complement changes to go along with this?

No, not yet. I'll add some soon.

I see some TestDelayedEvents Complement tests that are failing

That's an unrelated failure, which has been flaky for a frustratingly long time. Maybe now is a good time to try to tackle it again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem flaky. It's failed all 3 times for both SQLite and Postgres. And it's only TestDelayedEvents

15 changes: 15 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,21 @@ def read_config(
# MSC4133: Custom profile fields
self.msc4133_enabled: bool = experimental.get("msc4133_enabled", False)

# MSC4140: How many delayed events a user is allowed to have scheduled at a time.
self.msc4140_max_delayed_events_per_user = experimental.get(
"msc4140_max_delayed_events_per_user", 100
)

# MSC4140: How long to keep finalised delayed events in the database before deleting them.
self.msc4140_finalised_retention_period = self.parse_duration(
config.get("msc4140_finalised_retention_period", "7d")
)

# MSC4140: How many finalised delayed events to keep per user before deleting them.
self.msc4140_finalised_per_user_retention_limit = experimental.get(
"msc4140_finalised_per_user_retention_limit", 1000
)

# MSC4143: Matrix RTC Transport using Livekit Backend
self.msc4143_enabled: bool = experimental.get("msc4143_enabled", False)

Expand Down
111 changes: 91 additions & 20 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.errors import ShadowBanError, SynapseError, cs_error
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.delayed_events import (
ReplicationAddedDelayedEventRestServlet,
)
Expand All @@ -42,6 +43,7 @@
UserID,
create_requester,
)
from synapse.util.constants import MILLISECONDS_PER_SECOND, ONE_MINUTE_SECONDS
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
Expand Down Expand Up @@ -124,14 +126,20 @@ async def _schedule_db_events() -> None:
else:
self._repl_client = ReplicationAddedDelayedEventRestServlet.make_client(hs)

if hs.config.worker.run_background_tasks:
self._clock.looping_call(
self._prune_finalised_events,
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
)

@property
def _is_master(self) -> bool:
return self._repl_client is None

def notify_new_event(self) -> None:
"""
Called when there may be more state event deltas to process,
which should cancel pending delayed events for the same state.
which should cancel scheduled delayed events for the same state.
"""
if self._event_processing:
return
Expand All @@ -155,8 +163,7 @@ async def _unsafe_process_new_event(self) -> None:
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
if not await self._store.has_scheduled_delayed_events():
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
Expand Down Expand Up @@ -227,7 +234,7 @@ async def _unsafe_process_new_event(self) -> None:

async def _handle_state_deltas(self, deltas: list[StateDelta]) -> None:
"""
Process current state deltas to cancel other users' pending delayed events
Process current state deltas to cancel other users' scheduled delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
Expand Down Expand Up @@ -315,11 +322,20 @@ async def _handle_state_deltas(self, deltas: list[StateDelta]) -> None:
if sender.domain == self._config.server.server_name
else ""
),
finalised_ts=self._get_current_ts(),
)

if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)

@wrap_as_background_process("_prune_finalised_events")
async def _prune_finalised_events(self) -> None:
await self._store.prune_finalised_delayed_events(
self._get_current_ts(),
self.hs.config.experimental.msc4140_finalised_retention_period,
self.hs.config.experimental.msc4140_finalised_per_user_retention_limit,
)

async def add(
self,
requester: Requester,
Expand Down Expand Up @@ -379,6 +395,7 @@ async def add(
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
limit=self.hs.config.experimental.msc4140_max_delayed_events_per_user,
)

if self._repl_client is not None:
Expand Down Expand Up @@ -411,7 +428,9 @@ async def cancel(self, request: SynapseRequest, delay_id: str) -> None:
)
await make_deferred_yieldable(self._initialized_from_db)

next_send_ts = await self._store.cancel_delayed_event(delay_id)
next_send_ts = await self._store.cancel_delayed_event(
delay_id, self._get_current_ts()
)

if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)
Expand Down Expand Up @@ -454,7 +473,8 @@ async def send(self, request: SynapseRequest, delay_id: str) -> None:
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)

await self._send_event(event)
if event:
await self._send_event(event, False)

async def _send_on_timeout(self) -> None:
self._next_delayed_event_call = None
Expand All @@ -479,17 +499,19 @@ async def _send_events(self, events: list[DelayedEventDetails]) -> None:
state_info = None
try:
# TODO: send in background if message event or non-conflicting state event
await self._send_event(event)
finalised_ts = await self._send_event(event, True)
if state_info is not None:
sent_state.add(state_info)
except Exception:
logger.exception("Failed to send delayed event")
finalised_ts = self._get_current_ts()

for room_id, event_type, state_key in sent_state:
await self._store.delete_processed_delayed_state_events(
await self._store.finalise_processed_delayed_state_events(
room_id=str(room_id),
event_type=event_type,
state_key=state_key,
finalised_ts=finalised_ts,
)

def _schedule_next_at_or_none(self, next_send_ts: Timestamp | None) -> None:
Expand All @@ -513,21 +535,49 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
else:
self._next_delayed_event_call.reset(delay_sec)

async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
"""Return all pending delayed events requested by the given user."""
async def get_delayed_events_for_user(
self,
requester: Requester,
delay_ids: list[str] | None,
get_scheduled: bool,
get_finalised: bool,
) -> dict[str, list[JsonDict]]:
"""
Return all scheduled delayed events for the given user.

Args:
requester: The user whose delayed events to get.
delay_ids: The IDs of the delayed events to get, or None to get all of them.
get_scheduled: Whether to look up scheduled delayed events.
get_finalised: Whether to look up finalised delayed events.
"""
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
)
return await self._store.get_all_delayed_events_for_user(
requester.user.localpart
)

# TODO: Support Pagination stream API
ret = {}
if get_scheduled:
ret["scheduled"] = await self._store.get_scheduled_delayed_events_for_user(
requester.user.localpart,
delay_ids,
)
if get_finalised:
ret["finalised"] = await self._store.get_finalised_delayed_events_for_user(
requester.user.localpart,
delay_ids,
self._get_current_ts(),
self.hs.config.experimental.msc4140_finalised_retention_period,
self.hs.config.experimental.msc4140_finalised_per_user_retention_limit,
)
return ret

async def _send_event(
self,
event: DelayedEventDetails,
txn_id: str | None = None,
) -> None:
finalise_error: bool,
) -> Timestamp:
user_id = UserID(event.user_localpart, self._config.server.server_name)
user_id_str = user_id.to_string()
# Create a new requester from what data is currently available
Expand All @@ -537,6 +587,7 @@ async def _send_event(
device_id=event.device_id,
)

finalised_ts = None
try:
if event.state_key is not None and event.type == EventTypes.Member:
membership = event.content.get("membership")
Expand Down Expand Up @@ -569,19 +620,39 @@ async def _send_event(
) = await self._event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
txn_id=txn_id,
)
event_id = sent_event.event_id
if event.origin_server_ts is None:
finalised_ts = Timestamp(sent_event.origin_server_ts)
except ShadowBanError:
event_id = generate_fake_event_id()
send_error = None
except Exception as e:
if finalise_error:
if isinstance(e, SynapseError):
send_error = e.error_dict(None)
else:
send_error = cs_error("Internal server error")
else:
raise
else:
send_error = None
finally:
# TODO: If this is a temporary error, retry. Otherwise, consider notifying clients of the failure
if finalised_ts is None:
finalised_ts = self._get_current_ts()
try:
await self._store.delete_processed_delayed_event(event.delay_id)
await self._store.finalise_processed_delayed_event(
event.delay_id,
send_error or event_id,
finalised_ts,
)
except Exception:
logger.exception("Failed to delete processed delayed event")
logger.exception("Failed to finalise processed delayed event")

set_tag("event_id", event_id)
if send_error is None:
set_tag("event_id", event_id)
return finalised_ts

def _get_current_ts(self) -> Timestamp:
return Timestamp(self._clock.time_msec())
Expand Down
35 changes: 31 additions & 4 deletions synapse/rest/client/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@

from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import (
RestServlet,
parse_json_object_from_request,
parse_string_from_args,
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.types import JsonDict
Expand All @@ -38,6 +43,11 @@ class _UpdateDelayedEventAction(Enum):
SEND = "send"


class _DelayedEventStatus(Enum):
SCHEDULED = "scheduled"
FINALISED = "finalised"


class UpdateDelayedEventServlet(RestServlet):
PATTERNS = client_patterns(
r"/org\.matrix\.msc4140/delayed_events/(?P<delay_id>[^/]+)$",
Expand Down Expand Up @@ -148,10 +158,27 @@ def __init__(self, hs: "HomeServer"):

async def on_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
# TODO: Support Pagination stream API ("from" query parameter)
delayed_events = await self.delayed_events_handler.get_all_for_user(requester)

ret = {"delayed_events": delayed_events}
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: dict[bytes, list[bytes]] = request.args # type: ignore
statuses = parse_strings_from_args(
args,
"status",
allowed_values=tuple(s.value for s in _DelayedEventStatus),
)
delay_ids = parse_strings_from_args(args, "delay_id")
# TODO: Support Pagination stream API
_from_token = parse_string_from_args(args, "from")

ret = await self.delayed_events_handler.get_delayed_events_for_user(
requester,
delay_ids,
statuses is None or _DelayedEventStatus.SCHEDULED.value in statuses,
statuses is None or _DelayedEventStatus.FINALISED.value in statuses,
)
# TODO: This is here for backwards compatibility. Remove eventually
if statuses is None:
ret["delayed_events"] = ret[_DelayedEventStatus.SCHEDULED.value]
return 200, ret


Expand Down
Loading
Loading