33namespace BeyondCode \LaravelWebSockets \ChannelManagers ;
44
55use BeyondCode \LaravelWebSockets \Channels \Channel ;
6+ use BeyondCode \LaravelWebSockets \Helpers ;
7+ use BeyondCode \LaravelWebSockets \Server \MockableConnection ;
8+ use Carbon \Carbon ;
69use Clue \React \Redis \Client ;
710use Clue \React \Redis \Factory ;
11+ use Illuminate \Cache \RedisLock ;
12+ use Illuminate \Support \Facades \Redis ;
813use Illuminate \Support \Str ;
914use Ratchet \ConnectionInterface ;
1015use React \EventLoop \LoopInterface ;
@@ -41,6 +46,21 @@ class RedisChannelManager extends LocalChannelManager
4146 */
4247 protected $ subscribeClient ;
4348
49+ /**
50+ * The Redis manager instance.
51+ *
52+ * @var \Illuminate\Redis\RedisManager
53+ */
54+ protected $ redis ;
55+
56+ /**
57+ * The lock name to use on Redis to avoid multiple
58+ * actions that might lead to multiple processings.
59+ *
60+ * @var string
61+ */
62+ protected static $ redisLockName = 'laravel-websockets:channel-manager:lock ' ;
63+
4464 /**
4565 * Create a new channel manager instance.
4666 *
@@ -52,6 +72,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
5272 {
5373 $ this ->loop = $ loop ;
5474
75+ $ this ->redis = Redis::connection (
76+ config ('websockets.replication.modes.redis.connection ' , 'default ' )
77+ );
78+
5579 $ connectionUri = $ this ->getConnectionUri ();
5680
5781 $ factoryClass = $ factoryClass ?: Factory::class;
@@ -67,6 +91,17 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
6791 $ this ->serverId = Str::uuid ()->toString ();
6892 }
6993
94+ /**
95+ * Get the local connections, regardless of the channel
96+ * they are connected to.
97+ *
98+ * @return \React\Promise\PromiseInterface
99+ */
100+ public function getLocalConnections (): PromiseInterface
101+ {
102+ return parent ::getLocalConnections ();
103+ }
104+
70105 /**
71106 * Get all channels for a specific app
72107 * for the current instance.
@@ -108,9 +143,9 @@ public function unsubscribeFromAllChannels(ConnectionInterface $connection)
108143 $ connection , $ channel , new stdClass
109144 );
110145 }
146+ })->then (function () use ($ connection ) {
147+ parent ::unsubscribeFromAllChannels ($ connection );
111148 });
112-
113- parent ::unsubscribeFromAllChannels ($ connection );
114149 }
115150
116151 /**
@@ -130,6 +165,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
130165 }
131166 });
132167
168+ $ this ->addConnectionToSet ($ connection );
169+
133170 $ this ->addChannelToSet (
134171 $ connection ->app ->id , $ channelName
135172 );
@@ -156,8 +193,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
156193 if ($ count === 0 ) {
157194 $ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
158195
196+ $ this ->removeUserData (
197+ $ connection ->app ->id , $ channelName , $ connection ->socketId
198+ );
199+
159200 $ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
160201
202+ $ this ->removeConnectionFromSet ($ connection );
203+
161204 return ;
162205 }
163206
@@ -168,7 +211,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
168211 if ($ count < 1 ) {
169212 $ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
170213
214+ $ this ->removeUserData (
215+ $ connection ->app ->id , $ channelName , $ connection ->socketId
216+ );
217+
171218 $ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
219+
220+ $ this ->removeConnectionFromSet ($ connection );
172221 }
173222 });
174223 });
@@ -293,12 +342,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
293342 {
294343 return $ this ->publishClient
295344 ->hgetall ($ this ->getRedisKey ($ appId , $ channel , ['users ' ]))
296- ->then (function ($ members ) {
297- [$ keys , $ values ] = collect ($ members )->partition (function ($ value , $ key ) {
298- return $ key % 2 === 0 ;
299- });
300-
301- return collect (array_combine ($ keys ->all (), $ values ->all ()))
345+ ->then (function ($ list ) {
346+ return collect (Helpers::redisListToArray ($ list ))
302347 ->map (function ($ user ) {
303348 return json_decode ($ user );
304349 })
@@ -344,6 +389,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
344389 });
345390 }
346391
392+ /**
393+ * Keep tracking the connections availability when they pong.
394+ *
395+ * @param \Ratchet\ConnectionInterface $connection
396+ * @return bool
397+ */
398+ public function connectionPonged (ConnectionInterface $ connection ): bool
399+ {
400+ // This will update the score with the current timestamp.
401+ $ this ->addConnectionToSet ($ connection );
402+
403+ return parent ::connectionPonged ($ connection );
404+ }
405+
406+ /**
407+ * Remove the obsolete connections that didn't ponged in a while.
408+ *
409+ * @return bool
410+ */
411+ public function removeObsoleteConnections (): bool
412+ {
413+ $ this ->lock ()->get (function () {
414+ $ this ->getConnectionsFromSet (0 , now ()->subMinutes (2 )->format ('U ' ))
415+ ->then (function ($ connections ) {
416+ foreach ($ connections as $ connection => $ score ) {
417+ [$ appId , $ socketId ] = explode (': ' , $ connection );
418+
419+ $ this ->unsubscribeFromAllChannels (
420+ $ this ->fakeConnectionForApp ($ appId , $ socketId )
421+ );
422+ }
423+ });
424+ });
425+
426+ return parent ::removeObsoleteConnections ();
427+ }
428+
347429 /**
348430 * Handle a message received from Redis on a specific channel.
349431 *
@@ -462,6 +544,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
462544 return $ this ->incrementSubscriptionsCount ($ appId , $ channel , $ increment * -1 );
463545 }
464546
547+ /**
548+ * Add the connection to the sorted list.
549+ *
550+ * @param \Ratchet\ConnectionInterface $connection
551+ * @param \DateTime|string|null $moment
552+ * @return void
553+ */
554+ public function addConnectionToSet (ConnectionInterface $ connection , $ moment = null )
555+ {
556+ $ this ->getPublishClient ()
557+ ->zadd (
558+ $ this ->getRedisKey (null , null , ['sockets ' ]),
559+ Carbon::parse ($ moment )->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
560+ );
561+ }
562+
563+ /**
564+ * Remove the connection from the sorted list.
565+ *
566+ * @param \Ratchet\ConnectionInterface $connection
567+ * @return void
568+ */
569+ public function removeConnectionFromSet (ConnectionInterface $ connection )
570+ {
571+ $ this ->getPublishClient ()
572+ ->zrem (
573+ $ this ->getRedisKey (null , null , ['sockets ' ]),
574+ "{$ connection ->app ->id }: {$ connection ->socketId }"
575+ );
576+ }
577+
578+ /**
579+ * Get the connections from the sorted list, with last
580+ * connection between certain timestamps.
581+ *
582+ * @param int $start
583+ * @param int $stop
584+ * @return PromiseInterface
585+ */
586+ public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 )
587+ {
588+ return $ this ->getPublishClient ()
589+ ->zrange (
590+ $ this ->getRedisKey (null , null , ['sockets ' ]),
591+ $ start , $ stop , 'withscores '
592+ )
593+ ->then (function ($ list ) {
594+ return Helpers::redisListToArray ($ list );
595+ });
596+ }
597+
465598 /**
466599 * Add a channel to the set list.
467600 *
@@ -555,11 +688,11 @@ public function unsubscribeFromTopic($appId, string $channel = null)
555688 * Get the Redis Keyspace name to handle subscriptions
556689 * and other key-value sets.
557690 *
558- * @param mixed $appId
691+ * @param string|int|null $appId
559692 * @param string|null $channel
560693 * @return string
561694 */
562- public function getRedisKey ($ appId , string $ channel = null , array $ suffixes = []): string
695+ public function getRedisKey ($ appId = null , string $ channel = null , array $ suffixes = []): string
563696 {
564697 $ prefix = config ('database.redis.options.prefix ' , null );
565698
@@ -577,4 +710,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = []
577710
578711 return $ hash ;
579712 }
713+
714+ /**
715+ * Get a new RedisLock instance to avoid race conditions.
716+ *
717+ * @return \Illuminate\Cache\CacheLock
718+ */
719+ protected function lock ()
720+ {
721+ return new RedisLock ($ this ->redis , static ::$ redisLockName , 0 );
722+ }
723+
724+ /**
725+ * Create a fake connection for app that will mimick a connection
726+ * by app ID and Socket ID to be able to be passed to the methods
727+ * that accepts a connection class.
728+ *
729+ * @param string|int $appId
730+ * @param string $socketId
731+ * @return ConnectionInterface
732+ */
733+ public function fakeConnectionForApp ($ appId , string $ socketId )
734+ {
735+ return new MockableConnection ($ appId , $ socketId );
736+ }
580737}
0 commit comments