Skip to content

Commit f8b5d38

Browse files
committed
refactor: support batch downgrade region instructions
Signed-off-by: WenyXu <wenymedia@gmail.com>
1 parent 4d5e0dd commit f8b5d38

File tree

5 files changed

+152
-122
lines changed

5 files changed

+152
-122
lines changed

src/common/meta/src/instruction.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ impl Display for RegionIdent {
5555
/// The result of downgrade leader region.
5656
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
5757
pub struct DowngradeRegionReply {
58+
#[serde(default)]
59+
pub region_id: RegionId,
5860
/// Returns the `last_entry_id` if available.
5961
pub last_entry_id: Option<u64>,
6062
/// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
@@ -423,8 +425,12 @@ pub enum Instruction {
423425
CloseRegions(Vec<RegionIdent>),
424426
/// Upgrades a region.
425427
UpgradeRegion(UpgradeRegion),
428+
#[serde(
429+
deserialize_with = "single_or_multiple_from",
430+
alias = "DowngradeRegion"
431+
)]
426432
/// Downgrades a region.
427-
DowngradeRegion(DowngradeRegion),
433+
DowngradeRegions(Vec<DowngradeRegion>),
428434
/// Invalidates batch cache.
429435
InvalidateCaches(Vec<CacheIdent>),
430436
/// Flushes regions.
@@ -457,9 +463,9 @@ impl Instruction {
457463
}
458464

459465
/// Converts the instruction into a [DowngradeRegion].
460-
pub fn into_downgrade_regions(self) -> Option<DowngradeRegion> {
466+
pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
461467
match self {
462-
Self::DowngradeRegion(downgrade_region) => Some(downgrade_region),
468+
Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
463469
_ => None,
464470
}
465471
}
@@ -502,7 +508,11 @@ pub enum InstructionReply {
502508
#[serde(alias = "close_region")]
503509
CloseRegions(SimpleReply),
504510
UpgradeRegion(UpgradeRegionReply),
505-
DowngradeRegion(DowngradeRegionReply),
511+
#[serde(
512+
alias = "downgrade_region",
513+
deserialize_with = "single_or_multiple_from"
514+
)]
515+
DowngradeRegions(Vec<DowngradeRegionReply>),
506516
FlushRegions(FlushRegionReply),
507517
}
508518

@@ -512,8 +522,8 @@ impl Display for InstructionReply {
512522
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
513523
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
514524
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
515-
Self::DowngradeRegion(reply) => {
516-
write!(f, "InstructionReply::DowngradeRegion({})", reply)
525+
Self::DowngradeRegions(reply) => {
526+
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
517527
}
518528
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
519529
}
@@ -543,9 +553,9 @@ impl InstructionReply {
543553
}
544554
}
545555

546-
pub fn expect_downgrade_region_reply(self) -> DowngradeRegionReply {
556+
pub fn expect_downgrade_region_reply(self) -> Vec<DowngradeRegionReply> {
547557
match self {
548-
Self::DowngradeRegion(reply) => reply,
558+
Self::DowngradeRegions(reply) => reply,
549559
_ => panic!("Expected DowngradeRegion reply"),
550560
}
551561
}

src/datanode/src/heartbeat/handler.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl RegionHeartbeatResponseHandler {
100100
open_region_parallelism: self.open_region_parallelism,
101101
})),
102102
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
103-
Instruction::DowngradeRegion(_) => Ok(Box::new(DowngradeRegionsHandler)),
103+
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
104104
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
105105
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
106106
}
@@ -112,7 +112,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
112112
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
113113
matches!(ctx.incoming_message.as_ref(), |Some((
114114
_,
115-
Instruction::DowngradeRegion { .. },
115+
Instruction::DowngradeRegions { .. },
116116
))| Some((
117117
_,
118118
Instruction::UpgradeRegion { .. }
@@ -242,10 +242,10 @@ mod tests {
242242
);
243243

244244
// Downgrade region
245-
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
245+
let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
246246
region_id: RegionId::new(2048, 1),
247247
flush_timeout: Some(Duration::from_secs(1)),
248-
});
248+
}]);
249249
assert!(
250250
heartbeat_handler
251251
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
@@ -440,44 +440,38 @@ mod tests {
440440
// Should be ok, if we try to downgrade it twice.
441441
for _ in 0..2 {
442442
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
443-
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
443+
let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
444444
region_id,
445445
flush_timeout: Some(Duration::from_secs(1)),
446-
});
446+
}]);
447447

448448
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
449449
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
450450
assert_matches!(control, HandleControl::Continue);
451451

452452
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
453453

454-
if let InstructionReply::DowngradeRegion(reply) = reply {
455-
assert!(reply.exists);
456-
assert!(reply.error.is_none());
457-
assert_eq!(reply.last_entry_id.unwrap(), 0);
458-
} else {
459-
unreachable!()
460-
}
454+
let reply = &reply.expect_downgrade_region_reply()[0];
455+
assert!(reply.exists);
456+
assert!(reply.error.is_none());
457+
assert_eq!(reply.last_entry_id.unwrap(), 0);
461458
}
462459

463460
// Downgrades a not exists region.
464461
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
465-
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
462+
let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
466463
region_id: RegionId::new(2048, 1),
467464
flush_timeout: Some(Duration::from_secs(1)),
468-
});
465+
}]);
469466
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
470467
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
471468
assert_matches!(control, HandleControl::Continue);
472469

473470
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
474471

475-
if let InstructionReply::DowngradeRegion(reply) = reply {
476-
assert!(!reply.exists);
477-
assert!(reply.error.is_none());
478-
assert!(reply.last_entry_id.is_none());
479-
} else {
480-
unreachable!()
481-
}
472+
let reply = reply.expect_downgrade_region_reply();
473+
assert!(!reply[0].exists);
474+
assert!(reply[0].error.is_none());
475+
assert!(reply[0].last_entry_id.is_none());
482476
}
483477
}

0 commit comments

Comments
 (0)