1010from botocore .exceptions import ClientError
1111
1212if "LOGLEVEL" in os .environ :
13- logging .basicConfig (level = os .environ .get ("LOGLEVEL" , "WARNING" ), format = '%(levelname)s:%(message)s' )
13+ logging .basicConfig (level = os .environ .get (
14+ "LOGLEVEL" , "WARNING" ), format = '%(levelname)s:%(message)s' )
1415logger = logging .getLogger (__name__ )
1516
1617
@@ -141,7 +142,7 @@ def process_vpc_config(account, vpc, vpc_dict):
141142def flatten_subnet_config (vpc_name , subnets ):
142143 """Takes subnet object from ASEA config and generate list of subnets to be created per AZ"""
143144 return [
144- {"Name" : f"{ subnet ['name' ]} _{ vpc_name } _az{ d ['az' ]} _net" , "route-table" : f"{ d ['route-table' ]} _rt" }
145+ {"Name" : f"{ subnet ['name' ]} _{ vpc_name } _az{ d ['az' ]} _net" , "route-table" : f"{ d ['route-table' ]} _rt" } # nopep8
145146 for subnet in subnets
146147 for d in subnet ["definitions" ]
147148 if not d .get ('disabled' , False )
@@ -279,7 +280,7 @@ def get_transit_gateway_route_tables(ec2_client, tgw_id: str) -> List[Dict]:
279280 blackhole_routes = get_transit_gateway_routes (
280281 ec2_client , tgwrt ["TransitGatewayRouteTableId" ], "blackhole" )
281282 except Exception as e :
282- logger .error (f"Failed to get routes for table { tgwrt ['TransitGatewayRouteTableId' ]} : { str (e )} " )
283+ logger .error (f"Failed to get routes for table { tgwrt ['TransitGatewayRouteTableId' ]} : { str (e )} " ) # nopep8
283284 active_routes = []
284285
285286 name = next ((tag ["Value" ] for tag in tgwrt .get ("Tags" , [])
@@ -322,7 +323,7 @@ def get_transit_gateway_routes(ec2_client, tgwrt_id: str, state: str) -> List[Di
322323 """
323324 valid_states = ['active' , 'blackhole' , 'deleted' , 'deleting' , 'pending' ]
324325 if state not in valid_states :
325- raise ValueError (f"Invalid route state. Must be one of: { ', ' .join (valid_states )} " )
326+ raise ValueError (f"Invalid route state. Must be one of: { ', ' .join (valid_states )} " ) # nopep8
326327
327328 try :
328329 response = ec2_client .search_transit_gateway_routes (
@@ -495,7 +496,7 @@ def analyze_vpcs(vpc_from_config, account_list, role_to_assume, region):
495496 drift ["route_tables_not_deployed" ].append (
496497 {"RouteTable" : crt ['name' ], "Vpc" : dv })
497498 continue
498- elif len (drt ) > 0 :
499+ elif len (drt ) > 0 :
499500 if len (drt ) > 1 :
500501 logger .error (
501502 f"More than one route table named { crt ['name' ]} is deployed! LZA upgrade already executed?" )
@@ -553,6 +554,7 @@ def analyze_vpcs(vpc_from_config, account_list, role_to_assume, region):
553554
554555 return {"Drift" : drift , "VpcDetails" : vpc_details }
555556
557+
556558def compare_route_table (crt , drt ):
557559 """
558560 Compare entries of configured and deployed route table
@@ -561,74 +563,95 @@ def compare_route_table(crt, drt):
561563 """
562564 drift = []
563565
564- #ignoring gateway endpoint routes (S3 and DynamoDB) and local subnet routes
565- cRoutes = [r for r in crt .get ('routes' , []) if r ['target' ].lower () != 's3' and r ['target' ].lower () != 'dynamodb' ]
566- dRoutes = [r for r in drt .get ('Routes' , []) if 'DestinationCidrBlock' in r and r .get ("GatewayId" , "" ) != "local" ]
566+ # ignoring gateway endpoint routes (S3 and DynamoDB) and local subnet routes
567+ cRoutes = [r for r in crt .get ('routes' , []) if r ['target' ].lower (
568+ ) != 's3' and r ['target' ].lower () != 'dynamodb' ]
569+ dRoutes = [r for r in drt .get (
570+ 'Routes' , []) if 'DestinationCidrBlock' in r and r .get ("GatewayId" , "" ) != "local" ]
567571
568572 if len (cRoutes ) != len (dRoutes ):
569- logger .warning (f"Different number of routes in config and deployed route table for { crt ['name' ]} " )
573+ logger .warning (
574+ f"Different number of routes in config and deployed route table for { crt ['name' ]} " )
570575
571- #check if all route entries in config matches what is deployed
576+ # check if all route entries in config matches what is deployed
572577 for cr in cRoutes :
573578 if cr ['target' ].lower () == "pcx" :
574- logger .warning (f"Route { cr ['destination' ]} is a VPC peering route. Skipping check" )
579+ logger .warning (
580+ f"Route { cr ['destination' ]} is a VPC peering route. Skipping check" )
575581 continue
576582
577- dr = [r for r in dRoutes if cr ['destination' ] == r ['DestinationCidrBlock' ]]
583+ dr = [r for r in dRoutes if cr ['destination' ]
584+ == r ['DestinationCidrBlock' ]]
578585 if len (dr ) == 0 :
579- logger .warning (f"Route { cr ['destination' ]} exists in config but not found in deployed route table" )
580- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not found in deployed route table" })
586+ logger .warning (f"Route { cr ['destination' ]} exists in config but not found in deployed route table" ) # nopep8
587+ drift .append (
588+ {"Route" : cr ['destination' ], "Reason" : "Not found in deployed route table" })
581589 continue
582590 elif len (dr ) == 1 :
583591 dre = dr [0 ]
584592 if cr ['target' ] == "IGW" :
585593 if not ("GatewayId" in dre and dre ['GatewayId' ].startswith ("igw-" )):
586- logger .warning (f"Route { cr ['destination' ]} not matched to IGW" )
587- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to IGW" })
594+ logger .warning (
595+ f"Route { cr ['destination' ]} not matched to IGW" )
596+ drift .append (
597+ {"Route" : cr ['destination' ], "Reason" : "Not matched to IGW" })
588598 elif cr ['target' ] == "TGW" :
589599 if not "TransitGatewayId" in dre :
590- logger .warning (f"Route { cr ['destination' ]} not matched to TGW" )
591- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to TGW" })
600+ logger .warning (
601+ f"Route { cr ['destination' ]} not matched to TGW" )
602+ drift .append (
603+ {"Route" : cr ['destination' ], "Reason" : "Not matched to TGW" })
592604 elif cr ['target' ].startswith ("NFW_" ):
593605 if not ("GatewayId" in dre and dre ['GatewayId' ].startswith ("vpce-" )):
594- logger .warning (f"Route { cr ['destination' ]} not matched to NFW VPCE" )
595- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to NFW VPCE" })
606+ logger .warning (
607+ f"Route { cr ['destination' ]} not matched to NFW VPCE" )
608+ drift .append (
609+ {"Route" : cr ['destination' ], "Reason" : "Not matched to NFW VPCE" })
596610 elif cr ['target' ].startswith ("NATGW_" ):
597611 if not "NatGatewayId" in dre :
598- logger .warning (f"Route { cr ['destination' ]} not matched to NATGW" )
599- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to NATGW" })
612+ logger .warning (
613+ f"Route { cr ['destination' ]} not matched to NATGW" )
614+ drift .append (
615+ {"Route" : cr ['destination' ], "Reason" : "Not matched to NATGW" })
600616 elif cr ['target' ] == "VGW" :
601617 if not ("GatewayId" in dre and dre ['GatewayId' ].startswith ("vgw-" )):
602- logger .warning (f"Route { cr ['destination' ]} not matched to VGW" )
603- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to VGW" })
618+ logger .warning (
619+ f"Route { cr ['destination' ]} not matched to VGW" )
620+ drift .append (
621+ {"Route" : cr ['destination' ], "Reason" : "Not matched to VGW" })
604622 elif cr ['target' ].lower () == "firewall" :
605623 if not "InstanceId" in dre :
606- logger .warning (f"Route { cr ['destination' ]} not matched to firewall instance" )
607- drift .append ({"Route" : cr ['destination' ], "Reason" : "Not matched to firewall instance" })
624+ logger .warning (
625+ f"Route { cr ['destination' ]} not matched to firewall instance" )
626+ drift .append (
627+ {"Route" : cr ['destination' ], "Reason" : "Not matched to firewall instance" })
608628 else :
609629 logger .error (f"Route target { cr ['target' ]} is not supported!" )
610- drift .append ({"Route" : cr ['destination' ], "Reason" : f"Route target { cr ['target' ]} is not supported!" })
630+ drift .append ({"Route" : cr ['destination' ], "Reason" : f"Route target {
631+ cr ['target' ]} is not supported!" })
611632 else :
612- #this should not be possible!
613- logger .error (f"More than one route with destination { cr ['destination' ]} is deployed!" )
614- drift .append ({"Route" : cr ['destination' ], "Reason" : f"More than one route with destination { cr ['destination' ]} found" })
633+ # this should not be possible!
634+ logger .error (f"More than one route with destination { cr ['destination' ]} is deployed!" ) # nopep8
635+ drift .append ({"Route" : cr ['destination' ], "Reason" : f"More than one route with destination {
636+ cr ['destination' ]} found" })
615637
616- #check if there are route entries deployed that are not in the config
638+ # check if there are route entries deployed that are not in the config
617639 for dr in dRoutes :
618640 if 'VpcPeeringConnectionId' in dr :
619- logger .warning (f"Route { dr ['DestinationCidrBlock' ]} is a VPC peering route. Skipping check" )
641+ logger .warning (
642+ f"Route { dr ['DestinationCidrBlock' ]} is a VPC peering route. Skipping check" )
620643 continue
621644
622- cr = [r for r in cRoutes if r ['destination' ] == dr ['DestinationCidrBlock' ]]
645+ cr = [r for r in cRoutes if r ['destination' ]
646+ == dr ['DestinationCidrBlock' ]]
623647 if len (cr ) == 0 :
624- logger .warning (f"Route { dr ['DestinationCidrBlock' ]} exists in deployed route table but not found in config" )
625- drift .append ({"Route" : dr ['DestinationCidrBlock' ], "Reason" : "Not found in config" })
648+ logger .warning (f"Route { dr ['DestinationCidrBlock' ]} exists in deployed route table but not found in config" ) # nopep8
649+ drift .append (
650+ {"Route" : dr ['DestinationCidrBlock' ], "Reason" : "Not found in config" })
626651
627652 return drift
628653
629654
630-
631-
632655def get_tgw_from_config (asea_config , region ):
633656 """
634657 Get all Transit Gateways defined in the config for the provided region
@@ -787,8 +810,7 @@ def main():
787810 accel_prefix = args .accel_prefix
788811 asea_config_path = args .raw_config_path
789812 output_path = args .output_dir
790- role_to_assume = args .role_to_assume if args .role_to_assume else f"{
791- accel_prefix } -PipelineRole"
813+ role_to_assume = args .role_to_assume if args .role_to_assume else f"{ accel_prefix } -PipelineRole" # nopep8
792814 parameter_table = f"{ accel_prefix } -Parameters"
793815 shared_network_key = 'shared-network'
794816 home_region = args .home_region
0 commit comments