|
| 1 | +import 'dart:async'; |
1 | 2 | import 'dart:collection';
|
2 | 3 |
|
| 4 | +import 'package:collection/collection.dart'; |
3 | 5 | import 'package:flutter/foundation.dart';
|
4 | 6 |
|
5 | 7 | import '../api/model/events.dart';
|
6 | 8 | import '../api/model/initial_snapshot.dart';
|
7 | 9 | import '../api/model/model.dart';
|
| 10 | +import '../api/route/channels.dart'; |
8 | 11 | import 'store.dart';
|
9 | 12 | import 'user.dart';
|
10 | 13 |
|
| 14 | +final _apiGetStreamTopics = getStreamTopics; // similar to _apiSendMessage in lib/model/message.dart |
| 15 | + |
11 | 16 | /// The portion of [PerAccountStore] for channels, topics, and stuff about them.
|
12 | 17 | ///
|
13 | 18 | /// This type is useful for expressing the needs of other parts of the
|
@@ -38,6 +43,26 @@ mixin ChannelStore on UserStore {
|
38 | 43 | /// and [streamsByName].
|
39 | 44 | Map<int, Subscription> get subscriptions;
|
40 | 45 |
|
| 46 | + /// Fetch topics in a stream from the server. |
| 47 | + /// |
| 48 | + /// The results from the last successful fetch |
| 49 | + /// can be retrieved with [getStreamTopics]. |
| 50 | + Future<void> fetchTopics(int streamId); |
| 51 | + |
| 52 | + /// Pairs of the known topics and its latest message ID, in the given stream. |
| 53 | + /// |
| 54 | + /// Returns null if the data has never been fetched yet. |
| 55 | + /// To fetch it from the server, use [fetchTopics]. |
| 56 | + /// |
| 57 | + /// The result is guaranteed to be sorted by [GetStreamTopicsEntry.maxId], and the |
| 58 | + /// topics are guaranteed to be distinct. |
| 59 | + /// |
| 60 | + /// In some cases, the same maxId affected by message moves can be present in |
| 61 | + /// multiple [GetStreamTopicsEntry] entries. For this reason, the caller |
| 62 | + /// should not rely on [getStreamTopics] to determine which topic the message |
| 63 | + /// is in. Instead, refer to [PerAccountStore.messages]. |
| 64 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId); |
| 65 | + |
41 | 66 | /// The visibility policy that the self-user has for the given topic.
|
42 | 67 | ///
|
43 | 68 | /// This does not incorporate the user's channel-level policy,
|
@@ -199,6 +224,13 @@ mixin ProxyChannelStore on ChannelStore {
|
199 | 224 | @override
|
200 | 225 | Map<int, Subscription> get subscriptions => channelStore.subscriptions;
|
201 | 226 |
|
| 227 | + @override |
| 228 | + Future<void> fetchTopics(int streamId) => channelStore.fetchTopics(streamId); |
| 229 | + |
| 230 | + @override |
| 231 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId) => |
| 232 | + channelStore.getStreamTopics(streamId); |
| 233 | + |
202 | 234 | @override
|
203 | 235 | UserTopicVisibilityPolicy topicVisibilityPolicy(int streamId, TopicName topic) =>
|
204 | 236 | channelStore.topicVisibilityPolicy(streamId, topic);
|
@@ -260,6 +292,34 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
|
260 | 292 | @override
|
261 | 293 | final Map<int, Subscription> subscriptions;
|
262 | 294 |
|
| 295 | + /// Maps indexed by stream IDs, of the known latest message IDs in each topic. |
| 296 | + /// |
| 297 | + /// For example: `_latestMessageIdsByStreamTopic[stream.id][topic] = maxId` |
| 298 | + /// |
| 299 | + /// In some cases, the same message IDs, when affected by message moves, can |
| 300 | + /// be present for mutliple stream-topic keys. |
| 301 | + final Map<int, Map<TopicName, int>> _latestMessageIdsByStreamTopic = {}; |
| 302 | + |
| 303 | + @override |
| 304 | + Future<void> fetchTopics(int streamId) async { |
| 305 | + final result = await _apiGetStreamTopics(connection, streamId: streamId, |
| 306 | + allowEmptyTopicName: true); |
| 307 | + _latestMessageIdsByStreamTopic[streamId] = { |
| 308 | + for (final GetStreamTopicsEntry(:name, :maxId) in result.topics) |
| 309 | + name: maxId, |
| 310 | + }; |
| 311 | + } |
| 312 | + |
| 313 | + @override |
| 314 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId) { |
| 315 | + final latestMessageIdsInStream = _latestMessageIdsByStreamTopic[streamId]; |
| 316 | + if (latestMessageIdsInStream == null) return null; |
| 317 | + return [ |
| 318 | + for (final MapEntry(:key, :value) in latestMessageIdsInStream.entries) |
| 319 | + GetStreamTopicsEntry(maxId: value, name: key), |
| 320 | + ].sortedBy((value) => -value.maxId); |
| 321 | + } |
| 322 | + |
263 | 323 | @override
|
264 | 324 | Map<int, TopicKeyedMap<UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;
|
265 | 325 |
|
@@ -425,6 +485,47 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
|
425 | 485 | forStream[event.topicName] = visibilityPolicy;
|
426 | 486 | }
|
427 | 487 | }
|
| 488 | + |
| 489 | + /// Handle a [MessageEvent], returning whether listeners should be notified. |
| 490 | + bool handleMessageEvent(MessageEvent event) { |
| 491 | + if (event.message is! StreamMessage) return false; |
| 492 | + final StreamMessage(:streamId, :topic) = event.message as StreamMessage; |
| 493 | + |
| 494 | + final latestMessageIdsByTopic = _latestMessageIdsByStreamTopic[streamId]; |
| 495 | + if (latestMessageIdsByTopic == null) return false; |
| 496 | + assert(!latestMessageIdsByTopic.containsKey(topic) |
| 497 | + || latestMessageIdsByTopic[topic]! < event.message.id); |
| 498 | + latestMessageIdsByTopic[topic] = event.message.id; |
| 499 | + return true; |
| 500 | + } |
| 501 | + |
| 502 | + /// Handle a [UpdateMessageEvent], returning whether listeners should be |
| 503 | + /// notified. |
| 504 | + bool handleUpdateMessageEvent(UpdateMessageEvent event) { |
| 505 | + if (event.moveData == null) return false; |
| 506 | + final UpdateMessageMoveData( |
| 507 | + :origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode, |
| 508 | + ) = event.moveData!; |
| 509 | + bool shouldNotify = false; |
| 510 | + |
| 511 | + final origLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[origStreamId]; |
| 512 | + if (propagateMode == PropagateMode.changeAll |
| 513 | + && origLatestMessageIdsByTopics != null) { |
| 514 | + shouldNotify = origLatestMessageIdsByTopics.remove(origTopic) != null; |
| 515 | + } |
| 516 | + |
| 517 | + final newLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[newStreamId]; |
| 518 | + if (newLatestMessageIdsByTopics != null) { |
| 519 | + final movedMaxId = event.messageIds.max; |
| 520 | + if (!newLatestMessageIdsByTopics.containsKey(newTopic) |
| 521 | + || newLatestMessageIdsByTopics[newTopic]! < movedMaxId) { |
| 522 | + newLatestMessageIdsByTopics[newTopic] = movedMaxId; |
| 523 | + shouldNotify = true; |
| 524 | + } |
| 525 | + } |
| 526 | + |
| 527 | + return shouldNotify; |
| 528 | + } |
428 | 529 | }
|
429 | 530 |
|
430 | 531 | /// A [Map] with [TopicName] keys and [V] values.
|
|
0 commit comments