Skip to content

Commit 5dd942c

Browse files
committed
RSCBC-202: Make ResponseContext optional on SenderContext
1 parent d1ccf13 commit 5dd942c

File tree

8 files changed

+93
-58
lines changed

8 files changed

+93
-58
lines changed

sdk/couchbase-core/src/memdx/client.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,21 @@ use crate::memdx::subdoc::SubdocRequestInfo;
6161
use crate::orphan_reporter::OrphanContext;
6262

6363
pub(crate) type ResponseSender = Sender<error::Result<ClientResponse>>;
64-
pub(crate) type OpaqueMap = HashMap<u32, Arc<SenderContext>>;
64+
pub(crate) type OpaqueMap = HashMap<u32, SenderContext>;
6565

6666
#[derive(Debug, Clone)]
6767
pub struct ResponseContext {
6868
pub cas: Option<u64>,
6969
pub subdoc_info: Option<SubdocRequestInfo>,
70-
pub is_persistent: bool,
7170
pub scope_name: Option<String>,
7271
pub collection_name: Option<String>,
7372
}
7473

7574
#[derive(Debug, Clone)]
7675
pub(crate) struct SenderContext {
7776
pub sender: ResponseSender,
78-
pub context: Arc<ResponseContext>,
77+
pub is_persistent: bool,
78+
pub context: Option<ResponseContext>,
7979
}
8080

