@@ -590,13 +590,13 @@ class SourceProcessor(
590590 protected suspend fun processItems () {
591591 secondaryFileMover.releaseAll()
592592 val stat = context.stat
593- stat.stopWatch.start(" fetchItems " )
593+ stat.stopWatch.start(" fetch-items " )
594594
595- val itemIterable = retry(getRetryPolicy(" FetchSourceItems " )) {
595+ val itemIterable = retry(getRetryPolicy(" fetch-source-items " )) {
596596 source.fetch(sourcePointer, options.fetchLimit)
597597 }
598598 stat.stopWatch.stop()
599- stat.stopWatch.start(" processItems " )
599+ stat.stopWatch.start(" process-items " )
600600
601601 val semaphore = Semaphore (options.parallelism, true )
602602 // 处理Source的迭代器返回重复的Item
@@ -619,15 +619,15 @@ class SourceProcessor(
619619 itemSet.add(sourceItem)
620620 launch {
621621 log.trace(" Processor:'{}' start process item:{}" , name, pointed)
622- val itemOptions = selectItemOptions(pointed)
623- val filtered = filterItem(itemOptions, pointed)
624- if (filtered) {
625- itemSet.remove(sourceItem)
626- stat.incFilterCounting()
627- semaphore.release()
628- return @launch
629- }
630622 val processingContent = runCatching {
623+ val itemOptions = selectItemOptions(pointed)
624+ val filtered = filterItem(itemOptions, pointed)
625+ if (filtered) {
626+ itemSet.remove(sourceItem)
627+ stat.incFilterCounting()
628+ semaphore.release()
629+ return @launch
630+ }
631631 val ct = processWithRetry(pointed, itemOptions)
632632 context.touch(ct)
633633 onItemSuccess(pointed, ct)
0 commit comments