3232import org .apache .kudu .client .KuduSession ;
3333import org .apache .kudu .client .KuduTable ;
3434import org .apache .kudu .client .Operation ;
35+ import org .apache .kudu .client .OperationResponse ;
3536import org .apache .kudu .client .PartialRow ;
37+ import org .apache .kudu .client .RowError ;
3638import org .apache .kudu .client .SessionConfiguration ;
3739import org .slf4j .Logger ;
3840import org .slf4j .LoggerFactory ;
@@ -74,8 +76,6 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
7476
7577 private Integer defaultOperationTimeoutMs ;
7678
77- private Integer defaultSocketReadTimeoutMs ;
78-
7979 /**
8080 * kerberos
8181 */
@@ -133,12 +133,7 @@ private void initSchedulerTask() {
133133 );
134134
135135 this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
136- () -> {
137- synchronized (KuduOutputFormat .this ) {
138- flush ();
139- }
140- }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS
141- );
136+ this ::flush , batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
142137 }
143138 } catch (Exception e ) {
144139 LOG .error ("init schedule task failed !" );
@@ -151,9 +146,6 @@ private void establishConnection() throws IOException {
151146 if (null != workerCount ) {
152147 kuduClientBuilder .workerCount (workerCount );
153148 }
154- if (null != defaultSocketReadTimeoutMs ) {
155- kuduClientBuilder .defaultSocketReadTimeoutMs (defaultSocketReadTimeoutMs );
156- }
157149
158150 if (null != defaultOperationTimeoutMs ) {
159151 kuduClientBuilder .defaultOperationTimeoutMs (defaultOperationTimeoutMs );
@@ -184,14 +176,13 @@ private void establishConnection() throws IOException {
184176 }
185177
186178 /**
187- * According to the different flush mode, construct different session. Detail see {@link SessionConfiguration.FlushMode}
179+ * According to the different flush mode, build different session. Detail see {@link SessionConfiguration.FlushMode}
188180 *
189181 * @param flushMode flush mode
190182 * @param kuduClient kudu client
191183 * @return KuduSession with flush mode
192- * @throws KuduException kudu exception when session flush
193184 */
194- private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) throws KuduException {
185+ private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) {
195186 KuduSession kuduSession = kuduClient .newSession ();
196187 if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
197188 kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
@@ -220,14 +211,6 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
220211 return ;
221212 }
222213 Row row = record .getField (1 );
223- if (row .getArity () != fieldNames .length ) {
224- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
225- LOG .error ("record insert failed ..{}" , row .toString ());
226- LOG .error ("cause by row.getArity() != fieldNames.length" );
227- }
228- outDirtyRecords .inc ();
229- return ;
230- }
231214
232215 try {
233216 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
@@ -236,33 +219,69 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
236219 if (rowCount .getAndIncrement () >= batchSize ) {
237220 flush ();
238221 }
222+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
224+ dealResponse (session .apply (toOperation (writeMode , row )));
225+ }
226+
239227 session .apply (toOperation (writeMode , row ));
240228 outRecords .inc ();
241229 } catch (KuduException e ) {
242- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
243- LOG .error ("record insert failed, total dirty record:{} current row:{}" , outDirtyRecords .getCount (), row .toString ());
244- LOG .error ("" , e );
245- }
246- outDirtyRecords .inc ();
230+ throw new RuntimeException (e );
247231 }
248232 }
249233
234+ /**
235+ * Flush data with session, then deal the responses of operations and reset rowCount.
236+ * Detail of flush see {@link KuduSession#flush()}
237+ */
250238 private synchronized void flush () {
251239 try {
252240 if (session .isClosed ()) {
253- throw new IllegalStateException ("session is closed! flush data error!" );
241+ throw new IllegalStateException ("Session is closed! Flush data error!" );
254242 }
255243
256- session .flush ();
244+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation
245+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
246+ return ;
247+ }
248+ session .flush ().forEach (this ::dealResponse );
257249 // clear
258250 rowCount .set (0 );
259251 } catch (KuduException kuduException ) {
260- LOG .error (
261- "flush data error!" , kuduException );
252+ LOG .error ("flush data error!" , kuduException );
262253 throw new RuntimeException (kuduException );
263254 }
264255 }
265256
257+ /**
258+ * Deal response when operation apply.
259+ * At MANUAL_FLUSH mode, response returns after {@link KuduSession#flush()}.
260+ * But at AUTO_FLUSH_SYNC mode, response returns after {@link KuduSession#apply(Operation)}
261+ *
262+ * @param response {@link OperationResponse} response after operation done.
263+ */
264+ private void dealResponse (OperationResponse response ) {
265+ if (response .hasRowError ()) {
266+ RowError error = response .getRowError ();
267+ String errorMsg = error .getErrorStatus ().toString ();
268+ if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
269+ LOG .error (errorMsg );
270+ LOG .error (String .format ("Dirty data count: [%s]. Row data: [%s]" ,
271+ outDirtyRecords .getCount () + 1 , error .getOperation ().getRow ().toString ()));
272+ }
273+ outDirtyRecords .inc ();
274+
275+ if (error .getErrorStatus ().isNotFound ()
276+ || error .getErrorStatus ().isIOError ()
277+ || error .getErrorStatus ().isRuntimeError ()
278+ || error .getErrorStatus ().isServiceUnavailable ()
279+ || error .getErrorStatus ().isIllegalState ()) {
280+ throw new RuntimeException (errorMsg );
281+ }
282+ }
283+ }
284+
266285 @ Override
267286 public void close () {
268287 if (Objects .nonNull (session ) && !session .isClosed ()) {
@@ -429,11 +448,6 @@ public KuduOutputFormatBuilder setDefaultOperationTimeoutMs(Integer defaultOpera
429448 return this ;
430449 }
431450
432- public KuduOutputFormatBuilder setDefaultSocketReadTimeoutMs (Integer defaultSocketReadTimeoutMs ) {
433- kuduOutputFormat .defaultSocketReadTimeoutMs = defaultSocketReadTimeoutMs ;
434- return this ;
435- }
436-
437451 public KuduOutputFormatBuilder setPrincipal (String principal ) {
438452 kuduOutputFormat .principal = principal ;
439453 return this ;
0 commit comments