8181
struct ReadLoopOptions {
@@ -122,7 +122,7 @@ impl Client {
122122

123123
let opaque = self.current_opaque.fetch_add(1, Ordering::SeqCst);
124124

125-
map.insert(opaque, Arc::new(response_context));
125+
map.insert(opaque, response_context);
126126

127127
opaque
128128
}
@@ -202,14 +202,11 @@ impl Client {
202202

203203
let requests: Arc<std::sync::Mutex<OpaqueMap>> = Arc::clone(&opaque_map);
204204
let context = {
205-
let map = requests.lock().unwrap();
206-
207-
let t = map.get(&opaque);
208-
209-
t.map(Arc::clone)
205+
let mut map = requests.lock().unwrap();
206+
map.remove(&opaque)
210207
};
211208

212-
if let Some(context) = context {
209+
if let Some(mut context) = context {
213210
let sender = &context.sender;
214211

215212
if let Some(value) = &packet.value {
@@ -235,21 +232,20 @@ impl Client {
235232
}
236233
}
237234

238-
if !context.context.is_persistent {
235+
if context.is_persistent {
239236
{
240237
let mut map = requests.lock().unwrap();
241-
map.remove(&opaque);
238+
map.insert(opaque, context.clone());
242239
}
243240
}
244241

245-
let resp = ClientResponse::new(packet, context.context.clone());
242+
let resp = ClientResponse::new(packet, context.context);
246243
match sender.send(Ok(resp)).await {
247244
Ok(_) => {}
248245
Err(e) => {
249246
debug!("Sending response to caller failed: {e}");
250247
}
251248
};
252-
drop(context);
253249
} else if let Some(ref orphan_handler) = opts.orphan_handler {
254250
orphan_handler(
255251
packet,
@@ -342,20 +338,15 @@ impl Dispatcher for Client {
342338
async fn dispatch<'a>(
343339
&self,
344340
mut packet: RequestPacket<'a>,
341+
is_persistent: bool,
345342
response_context: Option<ResponseContext>,
346343
) -> error::Result<ClientPendingOp> {
347344
let (response_tx, response_rx) = mpsc::channel(1);
348-
let context = response_context.unwrap_or(ResponseContext {
349-
cas: packet.cas,
350-
subdoc_info: None,
351-
is_persistent: false,
352-
scope_name: None,
353-
collection_name: None,
354-
});
355-
let is_persistent = context.is_persistent;
345+
356346
let opaque = self.register_handler(SenderContext {
357347
sender: response_tx,
358-
context: Arc::new(context),
348+
is_persistent,
349+
context: response_context,
359350
});
360351
packet.opaque = Some(opaque);
361352
let op_code = packet.op_code;

sdk/couchbase-core/src/memdx/client_response.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818

1919
use crate::memdx::client::ResponseContext;
2020
use crate::memdx::packet::ResponsePacket;
21-
use std::sync::Arc;
2221

2322
#[derive(Debug)]
2423
pub struct ClientResponse {
2524
packet: ResponsePacket,
26-
response_context: Arc<ResponseContext>,
25+
response_context: Option<ResponseContext>,
2726
}
2827

2928
impl ClientResponse {
30-
pub fn new(packet: ResponsePacket, response_context: Arc<ResponseContext>) -> Self {
29+
pub fn new(packet: ResponsePacket, response_context: Option<ResponseContext>) -> Self {
3130
Self {
3231
packet,
3332
response_context,
@@ -38,7 +37,7 @@ impl ClientResponse {
3837
self.packet
3938
}
4039

41-
pub fn response_context(&self) -> Arc<ResponseContext> {
42-
self.response_context.clone()
40+
pub fn response_context(&self) -> Option<&ResponseContext> {
41+
self.response_context.as_ref()
4342
}
4443
}

sdk/couchbase-core/src/memdx/dispatcher.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub trait Dispatcher: Send + Sync {
4747
async fn dispatch<'a>(
4848
&self,
4949
packet: RequestPacket<'a>,
50+
is_persistent: bool,
5051
response_context: Option<ResponseContext>,
5152
) -> Result<ClientPendingOp>;
5253
async fn close(&self) -> Result<()>;

sdk/couchbase-core/src/memdx/ops_core.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ impl OpBootstrapEncoder for OpsCore {
114114
framing_extras: None,
115115
opaque: None,
116116
},
117+
false,
117118
None,
118119
)
119120
.await?;
@@ -145,6 +146,7 @@ impl OpBootstrapEncoder for OpsCore {
145146
framing_extras: None,
146147
opaque: None,
147148
},
149+
false,
148150
None,
149151
)
150152
.await?;
@@ -176,6 +178,7 @@ impl OpBootstrapEncoder for OpsCore {
176178
framing_extras: None,
177179
opaque: None,
178180
},
181+
false,
179182
None,
180183
)
181184
.await?;
@@ -215,6 +218,7 @@ impl OpBootstrapEncoder for OpsCore {
215218
framing_extras: None,
216219
opaque: None,
217220
},
221+
false,
218222
None,
219223
)
220224
.await?;
@@ -250,6 +254,7 @@ impl OpSASLPlainEncoder for OpsCore {
250254
framing_extras: None,
251255
opaque: None,
252256
},
257+
false,
253258
None,
254259
)
255260
.await?;
@@ -283,6 +288,7 @@ impl OpSASLAutoEncoder for OpsCore {
283288
framing_extras: None,
284289
opaque: None,
285290
},
291+
false,
286292
None,
287293
)
288294
.await?;
@@ -318,6 +324,7 @@ impl OpSASLScramEncoder for OpsCore {
318324
framing_extras: None,
319325
opaque: None,
320326
},
327+
false,
321328
None,
322329
)
323330
.await?;

sdk/couchbase-core/src/memdx/ops_crud.rs

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl OpsCrud {
9999
opaque: None,
100100
};
101101

102-
let pending_op = dispatcher.dispatch(packet, None).await?;
102+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
103103

104104
Ok(StandardPendingOp::new(pending_op))
105105
}
@@ -138,7 +138,7 @@ impl OpsCrud {
138138
opaque: None,
139139
};
140140

141-
let pending_op = dispatcher.dispatch(packet, None).await?;
141+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
142142

143143
Ok(StandardPendingOp::new(pending_op))
144144
}
@@ -181,7 +181,7 @@ impl OpsCrud {
181181
opaque: None,
182182
};
183183

184-
let pending_op = dispatcher.dispatch(packet, None).await?;
184+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
185185

186186
Ok(StandardPendingOp::new(pending_op))
187187
}
@@ -225,7 +225,7 @@ impl OpsCrud {
225225
opaque: None,
226226
};
227227

228-
let pending_op = dispatcher.dispatch(packet, None).await?;
228+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
229229

230230
Ok(StandardPendingOp::new(pending_op))
231231
}
@@ -267,7 +267,7 @@ impl OpsCrud {
267267
opaque: None,
268268
};
269269

270-
let pending_op = dispatcher.dispatch(packet, None).await?;
270+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
271271

272272
Ok(StandardPendingOp::new(pending_op))
273273
}
@@ -309,7 +309,7 @@ impl OpsCrud {
309309
opaque: None,
310310
};
311311

312-
let pending_op = dispatcher.dispatch(packet, None).await?;
312+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
313313

314314
Ok(StandardPendingOp::new(pending_op))
315315
}
@@ -348,7 +348,7 @@ impl OpsCrud {
348348
opaque: None,
349349
};
350350

