@@ -191,6 +191,7 @@ def perform_account_aggr_job(*args, **job_params):
191191 values .extend (NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
192192 values .extend (NetFlowBot .get_top_N_protocols_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
193193 values .extend (NetFlowBot .get_top_N_protocols_for_entity_interfaces (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
194+ values .extend (NetFlowBot .get_top_N_connections_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
194195
195196 if not values :
196197 log .warning ("No values found to be sent to Grafolean" )
@@ -419,6 +420,39 @@ def get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_be
419420
420421 return values
421422
423+ @staticmethod
424+ @slow_down
425+ def get_top_N_connections_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ):
426+ with get_db_cursor () as c :
427+ values = []
428+ c .execute (f"""
429+ SELECT
430+ f.ipv4_src_addr, f.ipv4_dst_addr,
431+ sum(f.in_bytes) "traffic"
432+ FROM
433+ { DB_PREFIX } flows "f"
434+ WHERE
435+ f.client_ip = %s AND
436+ f.ts > %s AND
437+ f.ts <= %s AND
438+ f.direction = %s
439+ GROUP BY
440+ f.ipv4_src_addr, f.ipv4_dst_addr
441+ ORDER BY
442+ traffic desc
443+ LIMIT { TOP_N_MAX } ;
444+ """ , (entity_ip , last_used_ts , max_ts , direction ,))
445+
446+ output_path_entity = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = None )
447+ for ipv4_src_addr , ipv4_dst_addr , traffic_bytes in c .fetchall ():
448+ output_path = f"{ output_path_entity } .topconn.{ path_part_encode (ipv4_src_addr )} .{ path_part_encode (ipv4_dst_addr )} "
449+ values .append ({
450+ 'p' : output_path ,
451+ 'v' : traffic_bytes / time_between , # Bps
452+ })
453+
454+ return values
455+
422456 # @staticmethod
423457 # @slow_down
424458 # def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
0 commit comments