|
| 1 | +import 'dart:async'; |
1 | 2 | import 'dart:collection';
|
2 | 3 |
|
| 4 | +import 'package:collection/collection.dart'; |
3 | 5 | import 'package:flutter/foundation.dart';
|
4 | 6 |
|
5 | 7 | import '../api/model/events.dart';
|
6 | 8 | import '../api/model/initial_snapshot.dart';
|
7 | 9 | import '../api/model/model.dart';
|
| 10 | +import '../api/route/channels.dart'; |
| 11 | +import 'store.dart'; |
| 12 | + |
| 13 | +final _apiGetStreamTopics = getStreamTopics; // similar to _apiSendMessage in lib/model/message.dart |
8 | 14 |
|
9 | 15 | /// The portion of [PerAccountStore] for channels, topics, and stuff about them.
|
10 | 16 | ///
|
@@ -36,6 +42,26 @@ mixin ChannelStore {
|
36 | 42 | /// and [streamsByName].
|
37 | 43 | Map<int, Subscription> get subscriptions;
|
38 | 44 |
|
| 45 | + /// Fetch topics in a stream from the server. |
| 46 | + /// |
| 47 | + /// The results from the last successful fetch |
| 48 | + /// can be retrieved with [getStreamTopics]. |
| 49 | + Future<void> fetchTopics(int streamId); |
| 50 | + |
| 51 | + /// Pairs of the known topics and its latest message ID, in the given stream. |
| 52 | + /// |
| 53 | + /// Returns null if the data has never been fetched yet. |
| 54 | + /// To fetch it from the server, use [fetchTopics]. |
| 55 | + /// |
| 56 | + /// The result is guaranteed to be sorted by [GetStreamTopicsEntry.maxId], and the |
| 57 | + /// topics are guaranteed to be distinct. |
| 58 | + /// |
| 59 | + /// In some cases, the same maxId affected by message moves can be present in |
| 60 | + /// multiple [GetStreamTopicsEntry] entries. For this reason, the caller |
| 61 | + /// should not rely on [getStreamTopics] to determine which topic the message |
| 62 | + /// is in. Instead, refer to [PerAccountStore.messages]. |
| 63 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId); |
| 64 | + |
39 | 65 | /// The visibility policy that the self-user has for the given topic.
|
40 | 66 | ///
|
41 | 67 | /// This does not incorporate the user's channel-level policy,
|
@@ -212,6 +238,34 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
212 | 238 | @override
|
213 | 239 | final Map<int, Subscription> subscriptions;
|
214 | 240 |
|
| 241 | + /// Maps indexed by stream IDs, of the known latest message IDs in each topic. |
| 242 | + /// |
| 243 | + /// For example: `_latestMessageIdsByStreamTopic[stream.id][topic] = maxId` |
| 244 | + /// |
| 245 | + /// In some cases, the same message IDs, when affected by message moves, can |
| 246 | + /// be present for mutliple stream-topic keys. |
| 247 | + final Map<int, Map<TopicName, int>> _latestMessageIdsByStreamTopic = {}; |
| 248 | + |
| 249 | + @override |
| 250 | + Future<void> fetchTopics(int streamId) async { |
| 251 | + final result = await _apiGetStreamTopics(connection, streamId: streamId, |
| 252 | + allowEmptyTopicName: true); |
| 253 | + _latestMessageIdsByStreamTopic[streamId] = { |
| 254 | + for (final GetStreamTopicsEntry(:name, :maxId) in result.topics) |
| 255 | + name: maxId, |
| 256 | + }; |
| 257 | + } |
| 258 | + |
| 259 | + @override |
| 260 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId) { |
| 261 | + final latestMessageIdsInStream = _latestMessageIdsByStreamTopic[streamId]; |
| 262 | + if (latestMessageIdsInStream == null) return null; |
| 263 | + return [ |
| 264 | + for (final MapEntry(:key, :value) in latestMessageIdsInStream.entries) |
| 265 | + GetStreamTopicsEntry(maxId: value, name: key), |
| 266 | + ].sortedBy((value) => -value.maxId); |
| 267 | + } |
| 268 | + |
215 | 269 | @override
|
216 | 270 | Map<int, TopicKeyedMap<UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;
|
217 | 271 |
|
@@ -377,6 +431,47 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
377 | 431 | forStream[event.topicName] = visibilityPolicy;
|
378 | 432 | }
|
379 | 433 | }
|
| 434 | + |
| 435 | + /// Handle a [MessageEvent], returning whether listeners should be notified. |
| 436 | + bool handleMessageEvent(MessageEvent event) { |
| 437 | + if (event.message is! StreamMessage) return false; |
| 438 | + final StreamMessage(:streamId, :topic) = event.message as StreamMessage; |
| 439 | + |
| 440 | + final latestMessageIdsByTopic = _latestMessageIdsByStreamTopic[streamId]; |
| 441 | + if (latestMessageIdsByTopic == null) return false; |
| 442 | + assert(!latestMessageIdsByTopic.containsKey(topic) |
| 443 | + || latestMessageIdsByTopic[topic]! < event.message.id); |
| 444 | + latestMessageIdsByTopic[topic] = event.message.id; |
| 445 | + return true; |
| 446 | + } |
| 447 | + |
| 448 | + /// Handle a [UpdateMessageEvent], returning whether listeners should be |
| 449 | + /// notified. |
| 450 | + bool handleUpdateMessageEvent(UpdateMessageEvent event) { |
| 451 | + if (event.moveData == null) return false; |
| 452 | + final UpdateMessageMoveData( |
| 453 | + :origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode, |
| 454 | + ) = event.moveData!; |
| 455 | + bool shouldNotify = false; |
| 456 | + |
| 457 | + final origLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[origStreamId]; |
| 458 | + if (propagateMode == PropagateMode.changeAll |
| 459 | + && origLatestMessageIdsByTopics != null) { |
| 460 | + shouldNotify = origLatestMessageIdsByTopics.remove(origTopic) != null; |
| 461 | + } |
| 462 | + |
| 463 | + final newLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[newStreamId]; |
| 464 | + if (newLatestMessageIdsByTopics != null) { |
| 465 | + final movedMaxId = event.messageIds.max; |
| 466 | + if (!newLatestMessageIdsByTopics.containsKey(newTopic) |
| 467 | + || newLatestMessageIdsByTopics[newTopic]! < movedMaxId) { |
| 468 | + newLatestMessageIdsByTopics[newTopic] = movedMaxId; |
| 469 | + shouldNotify = true; |
| 470 | + } |
| 471 | + } |
| 472 | + |
| 473 | + return shouldNotify; |
| 474 | + } |
380 | 475 | }
|
381 | 476 |
|
382 | 477 | /// A [Map] with [TopicName] keys and [V] values.
|
|
0 commit comments