Skip to content

Commit 88e78a7

Browse files
committed
aft rebase fix
Signed-off-by: discord9 <discord9@163.com>
1 parent b6e4d9b commit 88e78a7

File tree

5 files changed

+63
-33
lines changed

5 files changed

+63
-33
lines changed

src/common/meta/src/instruction.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,20 @@ impl Instruction {
565565
_ => None,
566566
}
567567
}
568+
569+
pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
570+
match self {
571+
Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
572+
_ => None,
573+
}
574+
}
575+
576+
pub fn into_gc_regions(self) -> Option<GcRegions> {
577+
match self {
578+
Self::GcRegions(gc_regions) => Some(gc_regions),
579+
_ => None,
580+
}
581+
}
568582
}
569583

570584
/// The reply of [UpgradeRegion].

src/datanode/src/heartbeat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ impl HeartbeatTask {
321321
let gc_stat = gc_limiter.gc_stat();
322322
gc_stat.into_extensions(&mut extensions);
323323

324-
let req = HeartbeatRequest {
324+
let mut req = HeartbeatRequest {
325325
region_stats,
326326
topic_stats,
327327
duration_since_epoch,

src/datanode/src/heartbeat/handler.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use common_meta::heartbeat::handler::{
2020
use common_meta::instruction::{Instruction, InstructionReply};
2121
use common_telemetry::error;
2222
use snafu::OptionExt;
23-
use store_api::storage::{GcReport, RegionId};
23+
use store_api::storage::GcReport;
2424

2525
mod close_region;
2626
mod downgrade_region;
@@ -32,7 +32,9 @@ mod upgrade_region;
3232

3333
use crate::heartbeat::handler::close_region::CloseRegionsHandler;
3434
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
35+
use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
3536
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
37+
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
3638
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
3739
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
3840
use crate::heartbeat::task_tracker::TaskTracker;
@@ -110,12 +112,8 @@ impl RegionHeartbeatResponseHandler {
110112
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
111113
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
112114
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
113-
Instruction::GetFileRefs(get_file_refs) => Ok(Box::new(move |handler_context| {
114-
Box::pin(handler_context.handle_get_file_refs_instruction(get_file_refs))
115-
})),
116-
Instruction::GcRegions(gc_regions) => Ok(Box::new(move |handler_context| {
117-
Box::pin(handler_context.handle_gc_regions_instruction(gc_regions))
118-
})),
115+
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler)),
116+
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler)),
119117
}
120118
}
121119
}
@@ -149,14 +147,18 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
149147
let gc_tasks = self.gc_tasks.clone();
150148
let handler = self.build_handler(&instruction)?;
151149
let _handle = common_runtime::spawn_global(async move {
152-
let reply = handler(HandlerContext {
153-
region_server,
154-
catchup_tasks,
155-
downgrade_tasks,
156-
flush_tasks,
157-
gc_tasks,
158-
})
159-
.await;
150+
let reply = handler
151+
.handle(
152+
&HandlerContext {
153+
region_server,
154+
catchup_tasks,
155+
downgrade_tasks,
156+
flush_tasks,
157+
gc_tasks,
158+
},
159+
instruction,
160+
)
161+
.await;
160162

161163
if let Some(reply) = reply
162164
&& let Err(e) = mailbox.send((meta, reply)).await

src/datanode/src/heartbeat/handler/file_ref.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,22 @@
1313
// limitations under the License.
1414

1515
use common_error::ext::ErrorExt;
16-
use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply};
16+
use common_meta::instruction::{GetFileRefsReply, Instruction, InstructionReply};
1717
use store_api::storage::FileRefsManifest;
1818

19-
use crate::heartbeat::handler::HandlerContext;
20-
impl HandlerContext {
21-
/// Handles GetFileRefs instruction by getting file references from MitoEngine.
22-
pub(crate) async fn handle_get_file_refs_instruction(
23-
self,
24-
get_file_refs: GetFileRefs,
19+
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20+
21+
pub struct GetFileRefsHandler;
22+
23+
#[async_trait::async_trait]
24+
impl InstructionHandler for GetFileRefsHandler {
25+
async fn handle(
26+
&self,
27+
ctx: &HandlerContext,
28+
instruction: Instruction,
2529
) -> Option<InstructionReply> {
26-
let region_server = self.region_server;
30+
let get_file_refs = instruction.into_get_file_refs().unwrap();
31+
let region_server = &ctx.region_server;
2732

2833
// Get the MitoEngine
2934
let Some(mito_engine) = region_server.mito_engine() else {

src/datanode/src/heartbeat/handler/gc_worker.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,25 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
15+
use common_meta::instruction::{GcRegionsReply, Instruction, InstructionReply};
1616
use common_telemetry::{debug, warn};
1717
use mito2::gc::LocalGcWorker;
1818
use snafu::{OptionExt, ResultExt};
1919
use store_api::storage::{FileRefsManifest, RegionId};
2020

2121
use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
22-
use crate::heartbeat::handler::HandlerContext;
22+
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
2323

24-
impl HandlerContext {
25-
pub(crate) async fn handle_gc_regions_instruction(
26-
self,
27-
gc_regions: GcRegions,
24+
pub struct GcRegionsHandler;
25+
26+
#[async_trait::async_trait]
27+
impl InstructionHandler for GcRegionsHandler {
28+
async fn handle(
29+
&self,
30+
ctx: &HandlerContext,
31+
instruction: Instruction,
2832
) -> Option<InstructionReply> {
33+
let gc_regions = instruction.into_gc_regions().unwrap();
2934
let region_ids = gc_regions.regions.clone();
3035
debug!("Received gc regions instruction: {:?}", region_ids);
3136

@@ -45,6 +50,7 @@ impl HandlerContext {
4550

4651
let (region_id, gc_worker) = match self
4752
.create_gc_worker(
53+
ctx,
4854
region_ids,
4955
&gc_regions.file_refs_manifest,
5056
gc_regions.full_file_listing,
@@ -59,7 +65,7 @@ impl HandlerContext {
5965
}
6066
};
6167

62-
let register_result = self
68+
let register_result = ctx
6369
.gc_tasks
6470
.try_register(
6571
region_id,
@@ -83,7 +89,7 @@ impl HandlerContext {
8389
}));
8490
}
8591
let mut watcher = register_result.into_watcher();
86-
let result = self.gc_tasks.wait_until_finish(&mut watcher).await;
92+
let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
8793
match result {
8894
Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
8995
result: Ok(report),
@@ -93,16 +99,19 @@ impl HandlerContext {
9399
})),
94100
}
95101
}
102+
}
96103

104+
impl GcRegionsHandler {
97105
async fn create_gc_worker(
98106
&self,
107+
ctx: &HandlerContext,
99108
mut region_ids: Vec<RegionId>,
100109
file_ref_manifest: &FileRefsManifest,
101110
full_file_listing: bool,
102111
) -> Result<(RegionId, LocalGcWorker)> {
103112
// always use the smallest region id on datanode as the target region id
104113
region_ids.sort_by_key(|r| r.region_number());
105-
let mito_engine = self
114+
let mito_engine = ctx
106115
.region_server
107116
.mito_engine()
108117
.with_context(|| UnexpectedSnafu {

0 commit comments

Comments
 (0)