@@ -2,6 +2,7 @@ package stub
22
33import (
44 "bytes"
5+ "context"
56 "crypto/tls"
67 "crypto/x509"
78 b64 "encoding/base64"
@@ -15,8 +16,11 @@ import (
1516 "path"
1617 "strconv"
1718 "strings"
19+ "sync"
1820 "time"
1921
22+ "golang.org/x/sync/semaphore"
23+
2024 uuid "github.com/satori/go.uuid"
2125
2226 "github.com/sap/infrabox/src/services/gcp/pkg/apis/gcp/v1alpha1"
@@ -1111,6 +1115,14 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
11111115 log .Infof ("Collecting data from GKE cluster %s" , cluster .Name )
11121116 defer close (done )
11131117
1118+ parallelLogPulls := 1
1119+ if n , err := strconv .Atoi (os .Getenv ("INFRABOX_PARALLEL_LOG_PULL" )); err == nil {
1120+ if n > 0 && n < 10 {
1121+ parallelLogPulls = n
1122+ }
1123+ log .Infof ("Setting parallel log pulls for log collection: %d" , parallelLogPulls )
1124+ }
1125+
11141126 annotations := cr .GetAnnotations ()
11151127 _ , ok := annotations ["infrabox.net/root-url" ]
11161128 if ! ok {
@@ -1144,23 +1156,40 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
11441156 return
11451157 }
11461158
1159+ // Do log collection in parallel, up to parallelLogPulls concurrent goroutines.
1160+ wg := sync.WaitGroup {}
1161+ sem := semaphore .NewWeighted (int64 (parallelLogPulls ))
11471162 for _ , pod := range pods {
1163+ pod := pod // necessary before Go1.22 I think that changed this behavior.
11481164 for _ , container := range pod .Containers {
1149- log . Debug ( "Collecting logs for pod: " , pod . PodID )
1150- data , err := doCollectorRequest ( cluster , log , "/api/pods/" + pod . PodID + "/log/" + container )
1165+ container := container
1166+ err := sem . Acquire ( context . Background (), 1 )
11511167 if err != nil {
1152- log .Warningf ("Failed to get collected pod logs : %v" , err )
1153- continue
1168+ log .Errorf ("Failed to get collected pod list, cannot acquire semaphore : %v" , err )
1169+ return
11541170 }
11551171
1156- filename := "pod_" + pod .Namespace + "_" + pod .Pod + "_" + container + ".txt"
1157- filename = path .Join (logPath , filename )
1158- if err := ioutil .WriteFile (filename , * data , os .ModePerm ); err != nil {
1159- log .Debugf ("Failed to write pod logs: %v" , err )
1160- continue
1161- }
1172+ wg .Add (1 )
1173+ go func () {
1174+ defer sem .Release (1 )
1175+ defer wg .Done ()
1176+
1177+ log .Debug ("Collecting logs for pod: " , pod .PodID )
1178+ data , err := doCollectorRequest (cluster , log , "/api/pods/" + pod .PodID + "/log/" + container )
1179+ if err != nil {
1180+ log .Warningf ("Failed to get collected pod logs: %v" , err )
1181+ return
1182+ }
1183+ filename := "pod_" + pod .Namespace + "_" + pod .Pod + "_" + container + ".txt"
1184+ filename = path .Join (logPath , filename )
1185+ if err := ioutil .WriteFile (filename , * data , os .ModePerm ); err != nil {
1186+ log .Debugf ("Failed to write pod logs: %v" , err )
1187+ return
1188+ }
1189+ }()
11621190 }
11631191 }
1192+ wg .Wait ()
11641193
11651194 archivePath := path .Join (logPath , "pods_log.zip" )
11661195 err = archiver .Archive ([]string {logPath }, archivePath )
0 commit comments