@@ -11,13 +11,13 @@ use std::cmp::Ordering;
1111use std:: future:: Future ;
1212use std:: ops:: Deref ;
1313use std:: sync:: atomic:: AtomicBool ;
14- use std:: sync:: { Arc , Mutex } ;
14+ use std:: sync:: { Arc , Mutex , MutexGuard } ;
1515use std:: time:: Duration ;
16- use tokio:: sync:: broadcast:: { Receiver , Sender } ;
16+ use tokio:: sync:: broadcast:: error :: RecvError ;
1717use tokio:: sync:: { broadcast, watch, Notify } ;
1818
1919pub ( crate ) trait ConfigManager : Sized + Send + Sync {
20- fn watch ( & self ) -> Receiver < ParsedConfig > ;
20+ fn watch ( & self ) -> watch :: Receiver < ParsedConfig > ;
2121 fn reconfigure ( & self , config : ConfigManagerMemdConfig ) -> crate :: error:: Result < ( ) > ;
2222 fn out_of_band_version (
2323 & self ,
@@ -41,7 +41,7 @@ pub(crate) struct ConfigManagerMemdOptions<M: KvClientManager> {
4141 pub polling_period : Duration ,
4242 pub first_config : ParsedConfig ,
4343 pub kv_client_manager : Arc < M > ,
44- pub on_shutdown_rx : Receiver < ( ) > ,
44+ pub on_shutdown_rx : broadcast :: Receiver < ( ) > ,
4545}
4646
4747pub ( crate ) struct ConfigManagerMemd < M : KvClientManager > {
@@ -55,19 +55,19 @@ pub(crate) struct ConfigManagerMemdInner<M: KvClientManager> {
5555 out_of_band_notify : Notify ,
5656 performing_out_of_band_fetch : AtomicBool ,
5757
58- latest_config : Mutex < ParsedConfig > ,
58+ latest_config : Arc < Mutex < ParsedConfig > > ,
5959 latest_version_tx : watch:: Sender < ConfigVersion > ,
6060 bucket : Mutex < Option < ParsedConfigBucket > > ,
6161
62- on_new_config_txs : Vec < Sender < ParsedConfig > > ,
62+ on_new_config_tx : watch :: Sender < ParsedConfig > ,
6363
64- on_shutdown_rx : Receiver < ( ) > ,
65- watcher_shutdown_tx : Sender < ( ) > ,
64+ on_shutdown_rx : broadcast :: Receiver < ( ) > ,
65+ watcher_shutdown_tx : broadcast :: Sender < ( ) > ,
6666}
6767
6868impl < M : KvClientManager + ' static > ConfigManagerMemdInner < M > {
69- pub fn watch ( & self ) -> Receiver < ParsedConfig > {
70- self . watcher . watch ( self . watcher_shutdown_tx . subscribe ( ) )
69+ pub fn watch ( & self ) -> watch :: Receiver < ParsedConfig > {
70+ self . on_new_config_tx . subscribe ( )
7171 }
7272
7373 pub fn reconfigure ( & self , config : ConfigManagerMemdConfig ) -> crate :: error:: Result < ( ) > {
@@ -117,34 +117,23 @@ impl<M: KvClientManager + 'static> ConfigManagerMemdInner<M> {
117117 {
118118 Ok ( c) => c,
119119 Err ( e) => {
120- // TODO: log
121- dbg ! ( e) ;
120+ warn ! ( "Out-of-band fetch from {endpoint} failed: {e}" ) ;
122121 continue ;
123122 }
124123 } ;
125124
126125 if let Some ( parsed_config) = parsed_config {
127- // refresh our known version in case it changed somehow.
128- let mut latest_config = self . latest_config . lock ( ) . unwrap ( ) ;
129-
130- if Self :: can_update_config ( & parsed_config, latest_config. deref ( ) ) {
131- let new_latest_version = ConfigVersion {
132- rev_epoch : parsed_config. rev_epoch ,
133- rev_id : parsed_config. rev_id ,
134- } ;
135- * latest_config = parsed_config. clone ( ) ;
136- drop ( latest_config) ;
137-
138- if let Err ( e) = self . latest_version_tx . send ( new_latest_version) {
139- warn ! ( "Failed to send new config version: {e}" ) ;
140- }
141-
126+ if let Some ( cfg) = Self :: handle_config (
127+ self . latest_config . lock ( ) . unwrap ( ) ,
128+ parsed_config,
129+ self . latest_version_tx . clone ( ) ,
130+ ) {
142131 self . performing_out_of_band_fetch
143132 . store ( false , std:: sync:: atomic:: Ordering :: SeqCst ) ;
144133 self . out_of_band_notify . notify_waiters ( ) ;
145134
146- return Some ( parsed_config ) ;
147- }
135+ return Some ( cfg ) ;
136+ } ;
148137 }
149138 }
150139
@@ -164,8 +153,18 @@ impl<M: KvClientManager + 'static> ConfigManagerMemdInner<M> {
164153 }
165154
166155 pub fn out_of_band_config ( & self , parsed_config : ParsedConfig ) -> Option < ParsedConfig > {
167- let mut latest_config = self . latest_config . lock ( ) . unwrap ( ) ;
156+ Self :: handle_config (
157+ self . latest_config . lock ( ) . unwrap ( ) ,
158+ parsed_config,
159+ self . latest_version_tx . clone ( ) ,
160+ )
161+ }
168162
163+ fn handle_config (
164+ mut latest_config : MutexGuard < ParsedConfig > ,
165+ parsed_config : ParsedConfig ,
166+ latest_version_tx : watch:: Sender < ConfigVersion > ,
167+ ) -> Option < ParsedConfig > {
169168 if Self :: can_update_config ( & parsed_config, latest_config. deref ( ) ) {
170169 let new_latest_version = ConfigVersion {
171170 rev_epoch : parsed_config. rev_epoch ,
@@ -174,8 +173,8 @@ impl<M: KvClientManager + 'static> ConfigManagerMemdInner<M> {
174173 * latest_config = parsed_config. clone ( ) ;
175174 drop ( latest_config) ;
176175
177- if let Err ( e) = self . latest_version_tx . send ( new_latest_version) {
178- warn ! ( "Failed to send new config version: {e}" ) ;
176+ if let Err ( e) = latest_version_tx. send ( new_latest_version) {
177+ warn ! ( "Failed to update config watcher with latest version: {e}" ) ;
179178 }
180179
181180 return Some ( parsed_config) ;
@@ -184,14 +183,6 @@ impl<M: KvClientManager + 'static> ConfigManagerMemdInner<M> {
184183 None
185184 }
186185
187- fn send_new_config ( & self , parsed_config : ParsedConfig ) {
188- for tx in self . on_new_config_txs . iter ( ) {
189- if let Err ( e) = tx. send ( parsed_config. clone ( ) ) {
190- warn ! ( "Failed to send new config: {e}" ) ;
191- }
192- }
193- }
194-
195186 fn shutdown ( & self ) {
196187 if let Err ( e) = self . watcher_shutdown_tx . send ( ( ) ) {
197188 debug ! ( "Failed to send shutdown signal to watcher: {e}" ) ;
@@ -200,20 +191,56 @@ impl<M: KvClientManager + 'static> ConfigManagerMemdInner<M> {
200191
201192 fn can_update_config ( new_config : & ParsedConfig , old_config : & ParsedConfig ) -> bool {
202193 if new_config. bucket != old_config. bucket {
203- debug ! ( "Switching config due to changed bucket type (bucket takeover)" ) ;
194+ debug ! (
195+ "Switching config due to changed bucket type (bucket takeover) old: {:?} new: {:?}" ,
196+ old_config. bucket, new_config. bucket
197+ ) ;
204198 return true ;
205199 } else if let Some ( cmp) = new_config. partial_cmp ( old_config) {
206200 if cmp == Ordering :: Less {
207- debug ! ( "Skipping config due to new config being an older revision" )
201+ debug ! ( "Skipping config due to new config being an older revision old: rev_epoch={}, rev_id={} new: rev_epoch={}, rev_id={}" ,
202+ old_config. rev_epoch, old_config. rev_id, new_config. rev_epoch, new_config. rev_id) ;
208203 } else if cmp == Ordering :: Equal {
209- debug ! ( "Skipping config due to matching revisions" )
204+ debug ! ( "Skipping config due to matching revisions old: rev_epoch={}, rev_id={} new: rev_epoch={}, rev_id={}" ,
205+ old_config. rev_epoch, old_config. rev_id, new_config. rev_epoch, new_config. rev_id) ;
210206 } else {
211207 return true ;
212208 }
213209 }
214210
215211 false
216212 }
213+
214+ pub fn start_watcher ( & self , watcher_shutdown_rx : broadcast:: Receiver < ( ) > ) {
215+ let mut rx = self . watcher . watch ( watcher_shutdown_rx) ;
216+ let latest_version_tx = self . latest_version_tx . clone ( ) ;
217+ let guard = self . latest_config . clone ( ) ;
218+ let new_config_tx = self . on_new_config_tx . clone ( ) ;
219+
220+ tokio:: spawn ( async move {
221+ loop {
222+ match rx. recv ( ) . await {
223+ Ok ( cfg) => {
224+ if let Some ( new_cfg) = Self :: handle_config (
225+ guard. lock ( ) . unwrap ( ) ,
226+ cfg,
227+ latest_version_tx. clone ( ) ,
228+ ) {
229+ new_config_tx. send_replace ( new_cfg) ;
230+ }
231+ }
232+ Err ( e) => {
233+ if e == RecvError :: Closed {
234+ debug ! ( "Config watcher channel closed" ) ;
235+ return ;
236+ } else {
237+ warn ! ( "Config watcher channel error: {e}" ) ;
238+ }
239+ }
240+ }
241+ }
242+ } ) ;
243+ }
217244}
218245
219246impl < M : KvClientManager + ' static > ConfigManagerMemd < M > {
@@ -243,33 +270,34 @@ impl<M: KvClientManager + 'static> ConfigManagerMemd<M> {
243270 let ( watcher_shutdown_tx, watcher_shutdown_rx) = broadcast:: channel ( 1 ) ;
244271 let bucket = opts. first_config . bucket . clone ( ) ;
245272
273+ let ( on_new_config_tx, _on_new_config_rx) =
274+ watch:: channel :: < ParsedConfig > ( opts. first_config . clone ( ) ) ;
275+
246276 let inner = Arc :: new ( ConfigManagerMemdInner {
247277 fetcher,
248278 watcher : watcher. clone ( ) ,
249279
250280 out_of_band_notify : Notify :: new ( ) ,
251281 performing_out_of_band_fetch : AtomicBool :: new ( false ) ,
252282
253- latest_config : Mutex :: new ( opts. first_config ) ,
283+ latest_config : Arc :: new ( Mutex :: new ( opts. first_config ) ) ,
254284 latest_version_tx,
255285 bucket : Mutex :: new ( bucket) ,
256286
257- on_new_config_txs : Vec :: new ( ) ,
287+ on_new_config_tx ,
258288
259289 on_shutdown_rx : opts. on_shutdown_rx ,
260290 watcher_shutdown_tx,
261291 } ) ;
262292
263- tokio:: spawn ( async move {
264- watcher. watch ( watcher_shutdown_rx) ;
265- } ) ;
293+ inner. start_watcher ( watcher_shutdown_rx) ;
266294
267295 ConfigManagerMemd { inner }
268296 }
269297}
270298
271299impl < M : KvClientManager + ' static > ConfigManager for ConfigManagerMemd < M > {
272- fn watch ( & self ) -> Receiver < ParsedConfig > {
300+ fn watch ( & self ) -> watch :: Receiver < ParsedConfig > {
273301 self . inner . watch ( )
274302 }
275303
0 commit comments