@@ -96,8 +96,8 @@ def __init__(self, flow1, flow2):
9696
9797 # Assume the size that sent the most data is the source
9898 # TODO: this might not always be right, maybe use earlier timestamp?
99- size1 = fallback (flow1 , ['IN_BYTES' , 'IN_OCTETS' ])
100- size2 = fallback (flow2 , ['IN_BYTES' , 'IN_OCTETS' ])
99+ size1 = fallback (flow1 , ['IN_BYTES' , 'IN_OCTETS' , "octetDeltaCount" ])
100+ size2 = fallback (flow2 , ['IN_BYTES' , 'IN_OCTETS' , "octetDeltaCount" ])
101101 if size1 >= size2 :
102102 src = flow1
103103 dest = flow2
@@ -120,12 +120,24 @@ def __init__(self, flow1, flow2):
120120 ips = self .get_ips (src )
121121 self .src = ips .src
122122 self .dest = ips .dest
123- self .src_port = fallback (src , ['L4_SRC_PORT' , 'SRC_PORT' ])
124- self .dest_port = fallback (dest , ['L4_DST_PORT' , 'DST_PORT' ])
125- self .size = fallback (src , ['IN_BYTES' , 'IN_OCTETS' ])
123+ self .proto = fallback (src , ['PROTOCOL' , 'PROTO' , 'protocolIdentifier' ])
124+ # ICMP and ICMPv6 does not include port fields
125+ if self .proto == 1 or self .proto == 58 :
126+ self .src_port = 0
127+ # ICMP field is treated as destination port
128+ try :
129+ self .dest_port = fallback (dest , ['ICMP_TYPE' , 'icmpTypeCodeIPv4' , 'icmpTypeCodeIPv6' ])
130+ except :
131+ self .dest_port = fallback (dest , ['L4_DST_PORT' , 'DST_PORT' , 'destinationTransportPort' ])
132+ else :
133+ self .src_port = fallback (src , ['L4_SRC_PORT' , 'SRC_PORT' , 'sourceTransportPort' ])
134+ self .dest_port = fallback (dest , ['L4_DST_PORT' , 'DST_PORT' , 'destinationTransportPort' ])
135+ self .size = fallback (src , ['IN_BYTES' , 'IN_OCTETS' , 'octetDeltaCount' ])
126136
127137 # Duration is given in milliseconds
128- self .duration = src ['LAST_SWITCHED' ] - src ['FIRST_SWITCHED' ]
138+ lastSwitched = fallback (src , ['LAST_SWITCHED' , 'flowEndSysUpTime' ])
139+ firstSwitched = fallback (src , ['FIRST_SWITCHED' , 'flowStartSysUpTime' ])
140+ self .duration = lastSwitched - firstSwitched
129141 if self .duration < 0 :
130142 # 32 bit int has its limits. Handling overflow here
131143 # TODO: Should be handled in the collection phase
@@ -139,16 +151,17 @@ def __repr__(self):
139151 def get_ips (flow ):
140152 # IPv4
141153 if flow .get ('IP_PROTOCOL_VERSION' ) == 4 or \
154+ 'sourceIPv4Address' in flow or 'destinationIPv4Address' in flow or \
142155 'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow :
143156 return Pair (
144- ipaddress .ip_address (flow ['IPV4_SRC_ADDR' ] ),
145- ipaddress .ip_address (flow ['IPV4_DST_ADDR' ] )
157+ ipaddress .ip_address (fallback ( flow , ['IPV4_SRC_ADDR' , 'sourceIPv4Address' ]) ),
158+ ipaddress .ip_address (fallback ( flow , ['IPV4_DST_ADDR' , 'destinationIPv4Address' ]) )
146159 )
147160
148161 # IPv6
149162 return Pair (
150- ipaddress .ip_address (flow ['IPV6_SRC_ADDR' ] ),
151- ipaddress .ip_address (flow ['IPV6_DST_ADDR' ] )
163+ ipaddress .ip_address (fallback ( flow , ['IPV6_SRC_ADDR' , 'sourceIPv6Address' ]) ),
164+ ipaddress .ip_address (fallback ( flow , ['IPV6_DST_ADDR' , 'destinationIPv6Address' ]) )
152165 )
153166
154167 @property
@@ -179,7 +192,13 @@ def service(self):
179192
180193 @property
181194 def total_packets (self ):
182- return self .src_flow ["IN_PKTS" ] + self .dest_flow ["IN_PKTS" ]
195+ src_flow_packets = fallback (
196+ self .src_flow , ["IN_PKTS" , "IN_PACKETS" , "packetDeltaCount" ]
197+ )
198+ dest_flow_packets = fallback (
199+ self .dest_flow , ["IN_PKTS" , "IN_PACKETS" , "packetDeltaCount" ]
200+ )
201+ return src_flow_packets + dest_flow_packets
183202
184203
185204if __name__ == "netflow.analyzer" :
@@ -236,10 +255,6 @@ def total_packets(self):
236255 logger .error ("No header dict in entry {}" .format (ts ))
237256 raise ValueError
238257
239- if entry [ts ]["header" ]["version" ] == 10 :
240- logger .warning ("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented" )
241- continue
242-
243258 data [ts ] = entry [ts ]
244259
245260 # Go through data and dissect every flow saved inside the dump
@@ -258,20 +273,29 @@ def total_packets(self):
258273 client = data [key ]["client" ]
259274 flows = data [key ]["flows" ]
260275
261- for flow in sorted (flows , key = lambda x : x ["FIRST_SWITCHED" ]):
262- first_switched = flow ["FIRST_SWITCHED" ]
276+ for flow in sorted (flows ,
277+ key = lambda x :fallback (x ,
278+ ["FIRST_SWITCHED" , "flowStartSysUpTime" , "systemInitTimeMilliseconds" ],
279+ ),
280+ ):
281+ if "systemInitTimeMilliseconds" in flow :
282+ # systemInitTimeMilliseconds exists in only option data record
283+ continue
284+ first_switched = fallback (flow , ["FIRST_SWITCHED" , "flowStartSysUpTime" ])
263285
264286 if first_switched - 1 in pending :
265287 # TODO: handle fitting, yet mismatching (here: 1 second) pairs
266288 pass
267289
268290 # Find the peer for this connection
269- if "IPV4_SRC_ADDR" in flow or flow .get ("IP_PROTOCOL_VERSION" ) == 4 :
270- local_peer = flow ["IPV4_SRC_ADDR" ]
271- remote_peer = flow ["IPV4_DST_ADDR" ]
291+ if ("IPV4_SRC_ADDR" in flow or "sourceIPv4Address" in flow
292+ or fallback (flow , ["IP_PROTOCOL_VERSION" , "ipVersion" ]) == 4
293+ ):
294+ local_peer = fallback (flow , ["IPV4_SRC_ADDR" , "sourceIPv4Address" ])
295+ remote_peer = fallback (flow , ["IPV4_DST_ADDR" , "destinationIPv4Address" ])
272296 else :
273- local_peer = flow ["IPV6_SRC_ADDR" ]
274- remote_peer = flow ["IPV6_DST_ADDR" ]
297+ local_peer = fallback ( flow , ["IPV6_SRC_ADDR" , "sourceIPv6Address" ])
298+ remote_peer = fallback ( flow , ["IPV6_DST_ADDR" , "destinationIPv6Address" ])
275299
276300 # Match on host filter passed in as argument
277301 if args .match_host and not any ([local_peer == args .match_host , remote_peer == args .match_host ]):
0 commit comments