Merged
10 changes: 10 additions & 0 deletions proto/ddl_service.proto
@@ -312,6 +312,15 @@ message AlterParallelismRequest {

message AlterParallelismResponse {}

message AlterFragmentParallelismRequest {
repeated uint32 fragment_ids = 1;

// When `parallelism` is `None`, it indicates resetting to the job's parallelism.
optional meta.TableParallelism parallelism = 2;
}

message AlterFragmentParallelismResponse {}

message AlterCdcTableBackfillParallelismRequest {
uint32 table_id = 1;
meta.TableParallelism parallelism = 2;
@@ -645,6 +654,7 @@ service DdlService {
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
rpc AlterFragmentParallelism(AlterFragmentParallelismRequest) returns (AlterFragmentParallelismResponse);
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
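The new request batches fragment ids and carries an optional parallelism; leaving `parallelism` unset asks the meta node to drop the per-fragment setting and fall back to the owning job's parallelism. A minimal sketch of those semantics using hypothetical stand-in types (the real ones are prost-generated from `meta.proto` and `ddl_service.proto`):

```rust
// Stand-in types for illustration only; the real ones are generated by prost
// from meta.proto / ddl_service.proto.
#[derive(Clone, Debug, PartialEq)]
enum Parallelism {
    Fixed(u32),
    Adaptive,
}

#[derive(Debug)]
struct AlterFragmentParallelismRequest {
    fragment_ids: Vec<u32>,
    // `None` means "reset to the job's parallelism".
    parallelism: Option<Parallelism>,
}

fn main() {
    // Pin fragments 1 and 2 to a fixed parallelism of 4.
    let pin = AlterFragmentParallelismRequest {
        fragment_ids: vec![1, 2],
        parallelism: Some(Parallelism::Fixed(4)),
    };
    // Reset fragment 3 back to whatever its owning job specifies.
    let reset = AlterFragmentParallelismRequest {
        fragment_ids: vec![3],
        parallelism: None,
    };
    println!("{pin:?}\n{reset:?}");
}
```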
24 changes: 24 additions & 0 deletions src/frontend/src/handler/alter_parallelism.rs
@@ -121,6 +121,23 @@ pub async fn handle_alter_parallelism(
Ok(builder.into())
}

pub async fn handle_alter_fragment_parallelism(
handler_args: HandlerArgs,
fragment_ids: Vec<u32>,
parallelism: SetVariableValue,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let target_parallelism = extract_fragment_parallelism(parallelism)?;

session
.env()
.meta_client()
.alter_fragment_parallelism(fragment_ids, target_parallelism)
.await?;

Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
}

fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
let adaptive_parallelism = PbTableParallelism {
parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
@@ -166,3 +183,10 @@ fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParal

Ok(target_parallelism)
}

fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
match parallelism {
SetVariableValue::Default => Ok(None),
other => extract_table_parallelism(other).map(Some),
}
}
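`extract_fragment_parallelism` adds one twist over the table-level path: `SET PARALLELISM = DEFAULT` becomes `None`, which the meta service interprets as "drop the per-fragment override and fall back to the job's parallelism". A rough sketch of just that mapping, with simplified stand-in types instead of the real sqlparser AST and protobuf ones:

```rust
// Simplified stand-ins for the sqlparser `SetVariableValue` and the protobuf
// `TableParallelism`; the real extraction also handles keywords like ADAPTIVE.
#[derive(Debug, Clone, PartialEq)]
enum SetValue {
    Default,
    Number(u64),
}

#[derive(Debug, Clone, PartialEq)]
enum TableParallelism {
    Fixed(u32),
    Adaptive,
}

fn extract_table_parallelism(value: SetValue) -> Result<TableParallelism, String> {
    match value {
        // In this sketch, a job-level DEFAULT is treated as adaptive.
        SetValue::Default => Ok(TableParallelism::Adaptive),
        SetValue::Number(0) => Err("parallelism must be greater than 0".to_owned()),
        SetValue::Number(n) => Ok(TableParallelism::Fixed(n as u32)),
    }
}

// Fragment-level DEFAULT short-circuits to `None` (reset) before the table path runs.
fn extract_fragment_parallelism(value: SetValue) -> Result<Option<TableParallelism>, String> {
    match value {
        SetValue::Default => Ok(None),
        other => extract_table_parallelism(other).map(Some),
    }
}

fn main() {
    assert_eq!(extract_fragment_parallelism(SetValue::Default), Ok(None));
    assert_eq!(
        extract_fragment_parallelism(SetValue::Number(4)),
        Ok(Some(TableParallelism::Fixed(4)))
    );
    assert!(extract_fragment_parallelism(SetValue::Number(0)).is_err());
}
```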
41 changes: 29 additions & 12 deletions src/frontend/src/handler/mod.rs
@@ -1254,18 +1254,35 @@ pub async fn handle(
operation,
} => alter_secret::handle_alter_secret(handler_args, name, with_options, operation).await,
Statement::AlterFragment {
- fragment_id,
- operation: AlterFragmentOperation::AlterBackfillRateLimit { rate_limit },
- } => {
- alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
- &handler_args.session,
- PbThrottleTarget::Fragment,
- fragment_id,
- rate_limit,
- StatementType::SET_VARIABLE,
- )
- .await
- }
+ fragment_ids,
+ operation,
+ } => match operation {
+ AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
+ let [fragment_id] = fragment_ids.as_slice() else {
+ return Err(ErrorCode::InvalidInputSyntax(
+ "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
+ .to_owned(),
+ )
+ .into());
+ };
+ alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
+ &handler_args.session,
+ PbThrottleTarget::Fragment,
+ *fragment_id,
+ rate_limit,
+ StatementType::SET_VARIABLE,
+ )
+ .await
+ }
+ AlterFragmentOperation::SetParallelism { parallelism } => {
+ alter_parallelism::handle_alter_fragment_parallelism(
+ handler_args,
+ fragment_ids,
+ parallelism,
+ )
+ .await
+ }
+ },
Statement::AlterDefaultPrivileges { .. } => {
handle_privilege::handle_alter_default_privileges(handler_args, stmt).await
}
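The dispatcher keeps the existing single-fragment restriction for RATE_LIMIT by destructuring the id list with a `let`-`else` slice pattern, while SET PARALLELISM accepts the whole batch. A standalone illustration of that guard (the function name and messages here are illustrative, not the real handler):

```rust
// Standalone demo of the `let [x] = slice else { ... }` guard used above.
fn set_rate_limit(fragment_ids: &[u32], rate_limit: u32) -> Result<String, String> {
    // Accept exactly one fragment id; reject empty input or batches.
    let [fragment_id] = fragment_ids else {
        return Err(
            "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id".to_owned(),
        );
    };
    Ok(format!("fragment {fragment_id} rate limit set to {rate_limit}"))
}

fn main() {
    assert!(set_rate_limit(&[7], 100).is_ok());
    assert!(set_rate_limit(&[1, 2], 100).is_err());
    assert!(set_rate_limit(&[], 100).is_err());
}
```

The `let`-`else` form keeps the error path flat: the happy path continues with the single bound id instead of nesting inside a `match`.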
20 changes: 18 additions & 2 deletions src/frontend/src/meta_client.rs
@@ -38,8 +38,8 @@ use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::list_streaming_job_states_response::StreamingJobState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{
- EventLog, FragmentDistribution, PbThrottleTarget, RecoveryStatus, RefreshRequest,
- RefreshResponse,
+ EventLog, FragmentDistribution, PbTableParallelism, PbThrottleTarget, RecoveryStatus,
+ RefreshRequest, RefreshResponse,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_rpc_client::error::Result;
@@ -132,6 +132,12 @@ pub trait FrontendMetaClient: Send + Sync {
rate_limit: Option<u32>,
) -> Result<()>;

async fn alter_fragment_parallelism(
&self,
fragment_ids: Vec<u32>,
parallelism: Option<PbTableParallelism>,
) -> Result<()>;

async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
@@ -353,6 +359,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
.map(|_| ())
}

async fn alter_fragment_parallelism(
&self,
fragment_ids: Vec<u32>,
parallelism: Option<PbTableParallelism>,
) -> Result<()> {
self.0
.alter_fragment_parallelism(fragment_ids, parallelism)
.await
}

async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
self.0.get_cluster_recovery_status().await
}
8 changes: 8 additions & 0 deletions src/frontend/src/test_utils.rs
@@ -1177,6 +1177,14 @@ impl FrontendMetaClient for MockFrontendMetaClient {
unimplemented!()
}

async fn alter_fragment_parallelism(
&self,
_fragment_ids: Vec<u32>,
_parallelism: Option<PbTableParallelism>,
) -> RpcResult<()> {
unimplemented!()
}

async fn get_cluster_recovery_status(&self) -> RpcResult<RecoveryStatus> {
Ok(RecoveryStatus::StatusRunning)
}
46 changes: 46 additions & 0 deletions src/meta/service/src/ddl_service.rs
@@ -1041,6 +1041,52 @@ impl DdlService for DdlServiceImpl {
Ok(Response::new(AlterParallelismResponse {}))
}

async fn alter_fragment_parallelism(
&self,
request: Request<AlterFragmentParallelismRequest>,
) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
let req = request.into_inner();

let fragment_ids = req.fragment_ids;
if fragment_ids.is_empty() {
return Err(Status::invalid_argument(
"at least one fragment id must be provided",
));
}

let parallelism = match req.parallelism {
Some(parallelism) => {
let streaming_parallelism = match parallelism.get_parallelism()? {
Parallelism::Fixed(FixedParallelism { parallelism }) => {
StreamingParallelism::Fixed(*parallelism as _)
}
Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
StreamingParallelism::Adaptive
}
_ => bail_unavailable!(),
};
Some(streaming_parallelism)
}
None => None,
};

let fragment_targets = fragment_ids
.into_iter()
.map(|fragment_id| {
(
fragment_id as risingwave_meta_model::FragmentId,
parallelism.clone(),
)
})
.collect();

self.ddl_controller
.reschedule_fragments(fragment_targets)
.await?;

Ok(Response::new(AlterFragmentParallelismResponse {}))
}

/// Auto schema change for cdc sources,
/// called by the source parser when a schema change is detected.
async fn auto_schema_change(
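In `alter_fragment_parallelism` above, the service converts the optional proto parallelism into the meta model's streaming parallelism: a fixed value stays fixed, AUTO and ADAPTIVE both collapse to adaptive, and an absent field means "clear the per-fragment override". A compact sketch of that conversion with local stand-in enums (the real types are the prost-generated `Parallelism` variants and `StreamingParallelism` from `risingwave_meta_model`):

```rust
// Local stand-ins for the proto `Parallelism` variants and the meta model's
// `StreamingParallelism`; simplified for illustration.
#[derive(Debug, Clone, PartialEq)]
enum PbParallelism {
    Fixed(u32),
    Auto,
    Adaptive,
}

#[derive(Debug, Clone, PartialEq)]
enum StreamingParallelism {
    Fixed(usize),
    Adaptive,
}

fn to_streaming(parallelism: Option<PbParallelism>) -> Option<StreamingParallelism> {
    parallelism.map(|p| match p {
        PbParallelism::Fixed(n) => StreamingParallelism::Fixed(n as usize),
        // AUTO is treated the same as ADAPTIVE, mirroring the match above.
        PbParallelism::Auto | PbParallelism::Adaptive => StreamingParallelism::Adaptive,
    })
}

fn main() {
    assert_eq!(
        to_streaming(Some(PbParallelism::Fixed(8))),
        Some(StreamingParallelism::Fixed(8))
    );
    assert_eq!(to_streaming(None), None); // reset to the job-level setting
}
```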
26 changes: 23 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -43,7 +43,8 @@ use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
- SchemaId, SecretId, SinkId, SourceId, SubscriptionId, UserId, ViewId, WorkerId,
+ SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
+ WorkerId,
};
use risingwave_pb::catalog::{
Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
Expand Down Expand Up @@ -81,8 +82,9 @@ use crate::manager::{
NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
- DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext,
- StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
+ DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
+ FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
+ TableParallelism,
};
use crate::stream::cdc::{
is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
@@ -520,6 +522,24 @@ impl DdlController {
.await
}

pub async fn reschedule_fragments(
&self,
fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
) -> MetaResult<()> {
tracing::info!(
"altering parallelism for fragments {:?}",
fragment_targets.keys()
);
let fragment_targets = fragment_targets
.into_iter()
.map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
.collect();

self.stream_manager
.reschedule_fragments(fragment_targets)
.await
}

async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
.await
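`reschedule_fragments` itself is a thin passthrough: it logs the target fragment ids and re-keys the map from the meta-model fragment id type to the catalog fragment id type before delegating to the stream manager. A tiny illustration of that re-keying pattern with stand-in id types (the i32-to-u32 choice here is an assumption for the sketch):

```rust
use std::collections::HashMap;

// Stand-ins: pretend the meta model uses i32 ids and the catalog model uses u32 ids.
type ModelFragmentId = i32;
type CatalogFragmentId = u32;

fn rekey(
    targets: HashMap<ModelFragmentId, Option<usize>>,
) -> HashMap<CatalogFragmentId, Option<usize>> {
    targets
        .into_iter()
        .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
        .collect()
}

fn main() {
    let targets = HashMap::from([(1, Some(4)), (2, None)]);
    assert_eq!(rekey(targets).len(), 2);
}
```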
55 changes: 36 additions & 19 deletions src/meta/src/stream/scale.rs
@@ -41,6 +41,7 @@ use crate::controller::scale::{
FragmentRenderMap, NoShuffleEnsemble, RenderedGraph, WorkerInfo,
find_fragment_no_shuffle_dags_detailed, render_fragments, render_jobs,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, StreamContext,
@@ -357,7 +358,7 @@ impl ScaleController {

pub async fn reschedule_fragment_inplace(
&self,
- policy: HashMap<risingwave_meta_model::FragmentId, ParallelismPolicy>,
+ policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
workers: HashMap<WorkerId, PbWorkerNode>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
if policy.is_empty() {
@@ -394,37 +395,53 @@
for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

- let parallelisms = entry_fragment_ids
+ let desired_parallelism = match entry_fragment_ids
.iter()
- .filter_map(|fragment_id| policy.get(fragment_id))
+ .filter_map(|fragment_id| policy.get(fragment_id).cloned())
.dedup()
- .collect_vec();
-
- let parallelism = match parallelisms.as_slice() {
+ .collect_vec()
+ .as_slice()
+ {
[] => {
- bail!(
- "no reschedule policy specified for fragments in the no-shuffle ensemble: {:?}",
+ bail_invalid_parameter!(
+ "none of the entry fragments {:?} were included in the reschedule request; \
+ provide at least one entry fragment id",
entry_fragment_ids
);
}
- [policy] => &policy.parallelism,
- _ => {
+ [parallelism] => parallelism.clone(),
+ parallelisms => {
bail!(
"conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
parallelisms
- .iter()
- .map(|policy| &policy.parallelism)
- .collect_vec()
);
}
};

- for fragment_id in entry_fragment_ids {
- let fragment = fragment::ActiveModel {
- fragment_id: Set(fragment_id),
- parallelism: Set(Some(parallelism.clone())),
- ..Default::default()
- };
+ let fragments = Fragment::find()
+ .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
+ .all(&txn)
+ .await?;
+
+ debug_assert!(
+ fragments
+ .iter()
+ .map(|fragment| fragment.parallelism.as_ref())
+ .all_equal(),
+ "entry fragments in the same ensemble should share the same parallelism"
+ );
+
+ let current_parallelism = fragments
+ .first()
+ .and_then(|fragment| fragment.parallelism.clone());
+
+ if current_parallelism == desired_parallelism {
+ continue;
+ }
+
+ for fragment in fragments {
+ let mut fragment = fragment.into_active_model();
+ fragment.parallelism = Set(desired_parallelism.clone());
fragment.update(&txn).await?;
}

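The rewritten ensemble handling above resolves a single desired parallelism per no-shuffle ensemble from whichever entry fragments the request named, rejects empty or conflicting specifications, and skips the write when the stored parallelism already matches. A self-contained sketch of that resolution step, using simplified local types rather than the sea-orm entities (`resolve_target` and its signature are illustrative, not the real code):

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum StreamingParallelism {
    Fixed(usize),
    Adaptive,
}

/// Resolve the single target parallelism for an ensemble's entry fragments.
/// Outer `Option`: whether anything needs to change; inner `Option`: the target,
/// where `None` means "reset to the job's parallelism".
fn resolve_target(
    entry_fragment_ids: &[u32],
    policy: &HashMap<u32, Option<StreamingParallelism>>,
    current: &Option<StreamingParallelism>,
) -> Result<Option<Option<StreamingParallelism>>, String> {
    let mut requested: Vec<_> = entry_fragment_ids
        .iter()
        .filter_map(|id| policy.get(id).cloned())
        .collect();
    requested.dedup(); // like Itertools::dedup, removes consecutive duplicates

    match requested.as_slice() {
        [] => Err(format!(
            "none of the entry fragments {entry_fragment_ids:?} were included in the request"
        )),
        [desired] if desired == current => Ok(None), // already at the desired parallelism
        [desired] => Ok(Some(desired.clone())),
        conflicting => Err(format!("conflicting reschedule policies: {conflicting:?}")),
    }
}

fn main() {
    let policy = HashMap::from([(1, Some(StreamingParallelism::Fixed(4))), (2, None)]);
    // Fragment 1's ensemble moves from adaptive to FIXED(4).
    assert!(resolve_target(&[1], &policy, &Some(StreamingParallelism::Adaptive)).is_ok());
    // Mixing FIXED(4) and a reset across the same ensemble is rejected as conflicting.
    assert!(resolve_target(&[1, 2], &policy, &None).is_err());
}
```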