33namespace BeyondCode \LaravelWebSockets \ChannelManagers ;
44
55use BeyondCode \LaravelWebSockets \Channels \Channel ;
6+ use BeyondCode \LaravelWebSockets \Helpers ;
7+ use BeyondCode \LaravelWebSockets \Server \MockableConnection ;
8+ use Carbon \Carbon ;
69use Clue \React \Redis \Client ;
710use Clue \React \Redis \Factory ;
11+ use Illuminate \Cache \RedisLock ;
12+ use Illuminate \Support \Facades \Redis ;
813use Illuminate \Support \Str ;
914use Ratchet \ConnectionInterface ;
15+ use Ratchet \WebSocket \WsConnection ;
1016use React \EventLoop \LoopInterface ;
1117use React \Promise \PromiseInterface ;
1218use stdClass ;
@@ -41,6 +47,21 @@ class RedisChannelManager extends LocalChannelManager
4147 */
4248 protected $ subscribeClient ;
4349
50+ /**
51+ * The Redis manager instance.
52+ *
53+ * @var \Illuminate\Redis\RedisManager
54+ */
55+ protected $ redis ;
56+
57+ /**
58+ * The lock name to use on Redis to avoid multiple
59+ * actions that might lead to multiple processings.
60+ *
61+ * @var string
62+ */
63+ protected static $ redisLockName = 'laravel-websockets:channel-manager:lock ' ;
64+
4465 /**
4566 * Create a new channel manager instance.
4667 *
@@ -52,6 +73,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
5273 {
5374 $ this ->loop = $ loop ;
5475
76+ $ this ->redis = Redis::connection (
77+ config ('websockets.replication.modes.redis.connection ' , 'default ' )
78+ );
79+
5580 $ connectionUri = $ this ->getConnectionUri ();
5681
5782 $ factoryClass = $ factoryClass ?: Factory::class;
@@ -141,6 +166,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
141166 }
142167 });
143168
169+ $ this ->addConnectionToSet ($ connection );
170+
144171 $ this ->addChannelToSet (
145172 $ connection ->app ->id , $ channelName
146173 );
@@ -167,8 +194,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
167194 if ($ count === 0 ) {
168195 $ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
169196
197+ $ this ->removeUserData (
198+ $ connection ->app ->id , $ channelName , $ connection ->socketId
199+ );
200+
170201 $ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
171202
203+ $ this ->removeConnectionFromSet ($ connection );
204+
172205 return ;
173206 }
174207
@@ -179,7 +212,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
179212 if ($ count < 1 ) {
180213 $ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
181214
215+ $ this ->removeUserData (
216+ $ connection ->app ->id , $ channelName , $ connection ->socketId
217+ );
218+
182219 $ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
220+
221+ $ this ->removeConnectionFromSet ($ connection );
183222 }
184223 });
185224 });
@@ -304,12 +343,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
304343 {
305344 return $ this ->publishClient
306345 ->hgetall ($ this ->getRedisKey ($ appId , $ channel , ['users ' ]))
307- ->then (function ($ members ) {
308- [$ keys , $ values ] = collect ($ members )->partition (function ($ value , $ key ) {
309- return $ key % 2 === 0 ;
310- });
311-
312- return collect (array_combine ($ keys ->all (), $ values ->all ()))
346+ ->then (function ($ list ) {
347+ return collect (Helpers::redisListToArray ($ list ))
313348 ->map (function ($ user ) {
314349 return json_decode ($ user );
315350 })
@@ -355,6 +390,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
355390 });
356391 }
357392
393+ /**
394+ * Keep tracking the connections availability when they pong.
395+ *
396+ * @param \Ratchet\ConnectionInterface $connection
397+ * @return bool
398+ */
399+ public function connectionPonged (ConnectionInterface $ connection ): bool
400+ {
401+ // This will update the score with the current timestamp.
402+ $ this ->addConnectionToSet ($ connection );
403+
404+ return parent ::connectionPonged ($ connection );
405+ }
406+
407+ /**
408+ * Remove the obsolete connections that didn't ponged in a while.
409+ *
410+ * @return bool
411+ */
412+ public function removeObsoleteConnections (): bool
413+ {
414+ $ this ->lock ()->get (function () {
415+ $ this ->getConnectionsFromSet (0 , now ()->subMinutes (2 )->format ('U ' ))
416+ ->then (function ($ connections ) {
417+ foreach ($ connections as $ connection => $ score ) {
418+ [$ appId , $ socketId ] = explode (': ' , $ connection );
419+
420+ $ this ->unsubscribeFromAllChannels (
421+ $ this ->fakeConnectionForApp ($ appId , $ socketId )
422+ );
423+ }
424+ });
425+ });
426+
427+ return parent ::removeObsoleteConnections ();
428+ }
429+
358430 /**
359431 * Handle a message received from Redis on a specific channel.
360432 *
@@ -473,6 +545,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
473545 return $ this ->incrementSubscriptionsCount ($ appId , $ channel , $ increment * -1 );
474546 }
475547
548+ /**
549+ * Add the connection to the sorted list.
550+ *
551+ * @param \Ratchet\ConnectionInterface $connection
552+ * @param \DateTime|string|null $moment
553+ * @return void
554+ */
555+ public function addConnectionToSet (ConnectionInterface $ connection , $ moment = null )
556+ {
557+ $ this ->getPublishClient ()
558+ ->zadd (
559+ $ this ->getRedisKey (null , null , ['sockets ' ]),
560+ Carbon::parse ($ moment )->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
561+ );
562+ }
563+
564+ /**
565+ * Remove the connection from the sorted list.
566+ *
567+ * @param \Ratchet\ConnectionInterface $connection
568+ * @return void
569+ */
570+ public function removeConnectionFromSet (ConnectionInterface $ connection )
571+ {
572+ $ this ->getPublishClient ()
573+ ->zrem (
574+ $ this ->getRedisKey (null , null , ['sockets ' ]),
575+ "{$ connection ->app ->id }: {$ connection ->socketId }"
576+ );
577+ }
578+
579+ /**
580+ * Get the connections from the sorted list, with last
581+ * connection between certain timestamps.
582+ *
583+ * @param int $start
584+ * @param int $stop
585+ * @return PromiseInterface
586+ */
587+ public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 )
588+ {
589+ return $ this ->getPublishClient ()
590+ ->zrange (
591+ $ this ->getRedisKey (null , null , ['sockets ' ]),
592+ $ start , $ stop , 'withscores '
593+ )
594+ ->then (function ($ list ) {
595+ return Helpers::redisListToArray ($ list );
596+ });
597+ }
598+
476599 /**
477600 * Add a channel to the set list.
478601 *
@@ -566,11 +689,11 @@ public function unsubscribeFromTopic($appId, string $channel = null)
566689 * Get the Redis Keyspace name to handle subscriptions
567690 * and other key-value sets.
568691 *
569- * @param mixed $appId
692+ * @param string|int|null $appId
570693 * @param string|null $channel
571694 * @return string
572695 */
573- public function getRedisKey ($ appId , string $ channel = null , array $ suffixes = []): string
696+ public function getRedisKey ($ appId = null , string $ channel = null , array $ suffixes = []): string
574697 {
575698 $ prefix = config ('database.redis.options.prefix ' , null );
576699
@@ -588,4 +711,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = []
588711
589712 return $ hash ;
590713 }
714+
715+ /**
716+ * Get a new RedisLock instance to avoid race conditions.
717+ *
718+ * @return \Illuminate\Cache\CacheLock
719+ */
720+ protected function lock ()
721+ {
722+ return new RedisLock ($ this ->redis , static ::$ redisLockName , 0 );
723+ }
724+
725+ /**
726+ * Create a fake connection for app that will mimick a connection
727+ * by app ID and Socket ID to be able to be passed to the methods
728+ * that accepts a connection class.
729+ *
730+ * @param string|int $appId
731+ * @param string $socketId
732+ * @return ConnectionInterface
733+ */
734+ public function fakeConnectionForApp ($ appId , string $ socketId )
735+ {
736+ return new MockableConnection ($ appId , $ socketId );
737+ }
591738}
0 commit comments