55 "fmt"
66 "net/http"
77
8+ "k8s.io/apimachinery/pkg/runtime"
9+ "k8s.io/apimachinery/pkg/util/sets"
10+
811 "github.com/jetstack/preflight/api"
912 "github.com/jetstack/preflight/pkg/internal/cyberark"
1013 "github.com/jetstack/preflight/pkg/internal/cyberark/dataupload"
@@ -36,10 +39,17 @@ func NewCyberArk(httpClient *http.Client) (*CyberArkClient, error) {
3639}
3740
3841// PostDataReadingsWithOptions uploads data readings to CyberArk.
42+ // It converts the supplied data readings into a snapshot format expected by CyberArk.
3943// It initializes a data upload client with the configured HTTP client and credentials,
4044// then uploads a snapshot.
4145// The supplied Options are not used by this publisher.
4246func (o * CyberArkClient ) PostDataReadingsWithOptions (ctx context.Context , readings []* api.DataReading , _ Options ) error {
// Build the snapshot locally before loading credentials or touching the
// network, so conversion errors fail fast with no side effects.
47+ var snapshot dataupload.Snapshot
48+ if err := convertDataReadings (defaultExtractorFunctions , readings , & snapshot ); err != nil {
49+ return fmt .Errorf ("while converting data readings: %s" , err )
50+ }
51+ snapshot .AgentVersion = version .PreflightVersion
52+
4353 cfg , err := o .configLoader ()
4454 if err != nil {
4555 return err
// NOTE(review): this diff view elides the lines between the two hunks
// (closing brace of the if above and, presumably, the construction of
// datauploadClient from cfg) — confirm against the full file.
@@ -49,14 +59,134 @@ func (o *CyberArkClient) PostDataReadingsWithOptions(ctx context.Context, readin
4959 return fmt .Errorf ("while initializing data upload client: %s" , err )
5060 }
5161
// The removed lines below uploaded a hard-coded placeholder snapshot; the
// replacement uploads the snapshot assembled from the readings above.
52- err = datauploadClient .PutSnapshot (ctx , dataupload.Snapshot {
53- // Temporary hard coded cluster ID.
54- // TODO(wallrj): The clusterID will eventually be extracted from the supplied readings.
55- ClusterID : "success-cluster-id" ,
56- AgentVersion : version .PreflightVersion ,
57- })
62+ err = datauploadClient .PutSnapshot (ctx , snapshot )
5863 if err != nil {
5964 return fmt .Errorf ("while uploading snapshot: %s" , err )
6065 }
6166 return nil
6267}
68+
69+ // extractClusterIDAndServerVersionFromReading converts the opaque data from a DiscoveryData
70+ // data reading to allow access to the Kubernetes version fields within.
71+ func extractClusterIDAndServerVersionFromReading (reading * api.DataReading , target * dataupload.Snapshot ) error {
72+ if reading == nil {
73+ return fmt .Errorf ("programmer mistake: the DataReading must not be nil" )
74+ }
75+ data , ok := reading .Data .(* api.DiscoveryData )
76+ if ! ok {
77+ return fmt .Errorf (
78+ "programmer mistake: the DataReading must have data type *api.DiscoveryData. " +
79+ "This DataReading (%s) has data type %T" , reading .DataGatherer , reading .Data )
80+ }
81+ target .ClusterID = data .ClusterID
82+ if data .ServerVersion != nil {
83+ target .K8SVersion = data .ServerVersion .GitVersion
84+ }
85+ return nil
86+ }
87+
88+ // extractResourceListFromReading converts the opaque data from a DynamicData
89+ // data reading to runtime.Object resources, to allow access to the metadata and
90+ // other kubernetes API fields.
91+ func extractResourceListFromReading (reading * api.DataReading , target * []runtime.Object ) error {
92+ if reading == nil {
93+ return fmt .Errorf ("programmer mistake: the DataReading must not be nil" )
94+ }
95+ data , ok := reading .Data .(* api.DynamicData )
96+ if ! ok {
97+ return fmt .Errorf (
98+ "programmer mistake: the DataReading must have data type *api.DynamicData. " +
99+ "This DataReading (%s) has data type %T" , reading .DataGatherer , reading .Data )
100+ }
101+ resources := make ([]runtime.Object , len (data .Items ))
102+ for i , item := range data .Items {
103+ if resource , ok := item .Resource .(runtime.Object ); ok {
104+ resources [i ] = resource
105+ } else {
106+ return fmt .Errorf (
107+ "programmer mistake: the DynamicData items must have Resource type runtime.Object. " +
108+ "This item (%d) has Resource type %T" , i , item .Resource )
109+ }
110+ }
111+ * target = resources
112+ return nil
113+ }
114+
115+ var defaultExtractorFunctions = map [string ]func (* api.DataReading , * dataupload.Snapshot ) error {
116+ "ark/discovery" : extractClusterIDAndServerVersionFromReading ,
117+ "ark/secrets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
118+ return extractResourceListFromReading (r , & s .Secrets )
119+ },
120+ "ark/serviceaccounts" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
121+ return extractResourceListFromReading (r , & s .ServiceAccounts )
122+ },
123+ "ark/roles" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
124+ return extractResourceListFromReading (r , & s .Roles )
125+ },
126+ "ark/clusterroles" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
127+ return extractResourceListFromReading (r , & s .ClusterRoles )
128+ },
129+ "ark/rolebindings" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
130+ return extractResourceListFromReading (r , & s .RoleBindings )
131+ },
132+ "ark/clusterrolebindings" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
133+ return extractResourceListFromReading (r , & s .ClusterRoleBindings )
134+ },
135+ "ark/jobs" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
136+ return extractResourceListFromReading (r , & s .Jobs )
137+ },
138+ "ark/cronjobs" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
139+ return extractResourceListFromReading (r , & s .CronJobs )
140+ },
141+ "ark/deployments" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
142+ return extractResourceListFromReading (r , & s .Deployments )
143+ },
144+ "ark/statefulsets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
145+ return extractResourceListFromReading (r , & s .Statefulsets )
146+ },
147+ "ark/daemonsets" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
148+ return extractResourceListFromReading (r , & s .Daemonsets )
149+ },
150+ "ark/pods" : func (r * api.DataReading , s * dataupload.Snapshot ) error {
151+ return extractResourceListFromReading (r , & s .Pods )
152+ },
153+ }
154+
155+ // convertDataReadings processes a list of DataReadings using the provided
156+ // extractor functions to populate the fields of the target snapshot.
157+ // It ensures that all expected data gatherers are handled and that there are
158+ // no unhandled data gatherers. If any discrepancies are found, or if any
159+ // extractor function returns an error, it returns an error.
160+ // The extractorFunctions map should contain functions for each expected
161+ // DataGatherer name, which will be called with the corresponding DataReading
162+ // and the target snapshot to populate the relevant fields.
163+ func convertDataReadings (
164+ extractorFunctions map [string ]func (* api.DataReading , * dataupload.Snapshot ) error ,
165+ readings []* api.DataReading ,
166+ target * dataupload.Snapshot ,
167+ ) error {
168+ expectedDataGatherers := sets .KeySet (extractorFunctions )
169+ unhandledDataGatherers := sets .New [string ]()
170+ missingDataGatherers := expectedDataGatherers .Clone ()
171+ for _ , reading := range readings {
172+ dataGathererName := reading .DataGatherer
173+ extractFunc , found := extractorFunctions [dataGathererName ]
174+ if ! found {
175+ unhandledDataGatherers .Insert (dataGathererName )
176+ continue
177+ }
178+ missingDataGatherers .Delete (dataGathererName )
179+ // Call the extractor function to populate the relevant field in the target snapshot.
180+ if err := extractFunc (reading , target ); err != nil {
181+ return fmt .Errorf ("while extracting data reading %s: %s" , dataGathererName , err )
182+ }
183+ }
184+ if missingDataGatherers .Len () > 0 || unhandledDataGatherers .Len () > 0 {
185+ return fmt .Errorf (
186+ "unexpected data gatherers, missing: %v, unhandled: %v" ,
187+ sets .List (missingDataGatherers ),
188+ sets .List (unhandledDataGatherers ),
189+ )
190+ }
191+ return nil
192+ }
0 commit comments