Skip to content

Commit 7bade32

Browse files
risingwave-cihzxa21cyliu0
authored andcommitted
fix: use error instead of warn for source/sink errors (#23178) (#23189)
Co-authored-by: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com> Co-authored-by: Chengyou Liu <35356271+cyliu0@users.noreply.github.com>
1 parent c6bbb02 commit 7bade32

File tree

4 files changed

+18
-5
lines changed

4 files changed

+18
-5
lines changed

src/stream/src/executor/sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
660660
if F::ALLOW_REWIND {
661661
match log_reader.rewind().await {
662662
Ok(()) => {
663-
warn!(
663+
error!(
664664
error = %e.as_report(),
665665
executor_id = sink_writer_param.executor_id,
666666
sink_id = sink_param.sink_id.sink_id,

src/stream/src/executor/source/reader_stream.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::time::Duration;
1717

1818
use itertools::Itertools;
1919
use risingwave_common::catalog::{ColumnId, TableId};
20+
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
2021
use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
2122
use risingwave_connector::source::reader::desc::SourceDesc;
2223
use risingwave_connector::source::{
@@ -179,13 +180,19 @@ impl StreamReaderBuilder {
179180
if is_initial_build {
180181
return Err(StreamExecutorError::connector_error(e));
181182
} else {
182-
tracing::warn!(
183+
tracing::error!(
183184
error = %e.as_report(),
184185
source_name = self.source_name,
185186
source_id = self.source_id.table_id,
186187
actor_id = self.actor_ctx.id,
187188
"build stream source reader error, retry in 1s"
188189
);
190+
GLOBAL_ERROR_METRICS.user_source_error.report([
191+
e.variant_name().to_owned(),
192+
self.source_id.table_id.to_string(),
193+
self.source_name.to_owned(),
194+
self.actor_ctx.fragment_id.to_string(),
195+
]);
189196
tokio::time::sleep(Duration::from_secs(1)).await;
190197
continue 'build_consume_loop;
191198
}
@@ -208,13 +215,19 @@ impl StreamReaderBuilder {
208215
yield (msg, latest_splits_info.clone());
209216
}
210217
Err(e) => {
211-
tracing::warn!(
218+
tracing::error!(
212219
error = %e.as_report(),
213220
source_name = self.source_name,
214221
source_id = self.source_id.table_id,
215222
actor_id = self.actor_ctx.id,
216223
"stream source reader error"
217224
);
225+
GLOBAL_ERROR_METRICS.user_source_error.report([
226+
e.variant_name().to_owned(),
227+
self.source_id.table_id.to_string(),
228+
self.source_name.to_owned(),
229+
self.actor_ctx.fragment_id.to_string(),
230+
]);
218231
is_error = true;
219232
break 'consume;
220233
}

src/stream/src/executor/source/source_backfill_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
497497
Either::Left(msg) => {
498498
let Ok(msg) = msg else {
499499
let e = msg.unwrap_err();
500-
tracing::warn!(
500+
tracing::error!(
501501
error = ?e.as_report(),
502502
source_id = %self.source_id,
503503
"stream source reader error",

src/stream/src/executor/source/source_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ impl<S: StateStore> SourceExecutor<S> {
376376
e: StreamExecutorError,
377377
) -> StreamExecutorResult<()> {
378378
let core = &mut self.stream_source_core;
379-
tracing::warn!(
379+
tracing::error!(
380380
error = ?e.as_report(),
381381
actor_id = self.actor_ctx.id,
382382
source_id = %core.source_id,

0 commit comments

Comments
 (0)