Skip to content

Commit baacc40

Browse files
committed
feat: support ALTER FRAGMENT SET PARALLELISM via SQL
Enable fine-grained scaling by allowing users to adjust parallelism for individual streaming fragments using SQL. Improves operational flexibility and user experience.

- Add Protobuf and gRPC support for fragment parallelism changes
- Extend SQL parser and AST for ALTER FRAGMENT SET PARALLELISM
- Implement frontend handler and meta client logic for new operation
- Update meta service, controller, and stream manager to reschedule fragments online with new parallelism
- Enhance error handling and test utilities for new trait method

Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
1 parent 0b71f1c commit baacc40

File tree

14 files changed

+209
-18
lines changed

14 files changed

+209
-18
lines changed

proto/ddl_service.proto

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -287,6 +287,13 @@ message AlterParallelismRequest {
287287

288288
message AlterParallelismResponse {}
289289

290+
message AlterFragmentParallelismRequest {
291+
uint32 fragment_id = 1;
292+
meta.TableParallelism parallelism = 2;
293+
}
294+
295+
message AlterFragmentParallelismResponse {}
296+
290297
message AlterCdcTableBackfillParallelismRequest {
291298
uint32 table_id = 1;
292299
meta.TableParallelism parallelism = 2;
@@ -619,6 +626,7 @@ service DdlService {
619626
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
620627
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
621628
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
629+
rpc AlterFragmentParallelism(AlterFragmentParallelismRequest) returns (AlterFragmentParallelismResponse);
622630
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
623631
rpc DropTable(DropTableRequest) returns (DropTableResponse);
624632
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);

src/frontend/src/handler/alter_parallelism.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,23 @@ pub async fn handle_alter_parallelism(
121121
Ok(builder.into())
122122
}
123123

124+
pub async fn handle_alter_fragment_parallelism(
125+
handler_args: HandlerArgs,
126+
fragment_id: u32,
127+
parallelism: SetVariableValue,
128+
) -> Result<RwPgResponse> {
129+
let session = handler_args.session;
130+
let target_parallelism = extract_table_parallelism(parallelism)?;
131+
132+
session
133+
.env()
134+
.meta_client()
135+
.alter_fragment_parallelism(fragment_id, target_parallelism)
136+
.await?;
137+
138+
Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
139+
}
140+
124141
fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
125142
let adaptive_parallelism = PbTableParallelism {
126143
parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),

src/frontend/src/handler/mod.rs

Lines changed: 21 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1255,17 +1255,27 @@ pub async fn handle(
12551255
} => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
12561256
Statement::AlterFragment {
12571257
fragment_id,
1258-
operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
1259-
} => {
1260-
alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1261-
&handler_args.session,
1262-
PbThrottleTarget::Fragment,
1263-
fragment_id,
1264-
rate_limit,
1265-
StatementType::SET_VARIABLE,
1266-
)
1267-
.await
1268-
}
1258+
operation,
1259+
} => match operation {
1260+
AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
1261+
alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
1262+
&handler_args.session,
1263+
PbThrottleTarget::Fragment,
1264+
fragment_id,
1265+
rate_limit,
1266+
StatementType::SET_VARIABLE,
1267+
)
1268+
.await
1269+
}
1270+
AlterFragmentOperation::SetParallelism { parallelism } => {
1271+
alter_parallelism::handle_alter_fragment_parallelism(
1272+
handler_args,
1273+
fragment_id,
1274+
parallelism,
1275+
)
1276+
.await
1277+
}
1278+
},
12691279
Statement::AlterDefaultPrivileges { .. } => {
12701280
handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
12711281
}

src/frontend/src/meta_client.rs

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
3939
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
4040
use risingwave_pb::meta::{
4141
EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
42-
RefreshResponse,
42+
RefreshResponse, TableParallelism as PbTableParallelism,
4343
};
4444
use risingwave_pb::secret::PbSecretRef;
4545
use risingwave_rpc_client::error::Result;
@@ -132,6 +132,12 @@ pub trait FrontendMetaClient: Send + Sync {
132132
rate_limit: Option<u32>,
133133
) -> Result<()>;
134134

135+
async fn alter_fragment_parallelism(
136+
&self,
137+
fragment_id: u32,
138+
parallelism: PbTableParallelism,
139+
) -> Result<()>;
140+
135141
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
136142

137143
async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
@@ -353,6 +359,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
353359
.map(|_| ())
354360
}
355361

362+
async fn alter_fragment_parallelism(
363+
&self,
364+
fragment_id: u32,
365+
parallelism: PbTableParallelism,
366+
) -> Result<()> {
367+
self.0
368+
.alter_fragment_parallelism(fragment_id, parallelism)
369+
.await
370+
}
371+
356372
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
357373
self.0.get_cluster_recovery_status().await
358374
}

src/frontend/src/test_utils.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1166,6 +1166,14 @@ impl FrontendMetaClient for MockFrontendMetaClient {
11661166
unimplemented!()
11671167
}
11681168

1169+
async fn alter_fragment_parallelism(
1170+
&self,
1171+
_fragment_id: u32,
1172+
_parallelism: PbTableParallelism,
1173+
) -> RpcResult<()> {
1174+
unimplemented!()
1175+
}
1176+
11691177
async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
11701178
Ok(RecoveryStatus::StatusRunning)
11711179
}

