File tree Expand file tree Collapse file tree 1 file changed +5
-4
lines changed
hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase Expand file tree Collapse file tree 1 file changed +5
-4
lines changed Original file line number Diff line number Diff line change 1919
2020package com .dtstack .flink .sql .sink .hbase ;
2121
22+ import com .dtstack .flink .sql .exception .ExceptionTrace ;
2223import com .dtstack .flink .sql .factory .DTThreadFactory ;
2324import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
2425import com .google .common .collect .Maps ;
@@ -233,10 +234,10 @@ protected synchronized void dealBatchOperation(List<Row> records) {
233234 } finally {
234235 // 判断数据是否插入成功
235236 for (int i = 0 ; i < results .length ; i ++) {
236- if (results [i ] != null ) {
237+ if (results [i ] instanceof Exception ) {
237238 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
238239 LOG .error ("Get dirty data: {}" , records .get (i ).toString ());
239- LOG .error ("Error cause: " + results [i ]);
240+ LOG .error ("Error cause: " + ExceptionTrace . traceOriginalCause (( Exception ) results [i ]) );
240241 }
241242 // 脏数据记录
242243 outDirtyRecords .inc ();
@@ -262,8 +263,8 @@ protected void dealInsert(Row record) {
262263 table .put (put );
263264 } catch (Exception e ) {
264265 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
265- LOG .error ("record insert failed .. {}" , record .toString ());
266- LOG .error ("" , e );
266+ LOG .error ("Get dirty data: {}" , record .toString ());
267+ LOG .error ("Error cause: " + ExceptionTrace . traceOriginalCause ( e ) );
267268 }
268269 outDirtyRecords .inc ();
269270 }
You can’t perform that action at this time.
0 commit comments