Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 153 additions & 6 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Display for RegionIdent {
/// The result of downgrade leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
/// The [RegionId].
/// For compatibility, it is defaulted to [RegionId::new(0, 0)].
#[serde(default)]
pub region_id: RegionId,
/// Returns the `last_entry_id` if available.
pub last_entry_id: Option<u64>,
/// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
Expand Down Expand Up @@ -423,14 +427,60 @@ pub enum Instruction {
CloseRegions(Vec<RegionIdent>),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
#[serde(
deserialize_with = "single_or_multiple_from",
alias = "DowngradeRegion"
)]
/// Downgrades a region.
DowngradeRegion(DowngradeRegion),
DowngradeRegions(Vec<DowngradeRegion>),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegions(FlushRegions),
}

impl Instruction {
/// Converts the instruction into a vector of [OpenRegion].
pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
match self {
Self::OpenRegions(open_regions) => Some(open_regions),
_ => None,
}
}

/// Converts the instruction into a vector of [RegionIdent].
pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
match self {
Self::CloseRegions(close_regions) => Some(close_regions),
_ => None,
}
}

/// Converts the instruction into a [FlushRegions].
pub fn into_flush_regions(self) -> Option<FlushRegions> {
match self {
Self::FlushRegions(flush_regions) => Some(flush_regions),
_ => None,
}
}

/// Converts the instruction into a [DowngradeRegion].
pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
match self {
Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
_ => None,
}
}

/// Converts the instruction into a [UpgradeRegion].
pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
match self {
Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
_ => None,
}
}
}

/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
Expand All @@ -452,6 +502,39 @@ impl Display for UpgradeRegionReply {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionsReply {
pub replies: Vec<DowngradeRegionReply>,
}

impl DowngradeRegionsReply {
pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
Self { replies }
}

pub fn single(reply: DowngradeRegionReply) -> Self {
Self::new(vec![reply])
}
}

#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
Single(DowngradeRegionReply),
Multiple(DowngradeRegionsReply),
}

fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
where
D: Deserializer<'de>,
{
let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
Ok(match helper {
DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
DowngradeRegionsCompat::Multiple(reply) => reply,
})
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
Expand All @@ -460,7 +543,11 @@ pub enum InstructionReply {
#[serde(alias = "close_region")]
CloseRegions(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
#[serde(
alias = "downgrade_region",
deserialize_with = "downgrade_regions_compat_from"
)]
DowngradeRegions(DowngradeRegionsReply),
FlushRegions(FlushRegionReply),
}

Expand All @@ -470,8 +557,8 @@ impl Display for InstructionReply {
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
Self::DowngradeRegions(reply) => {
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
}
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
}
Expand All @@ -493,6 +580,27 @@ impl InstructionReply {
_ => panic!("Expected OpenRegions reply"),
}
}

pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
match self {
Self::UpgradeRegion(reply) => reply,
_ => panic!("Expected UpgradeRegion reply"),
}
}

pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
match self {
Self::DowngradeRegions(reply) => reply.replies,
_ => panic!("Expected DowngradeRegion reply"),
}
}

pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
match self {
Self::FlushRegions(reply) => reply,
_ => panic!("Expected FlushRegions reply"),
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -532,11 +640,27 @@ mod tests {
r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
serialized
);

let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
DowngradeRegionReply {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: None,
},
));

let serialized = serde_json::to_string(&downgrade_region).unwrap();
assert_eq!(
r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
serialized
)
}

#[test]
fn test_deserialize_instruction() {
let open_region_instruction = r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#;
let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
let open_region_instruction: Instruction =
serde_json::from_str(open_region_instruction).unwrap();
let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
Expand All @@ -553,7 +677,7 @@ mod tests {
)]);
assert_eq!(open_region_instruction, open_region);

let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#;
let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
let close_region_instruction: Instruction =
serde_json::from_str(close_region_instruction).unwrap();
let close_region = Instruction::CloseRegions(vec![RegionIdent {
Expand All @@ -564,6 +688,15 @@ mod tests {
}]);
assert_eq!(close_region_instruction, close_region);

let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
let downgrade_region_instruction: Instruction =
serde_json::from_str(downgrade_region_instruction).unwrap();
let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
region_id: RegionId::new(1024, 1),
flush_timeout: Some(Duration::from_millis(1000)),
}]);
assert_eq!(downgrade_region_instruction, downgrade_region);

let close_region_instruction_reply =
r#"{"result":true,"error":null,"type":"close_region"}"#;
let close_region_instruction_reply: InstructionReply =
Expand All @@ -582,6 +715,20 @@ mod tests {
error: None,
});
assert_eq!(open_region_instruction_reply, open_region_reply);

let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
let downgrade_region_instruction_reply: InstructionReply =
serde_json::from_str(downgrade_region_instruction_reply).unwrap();
let downgrade_region_reply = InstructionReply::DowngradeRegions(
DowngradeRegionsReply::single(DowngradeRegionReply {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: None,
}),
);
assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading