@@ -671,7 +671,6 @@ private static void runWithConcurrency(
671671 int maxWaitRetries ,
672672 TerminationFlag terminationFlag ,
673673 ExecutorService executor ) {
674- System .out .println ("[ParallelUtil#runWithConcurrency] running with " + tasks .size () + " tasks" + ", and concurrency of " + concurrency );
675674 if (!canRunInParallel (executor )
676675 || tasks .size () == 1
677676 || concurrency <= 1 ) {
@@ -692,7 +691,6 @@ private static void runWithConcurrency(
692691 // generally assumes that tasks.size is notably larger than concurrency
693692 try {
694693 //noinspection StatementWithEmptyBody - add first concurrency tasks
695- System .out .println ("[ParallelUtil#runWithConcurrency] submit tasks" );
696694 while (concurrency -- > 0
697695 && terminationFlag .running ()
698696 && completionService .trySubmit (ts ));
@@ -701,15 +699,12 @@ private static void runWithConcurrency(
701699 return ;
702700 }
703701
704- System .out .println ("[ParallelUtil#runWithConcurrency] submit more tasks" );
705702 // submit all remaining tasks
706703 int tries = 0 ;
707704 while (ts .hasNext ()) {
708705 if (completionService .hasTasks ()) {
709706 try {
710- System .out .println ("[ParallelUtil#runWithConcurrency] waiting for task to finish... " + executor );
711707 completionService .awaitNext ();
712- System .out .println ("[ParallelUtil#runWithConcurrency] task finished... " + executor );
713708 } catch (ExecutionException e ) {
714709 error = Exceptions .chain (error , e .getCause ());
715710 } catch (CancellationException ignore ) {
@@ -720,18 +715,15 @@ private static void runWithConcurrency(
720715 }
721716 if (!completionService .trySubmit (ts ) && !completionService .hasTasks ()) {
722717 if (++tries >= maxWaitRetries ) {
723- System .out .println ("[ParallelUtil#runWithConcurrency] exceeded max wait retries" );
724718 break ;
725719 }
726720 LockSupport .parkNanos (waitNanos );
727721 }
728722 }
729723
730- System .out .println ("[ParallelUtil#runWithConcurrency] wait for tasks" );
731724 // wait for all tasks to finish
732725 while (completionService .hasTasks () && terminationFlag .running ()) {
733726 try {
734- System .out .println ("[ParallelUtil#runWithConcurrency] waiting for next task" );
735727 completionService .awaitNext ();
736728 } catch (ExecutionException e ) {
737729 error = Exceptions .chain (error , e .getCause ());
@@ -743,7 +735,6 @@ private static void runWithConcurrency(
743735 } finally {
744736 finishRunWithConcurrency (completionService , error );
745737 }
746- System .out .println ("[ParallelUtil#runWithConcurrency] finished running with " + tasks .size () + " tasks" );
747738 }
748739
749740 private static void finishRunWithConcurrency (
@@ -890,9 +881,7 @@ protected void done() {
890881 if (executor instanceof ThreadPoolExecutor ) {
891882 pool = (ThreadPoolExecutor ) executor ;
892883 availableConcurrency = pool .getCorePoolSize ();
893- // availableConcurrency = 2;
894884 int capacity = Math .max (targetConcurrency , availableConcurrency ) + 1 ;
895- System .out .println ("[ParallelUtil#runWithConcurrency] capacity = " + capacity + " [target:" + targetConcurrency + ",available:" + availableConcurrency + "]" );
896885 completionQueue = new ArrayBlockingQueue <>(capacity );
897886 } else {
898887 pool = null ;
@@ -939,17 +928,7 @@ void cancelAll() {
939928 }
940929
941930 private boolean canSubmit () {
942- int activeCount = 0 ;
943- boolean canSubmit = pool == null || (activeCount = pool .getActiveCount ()) < availableConcurrency ;
944-
945- if (!canSubmit ) {
946- System .out .println ("[ParallelUtil#runWithConcurrency] unable to submit task and pool:" + pool + ", activeCount:" + activeCount + ", availableConcurrency:" + availableConcurrency );
947- // throw new RuntimeException();
948- } else {
949- System .out .println ("[ParallelUtil#runWithConcurrency] submitted task and pool:" + pool + ", activeCount:" + activeCount + ", availableConcurrency:" + availableConcurrency );
950- }
951-
952- return canSubmit ;
931+ return pool == null || pool .getActiveCount () < availableConcurrency ;
953932 }
954933
955934 private void stopFutures (Collection <Future <Void >> futures ) {
@@ -1001,7 +980,6 @@ public T next() {
1001980
1002981 void pushBack (T element ) {
1003982 if (pushedElement != null ) {
1004- System .out .println ("[ParallelUtil#runWithConcurrency] unable to reschedule task" );
1005983 throw new IllegalArgumentException ("Cannot push back twice" );
1006984 }
1007985 pushedElement = element ;
0 commit comments