351-
let pending_op = dispatcher.dispatch(packet, None).await?;
351+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
352352

353353
Ok(StandardPendingOp::new(pending_op))
354354
}
@@ -390,7 +390,7 @@ impl OpsCrud {
390390
opaque: None,
391391
};
392392

393-
let pending_op = dispatcher.dispatch(packet, None).await?;
393+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
394394

395395
Ok(StandardPendingOp::new(pending_op))
396396
}
@@ -438,7 +438,7 @@ impl OpsCrud {
438438
opaque: None,
439439
};
440440

441-
let pending_op = dispatcher.dispatch(packet, None).await?;
441+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
442442

443443
Ok(StandardPendingOp::new(pending_op))
444444
}
@@ -493,7 +493,7 @@ impl OpsCrud {
493493
opaque: None,
494494
};
495495

496-
let pending_op = dispatcher.dispatch(packet, None).await?;
496+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
497497

498498
Ok(StandardPendingOp::new(pending_op))
499499
}
@@ -537,7 +537,18 @@ impl OpsCrud {
537537
opaque: None,
538538
};
539539

540-
let pending_op = dispatcher.dispatch(packet, None).await?;
540+
let pending_op = dispatcher
541+
.dispatch(
542+
packet,
543+
false,
544+
Some(ResponseContext {
545+
cas: request.cas,
546+
subdoc_info: None,
547+
scope_name: None,
548+
collection_name: None,
549+
}),
550+
)
551+
.await?;
541552

542553
Ok(StandardPendingOp::new(pending_op))
543554
}
@@ -581,7 +592,18 @@ impl OpsCrud {
581592
opaque: None,
582593
};
583594

584-
let pending_op = dispatcher.dispatch(packet, None).await?;
595+
let pending_op = dispatcher
596+
.dispatch(
597+
packet,
598+
false,
599+
Some(ResponseContext {
600+
cas: request.cas,
601+
subdoc_info: None,
602+
scope_name: None,
603+
collection_name: None,
604+
}),
605+
)
606+
.await?;
585607

586608
Ok(StandardPendingOp::new(pending_op))
587609
}
@@ -649,7 +671,7 @@ impl OpsCrud {
649671
opaque: None,
650672
};
651673

652-
let pending_op = dispatcher.dispatch(packet, None).await?;
674+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
653675

654676
Ok(StandardPendingOp::new(pending_op))
655677
}
@@ -701,7 +723,7 @@ impl OpsCrud {
701723
opaque: None,
702724
};
703725

704-
let pending_op = dispatcher.dispatch(packet, None).await?;
726+
let pending_op = dispatcher.dispatch(packet, false, None).await?;
705727

706728
Ok(StandardPendingOp::new(pending_op))
707729
}
@@ -778,12 +800,13 @@ impl OpsCrud {
778800
flags: request.flags,
779801
op_count: request.ops.len() as u8,
780802
}),
781-
is_persistent: false,
782803
scope_name: None,
783804
collection_name: None,
784805
};
785806

786-
let pending_op = dispatcher.dispatch(packet, Some(response_context)).await?;
807+
let pending_op = dispatcher
808+
.dispatch(packet, false, Some(response_context))
809+
.await?;
787810

788811
Ok(StandardPendingOp::new(pending_op))
789812
}
@@ -888,12 +911,13 @@ impl OpsCrud {
888911
flags: request.flags,
889912
op_count: request.ops.len() as u8,
890913
}),
891-
is_persistent: false,
892914
scope_name: None,
893915
collection_name: None,
894916
};
895917

896-
let pending_op = dispatcher.dispatch(packet, Some(response_context)).await?;
918+
let pending_op = dispatcher
919+
.dispatch(packet, false, Some(response_context))
920+
.await?;
897921

898922
Ok(StandardPendingOp::new(pending_op))
899923
}

sdk/couchbase-core/src/memdx/ops_util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ impl OpsUtil {
5353
framing_extras: None,
5454
opaque: None,
5555
},
56+
false,
5657
Some(ResponseContext {
5758
cas: None,
5859
subdoc_info: None,
59-
is_persistent: false,
6060
scope_name: Some(request.scope_name.to_string()),
6161
collection_name: Some(request.collection_name.to_string()),
6262
}),
@@ -88,6 +88,7 @@ impl OpsUtil {
8888
framing_extras: None,
8989
opaque: None,
9090
},
91+
false,
9192
None,
9293
)
9394
.await?;

0 commit comments

Comments
 (0)