diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 5e6251f1f40f2..b7266d5ee7c4a 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -312,6 +312,15 @@ message AlterParallelismRequest { message AlterParallelismResponse {} +message AlterFragmentParallelismRequest { + repeated uint32 fragment_ids = 1; + + // When `parallelism` is `None`, it indicates resetting to the job's parallelism. + optional meta.TableParallelism parallelism = 2; +} + +message AlterFragmentParallelismResponse {} + message AlterCdcTableBackfillParallelismRequest { uint32 table_id = 1; meta.TableParallelism parallelism = 2; @@ -645,6 +654,7 @@ service DdlService { rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse); rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse); rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse); + rpc AlterFragmentParallelism(AlterFragmentParallelismRequest) returns (AlterFragmentParallelismResponse); rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse); rpc DropTable(DropTableRequest) returns (DropTableResponse); rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse); diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 736e7b1ffbe61..01342dc8a3ec5 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -121,6 +121,23 @@ pub async fn handle_alter_parallelism( Ok(builder.into()) } +pub async fn handle_alter_fragment_parallelism( + handler_args: HandlerArgs, + fragment_ids: Vec, + parallelism: SetVariableValue, +) -> Result { + let session = handler_args.session; + let target_parallelism = extract_fragment_parallelism(parallelism)?; + + session + .env() + .meta_client() + .alter_fragment_parallelism(fragment_ids, target_parallelism) + .await?; + + Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into()) +} + fn extract_table_parallelism(parallelism: SetVariableValue) -> Result { let adaptive_parallelism = PbTableParallelism { parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})), @@ -166,3 +183,10 @@ fn extract_table_parallelism(parallelism: SetVariableValue) -> Result Result> { + match parallelism { + SetVariableValue::Default => Ok(None), + other => extract_table_parallelism(other).map(Some), + } +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index b22455670504b..f9a58c50348a7 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -1254,18 +1254,35 @@ pub async fn handle( operation, } => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await, Statement::AlterFragment { - fragment_id, - operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit }, - } => { - alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id( - &handler_args.session, - PbThrottleTarget::Fragment, - fragment_id, - rate_limit, - StatementType::SET_VARIABLE, - ) - .await - } + fragment_ids, + operation, + } => match operation { + AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => { + let [fragment_id] = fragment_ids.as_slice() else { + return Err(ErrorCode::InvalidInputSyntax( + "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id" + .to_owned(), + ) + .into()); + }; + alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id( + &handler_args.session, + PbThrottleTarget::Fragment, + *fragment_id, + rate_limit, + StatementType::SET_VARIABLE, + ) + .await + } + AlterFragmentOperation::SetParallelism { parallelism } => { + alter_parallelism::handle_alter_fragment_parallelism( + handler_args, + fragment_ids, + parallelism, + ) + .await + } + }, Statement::AlterDefaultPrivileges { .. } => { handle_privilege::handle_alter_default_privileges(handler_args, stmt).await } diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 6ceafe8a9d82c..4ab5bc4e12f9b 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -38,8 +38,8 @@ use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{ - EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest, - RefreshResponse, + EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus, + RefreshRequest, RefreshResponse, }; use risingwave_pb::secret::PbSecretRef; use risingwave_rpc_client::error::Result; @@ -132,6 +132,12 @@ pub trait FrontendMetaClient: Send + Sync { rate_limit: Option, ) -> Result<()>; + async fn alter_fragment_parallelism( + &self, + fragment_ids: Vec, + parallelism: Option, + ) -> Result<()>; + async fn get_cluster_recovery_status(&self) -> Result; async fn get_cluster_limits(&self) -> Result>; @@ -353,6 +359,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl { .map(|_| ()) } + async fn alter_fragment_parallelism( + &self, + fragment_ids: Vec, + parallelism: Option, + ) -> Result<()> { + self.0 + .alter_fragment_parallelism(fragment_ids, parallelism) + .await + } + async fn get_cluster_recovery_status(&self) -> Result { self.0.get_cluster_recovery_status().await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 603e2a9eee185..3a78b079da9f3 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1177,6 +1177,14 @@ impl FrontendMetaClient for MockFrontendMetaClient { unimplemented!() } + async fn alter_fragment_parallelism( + &self, + _fragment_ids: Vec, + _parallelism: Option, + ) -> RpcResult<()> { + unimplemented!() + } + async fn get_cluster_recovery_status(&self) -> RpcResult { Ok(RecoveryStatus::StatusRunning) } diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 209b535743401..38e3bd38a6578 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -1041,6 +1041,52 @@ impl DdlService for DdlServiceImpl { Ok(Response::new(AlterParallelismResponse {})) } + async fn alter_fragment_parallelism( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let fragment_ids = req.fragment_ids; + if fragment_ids.is_empty() { + return Err(Status::invalid_argument( + "at least one fragment id must be provided", + )); + } + + let parallelism = match req.parallelism { + Some(parallelism) => { + let streaming_parallelism = match parallelism.get_parallelism()? { + Parallelism::Fixed(FixedParallelism { parallelism }) => { + StreamingParallelism::Fixed(*parallelism as _) + } + Parallelism::Auto(_) | Parallelism::Adaptive(_) => { + StreamingParallelism::Adaptive + } + _ => bail_unavailable!(), + }; + Some(streaming_parallelism) + } + None => None, + }; + + let fragment_targets = fragment_ids + .into_iter() + .map(|fragment_id| { + ( + fragment_id as risingwave_meta_model::FragmentId, + parallelism.clone(), + ) + }) + .collect(); + + self.ddl_controller + .reschedule_fragments(fragment_targets) + .await?; + + Ok(Response::new(AlterFragmentParallelismResponse {})) + } + /// Auto schema change for cdc sources, /// called by the source parser when a schema change is detected. async fn auto_schema_change( diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index fb19fd8b28f02..90ef8bb23ca43 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -43,7 +43,8 @@ use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity}; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::{ ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId, - SchemaId, SecretId, SinkId, SourceId, SubscriptionId, UserId, ViewId, WorkerId, + SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId, + WorkerId, }; use risingwave_pb::catalog::{ Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source, @@ -81,8 +82,9 @@ use crate::manager::{ NotificationVersion, StreamingJob, StreamingJobType, }; use crate::model::{ - DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext, - StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism, + DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, + FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate, + TableParallelism, }; use crate::stream::cdc::{ is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits, @@ -520,6 +522,24 @@ impl DdlController { .await } + pub async fn reschedule_fragments( + &self, + fragment_targets: HashMap>, + ) -> MetaResult<()> { + tracing::info!( + "altering parallelism for fragments {:?}", + fragment_targets.keys() + ); + let fragment_targets = fragment_targets + .into_iter() + .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism)) + .collect(); + + self.stream_manager + .reschedule_fragments(fragment_targets) + .await + } + async fn drop_database(&self, database_id: DatabaseId) -> MetaResult { self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade) .await diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 933924512250a..09b237899b94d 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,6 +41,7 @@ use crate::controller::scale::{ FragmentRenderMap, NoShuffleEnsemble, RenderedGraph, WorkerInfo, find_fragment_no_shuffle_dags_detailed, render_fragments, render_jobs, }; +use crate::error::bail_invalid_parameter; use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager}; use crate::model::{ ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, StreamContext, @@ -357,7 +358,7 @@ impl ScaleController { pub async fn reschedule_fragment_inplace( &self, - policy: HashMap, + policy: HashMap>, workers: HashMap, ) -> MetaResult> { if policy.is_empty() { @@ -394,37 +395,53 @@ impl ScaleController { for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? { let entry_fragment_ids = ensemble.entry_fragments().collect_vec(); - let parallelisms = entry_fragment_ids + let desired_parallelism = match entry_fragment_ids .iter() - .filter_map(|fragment_id| policy.get(fragment_id)) + .filter_map(|fragment_id| policy.get(fragment_id).cloned()) .dedup() - .collect_vec(); - - let parallelism = match parallelisms.as_slice() { + .collect_vec() + .as_slice() + { [] => { - bail!( - "no reschedule policy specified for fragments in the no-shuffle ensemble: {:?}", + bail_invalid_parameter!( + "none of the entry fragments {:?} were included in the reschedule request; \ + provide at least one entry fragment id", entry_fragment_ids ); } - [policy] => &policy.parallelism, - _ => { + [parallelism] => parallelism.clone(), + parallelisms => { bail!( "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}", parallelisms - .iter() - .map(|policy| &policy.parallelism) - .collect_vec() ); } }; - for fragment_id in entry_fragment_ids { - let fragment = fragment::ActiveModel { - fragment_id: Set(fragment_id), - parallelism: Set(Some(parallelism.clone())), - ..Default::default() - }; + let fragments = Fragment::find() + .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids)) + .all(&txn) + .await?; + + debug_assert!( + fragments + .iter() + .map(|fragment| fragment.parallelism.as_ref()) + .all_equal(), + "entry fragments in the same ensemble should share the same parallelism" + ); + + let current_parallelism = fragments + .first() + .and_then(|fragment| fragment.parallelism.clone()); + + if current_parallelism == desired_parallelism { + continue; + } + + for fragment in fragments { + let mut fragment = fragment.into_active_model(); + fragment.parallelism = Set(desired_parallelism.clone()); fragment.update(&txn).await?; } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index d52a75674d270..85f43f85b1176 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -820,7 +820,50 @@ impl GlobalStreamManager { } }; - let fragment_policy = HashMap::from([(cdc_fragment_id, parallelism_policy.clone())]); + let fragment_policy = HashMap::from([( + cdc_fragment_id, + Some(parallelism_policy.parallelism.clone()), + )]); + + let commands = self + .scale_controller + .reschedule_fragment_inplace(fragment_policy, workers) + .await?; + + let _source_pause_guard = self.source_manager.pause_tick().await; + + for (database_id, command) in commands { + self.barrier_scheduler + .run_command(database_id, command) + .await?; + } + + Ok(()) + } + + pub(crate) async fn reschedule_fragments( + &self, + fragment_targets: HashMap>, + ) -> MetaResult<()> { + if fragment_targets.is_empty() { + return Ok(()); + } + + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; + + let workers = self + .metadata_manager + .list_active_streaming_compute_nodes() + .await? + .into_iter() + .filter(|w| w.is_streaming_schedulable()) + .map(|worker| (worker.id as i32, worker)) + .collect(); + + let fragment_policy = fragment_targets + .into_iter() + .map(|(fragment_id, parallelism)| (fragment_id as _, parallelism)) + .collect(); let commands = self .scale_controller diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 97ddf55fb964d..9288cbc0f9773 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -651,6 +651,20 @@ impl MetaClient { Ok(()) } + pub async fn alter_fragment_parallelism( + &self, + fragment_ids: Vec, + parallelism: Option, + ) -> Result<()> { + let request = AlterFragmentParallelismRequest { + fragment_ids, + parallelism, + }; + + self.inner.alter_fragment_parallelism(request).await?; + Ok(()) + } + pub async fn alter_cdc_table_backfill_parallelism( &self, table_id: u32, @@ -2429,6 +2443,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse } ,{ ddl_client, alter_set_schema, AlterSetSchemaRequest, AlterSetSchemaResponse } ,{ ddl_client, alter_parallelism, AlterParallelismRequest, AlterParallelismResponse } + ,{ ddl_client, alter_fragment_parallelism, AlterFragmentParallelismRequest, AlterFragmentParallelismResponse } ,{ ddl_client, alter_cdc_table_backfill_parallelism, AlterCdcTableBackfillParallelismRequest, AlterCdcTableBackfillParallelismResponse } ,{ ddl_client, alter_resource_group, AlterResourceGroupRequest, AlterResourceGroupResponse } ,{ ddl_client, alter_database_param, AlterDatabaseParamRequest, AlterDatabaseParamResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index c82966ba58ed8..41b082c4a6636 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -282,6 +282,7 @@ pub enum AlterSecretOperation { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum AlterFragmentOperation { AlterBackfillRateLimit { rate_limit: i32 }, + SetParallelism { parallelism: SetVariableValue }, } impl fmt::Display for AlterDatabaseOperation { @@ -679,6 +680,9 @@ impl fmt::Display for AlterFragmentOperation { AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterFragmentOperation::SetParallelism { parallelism } => { + write!(f, "SET PARALLELISM TO {}", parallelism) + } } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index ef885b37ea937..fb8aac3369851 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1522,7 +1522,7 @@ pub enum Statement { }, /// ALTER FRAGMENT AlterFragment { - fragment_id: u32, + fragment_ids: Vec, operation: AlterFragmentOperation, }, /// DESCRIBE relation @@ -2515,10 +2515,15 @@ impl Statement { Ok(()) } Statement::AlterFragment { - fragment_id, + fragment_ids, operation, } => { - write!(f, "ALTER FRAGMENT {} {}", fragment_id, operation) + write!( + f, + "ALTER FRAGMENT {} {}", + display_comma_separated(fragment_ids), + operation + ) } Statement::AlterDefaultPrivileges { target_users, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index c8abce92c7cb2..875e7e8ce20bf 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3810,14 +3810,25 @@ impl Parser<'_> { } pub fn parse_alter_fragment(&mut self) -> ModalResult { - let fragment_id = self.parse_literal_u32()?; + let mut fragment_ids = vec![self.parse_literal_u32()?]; + while self.consume_token(&Token::Comma) { + fragment_ids.push(self.parse_literal_u32()?); + } if !self.parse_keyword(Keyword::SET) { return self.expected("SET after ALTER FRAGMENT"); } - let rate_limit = self.parse_alter_fragment_rate_limit()?; - let operation = AlterFragmentOperation::AlterBackfillRateLimit { rate_limit }; + let operation = if self.parse_keyword(Keyword::PARALLELISM) { + if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { + return self.expected("TO or = after ALTER FRAGMENT SET PARALLELISM"); + } + let parallelism = self.parse_set_variable()?; + AlterFragmentOperation::SetParallelism { parallelism } + } else { + let rate_limit = self.parse_alter_fragment_rate_limit()?; + AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } + }; Ok(Statement::AlterFragment { - fragment_id, + fragment_ids, operation, }) } diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index e404b8ececd8d..e1f37dc6623f8 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -4137,3 +4137,64 @@ fn parse_window_clause() { let ast = parse_sql_statements(sql).unwrap(); assert_eq!(ast.len(), 1); } + +#[test] +fn parse_alter_fragment_set_parallelism() { + match verified_stmt("ALTER FRAGMENT 1 SET PARALLELISM TO 4") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![1]); + match operation { + AlterFragmentOperation::SetParallelism { parallelism } => { + assert_eq!( + parallelism, + SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number( + "4".into() + ))) + ); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } + + match verified_stmt("ALTER FRAGMENT 2 SET PARALLELISM TO DEFAULT") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![2]); + match operation { + AlterFragmentOperation::SetParallelism { parallelism } => { + assert_eq!(parallelism, SetVariableValue::Default); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } + + match verified_stmt("ALTER FRAGMENT 1, 2, 3 SET PARALLELISM TO 8") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![1, 2, 3]); + match operation { + AlterFragmentOperation::SetParallelism { parallelism } => { + assert_eq!( + parallelism, + SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number( + "8".into() + ))) + ); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } +} diff --git a/src/tests/simulation/tests/integration_tests/scale/alter_fragment.rs b/src/tests/simulation/tests/integration_tests/scale/alter_fragment.rs new file mode 100644 index 0000000000000..cb860fd5843ca --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/scale/alter_fragment.rs @@ -0,0 +1,132 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use anyhow::Result; +use madsim::time::sleep; +use risingwave_simulation::cluster::{Cluster, Configuration}; +use risingwave_simulation::ctl_ext::predicate::identity_contains; +use risingwave_simulation::utils::AssertResult; + +#[tokio::test] +async fn test_alter_fragment_no_shuffle() -> Result<()> { + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; + let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; + cluster.run("create table t1 (c1 int, c2 int);").await?; + let upstream_fragment = cluster + .locate_one_fragment([identity_contains("materialize")]) + .await?; + + let upstream_fragment_id = upstream_fragment.id(); + cluster + .run("create materialized view m as select * from t1;") + .await?; + + let downstream_fragment = cluster + .locate_one_fragment([identity_contains("StreamTableScan")]) + .await?; + + let downstream_fragment_id = downstream_fragment.id(); + + let new_parallelism = default_parallelism + 1; + + assert!( + cluster + .run(&format!( + "alter fragment {downstream_fragment_id} set parallelism = {new_parallelism};" + )) + .await + .is_err() + ); + + cluster + .run(&format!( + "alter fragment {upstream_fragment_id} set parallelism = {new_parallelism};" + )) + .await?; + + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {upstream_fragment_id};" + )) + .await? + .assert_result_eq(format!("{new_parallelism}")); + + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {downstream_fragment_id};" + )) + .await? + .assert_result_eq(format!("{new_parallelism}")); + + Ok(()) +} + +#[tokio::test] +async fn test_alter_fragment() -> Result<()> { + let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let default_parallelism = cluster.config().compute_nodes * cluster.config().compute_node_cores; + cluster.run("create table t1 (c1 int, c2 int);").await?; + let materialize_fragment = cluster + .locate_one_fragment([identity_contains("materialize")]) + .await?; + + let fragment_id = materialize_fragment.id(); + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {fragment_id};" + )) + .await? + .assert_result_eq(format!("{default_parallelism}")); + + let new_parallelism = default_parallelism + 1; + + cluster + .run(&format!( + "alter fragment {fragment_id} set parallelism = {new_parallelism};" + )) + .await?; + + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {fragment_id};" + )) + .await? + .assert_result_eq(format!("{new_parallelism}")); + + cluster + .run(&"alter table t1 set parallelism = 1;".to_owned()) + .await?; + + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {fragment_id};" + )) + .await? + .assert_result_eq(format!("{new_parallelism}")); + + cluster.run(&"recover;".to_owned()).await?; + + sleep(Duration::from_secs(10)).await; + + cluster + .run(&format!( + "select parallelism from rw_fragment_parallelism where fragment_id = {fragment_id};" + )) + .await? + .assert_result_eq(format!("{new_parallelism}")); + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs index 5fde4d79cdabd..582df723364f0 100644 --- a/src/tests/simulation/tests/integration_tests/scale/mod.rs +++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod adaptive_strategy; +mod alter_fragment; mod auto_parallelism; mod background_ddl; mod cascade_materialized_view; diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 6f3f04170a383..2912d212fd953 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -91,6 +91,7 @@ pub enum StatementType { ALTER_CONNECTION, ALTER_SYSTEM, ALTER_SECRET, + ALTER_FRAGMENT, REVOKE_PRIVILEGE, // Introduce ORDER_BY statement type cuz Calcite unvalidated AST has SqlKind.ORDER_BY. Note // that Statement Type is not designed to be one to one mapping with SqlKind. @@ -290,6 +291,7 @@ impl StatementType { } Statement::AlterTable { .. } => Ok(StatementType::ALTER_TABLE), Statement::AlterSystem { .. } => Ok(StatementType::ALTER_SYSTEM), + Statement::AlterFragment { .. } => Ok(StatementType::ALTER_FRAGMENT), Statement::DropFunction { .. } => Ok(StatementType::DROP_FUNCTION), Statement::Discard(..) => Ok(StatementType::DISCARD), Statement::SetVariable { .. } => Ok(StatementType::SET_VARIABLE),