src/meta/service/src/ddl_service.rs

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1034,6 +1034,33 @@ impl DdlService for DdlServiceImpl {
10341034
Ok(Response::new(AlterParallelismResponse {}))
10351035
}
10361036

1037+
async fn alter_fragment_parallelism(
1038+
&self,
1039+
request: Request<AlterFragmentParallelismRequest>,
1040+
) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1041+
let req = request.into_inner();
1042+
1043+
let fragment_id = req.fragment_id;
1044+
let parallelism = *req.get_parallelism()?;
1045+
1046+
let parallelism = match parallelism.get_parallelism()? {
1047+
Parallelism::Fixed(FixedParallelism { parallelism }) => {
1048+
StreamingParallelism::Fixed(*parallelism as _)
1049+
}
1050+
Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
1051+
_ => bail_unavailable!(),
1052+
};
1053+
1054+
self.ddl_controller
1055+
.reschedule_fragment(
1056+
fragment_id,
1057+
ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
1058+
)
1059+
.await?;
1060+
1061+
Ok(Response::new(AlterFragmentParallelismResponse {}))
1062+
}
1063+
10371064
/// Auto schema change for cdc sources,
10381065
/// called by the source parser when a schema change is detected.
10391066
async fn auto_schema_change(

src/meta/src/rpc/ddl_controller.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -520,6 +520,17 @@ impl DdlController {
520520
.await
521521
}
522522

523+
pub async fn reschedule_fragment(
524+
&self,
525+
fragment_id: u32,
526+
target: ReschedulePolicy,
527+
) -> MetaResult<()> {
528+
tracing::info!("altering parallelism for fragment {}", fragment_id);
529+
self.stream_manager
530+
.reschedule_fragment(fragment_id, target)
531+
.await
532+
}
533+
523534
async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
524535
self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
525536
.await

src/meta/src/stream/scale.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ use crate::controller::scale::{
4545
FragmentRenderMap, NoShuffleEnsemble, RenderedGraph, WorkerInfo,
4646
find_fragment_no_shuffle_dags_detailed, render_fragments, render_jobs,
4747
};
48+
use crate::error::bail_invalid_parameter;
4849
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
4950
use crate::model::{ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers};
5051
use crate::serving::{
@@ -459,8 +460,9 @@ impl ScaleController {
459460

460461
let parallelism = match parallelisms.as_slice() {
461462
[] => {
462-
bail!(
463-
"no reschedule policy specified for fragments in the no-shuffle ensemble: {:?}",
463+
bail_invalid_parameter!(
464+
"none of the entry fragments {:?} were included in the reschedule request; \
465+
provide at least one entry fragment id",
464466
entry_fragment_ids
465467
);
466468
}

src/meta/src/stream/stream_manager.rs

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -868,6 +868,45 @@ impl GlobalStreamManager {
868868
Ok(())
869869
}
870870

871+
pub(crate) async fn reschedule_fragment(
872+
&self,
873+
fragment_id: u32,
874+
target: ReschedulePolicy,
875+
) -> MetaResult<()> {
876+
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
877+
878+
let parallelism_policy = match target {
879+
ReschedulePolicy::Parallelism(policy) => policy,
880+
_ => bail_invalid_parameter!("fragment reschedule only supports parallelism targets"),
881+
};
882+
883+
let worker_nodes = self
884+
.metadata_manager
885+
.list_active_streaming_compute_nodes()
886+
.await?
887+
.into_iter()
888+
.filter(|w| w.is_streaming_schedulable())
889+
.collect_vec();
890+
let workers = worker_nodes.into_iter().map(|x| (x.id as i32, x)).collect();
891+
892+
let fragment_policy = HashMap::from([(fragment_id as _, parallelism_policy.clone())]);
893+
894+
let commands = self
895+
.scale_controller
896+
.reschedule_fragment_inplace(fragment_policy, workers)
897+
.await?;
898+
899+
let _source_pause_guard = self.source_manager.pause_tick().await;
900+
901+
for (database_id, command) in commands {
902+
self.barrier_scheduler
903+
.run_command(database_id, command)
904+
.await?;
905+
}
906+
907+
Ok(())
908+
}
909+
871910
// Don't need to add actor, just send a command
872911
pub async fn create_subscription(
873912
self: &Arc<Self>,

src/rpc_client/src/meta_client.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -651,6 +651,20 @@ impl MetaClient {
651651
Ok(())
652652
}
653653

654+
pub async fn alter_fragment_parallelism(
655+
&self,
656+
fragment_id: u32,
657+
parallelism: PbTableParallelism,
658+
) -> Result<()> {
659+
let request = AlterFragmentParallelismRequest {
660+
fragment_id,
661+
parallelism: Some(parallelism),
662+
};
663+
664+
self.inner.alter_fragment_parallelism(request).await?;
665+
Ok(())
666+
}
667+
654668
pub async fn alter_cdc_table_backfill_parallelism(
655669
&self,
656670
table_id: u32,
@@ -2408,6 +2422,7 @@ macro_rules! for_all_meta_rpc {
24082422
,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
24092423
,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse }
24102424
,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse }
2425+
,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse }
24112426
,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse }
24122427
,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse }
24132428
,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse }

0 commit comments

Comments (0)