@@ -1027,6 +1027,214 @@ async def test_integration_fallbacks(self) -> bool:
10271027 self .log_test_result ("Integration Fallbacks" , False , str (e ), duration )
10281028 return False
10291029
1030+ async def test_host_device_collector (self ) -> bool :
1031+ """Test host device collector functionality and VFIO rebinding prevention."""
1032+ self .logger .info ("Testing host device collector and VFIO rebinding prevention..." )
1033+ start_time = time .time ()
1034+
1035+ try :
1036+ # Import required modules
1037+ from src .host_device_collector import HostDeviceCollector
1038+ from src .device_clone .pcileech_generator import PCILeechGenerator , PCILeechGenerationConfig
1039+ from src .device_clone .config_space_manager import ConfigSpaceManager
1040+ from src .utils .unified_context import UnifiedContextBuilder
1041+
1042+ # Test device
1043+ test_device = self .test_devices [0 ]
1044+ bdf = test_device ["bdf" ]
1045+
1046+ # Create mock device context data that would be collected by host
1047+ mock_device_context = {
1048+ "vendor_id" : test_device ["vendor_id" ],
1049+ "device_id" : test_device ["device_id" ],
1050+ "subsystem_vendor" : test_device ["subsystem_vendor" ],
1051+ "subsystem_device" : test_device ["subsystem_device" ],
1052+ "class_code" : test_device ["class_code" ],
1053+ "device_name" : test_device ["device_name" ],
1054+ "bdf" : bdf ,
1055+ "bar_config" : {
1056+ "bars" : [
1057+ {"size" : 0x1000000 , "type" : "memory" , "address" : 0xF0000000 },
1058+ {"size" : 0x1000 , "type" : "memory" , "address" : 0xF1000000 },
1059+ {"size" : 0x40 , "type" : "io" , "address" : 0x2000 }
1060+ ]
1061+ },
1062+ "capabilities" : {
1063+ "msi" : {"supported" : True , "vectors" : 1 },
1064+ "msix" : {"supported" : False },
1065+ "pcie" : {"supported" : True , "version" : "2.0" }
1066+ },
1067+ "generation_metadata" : {
1068+ "collected_by" : "host_device_collector" ,
1069+ "collection_time" : "2024-01-01T00:00:00Z" ,
1070+ "version" : "1.0"
1071+ }
1072+ }
1073+
1074+ # Create mock MSI-X data
1075+ mock_msix_data = {
1076+ "msix_supported" : False ,
1077+ "table_offset" : 0 ,
1078+ "table_size" : 0 ,
1079+ "vectors" : []
1080+ }
1081+
1082+ # Create temporary files for host-collected data
1083+ temp_dir = self .test_base_dir / "host_collected_data"
1084+ temp_dir .mkdir (parents = True , exist_ok = True )
1085+
1086+ device_context_file = temp_dir / "device_context.json"
1087+ msix_data_file = temp_dir / "msix_data.json"
1088+
1089+ # Write mock data to files
1090+ with open (device_context_file , 'w' ) as f :
1091+ json .dump (mock_device_context , f , indent = 2 )
1092+
1093+ with open (msix_data_file , 'w' ) as f :
1094+ json .dump (mock_msix_data , f , indent = 2 )
1095+
1096+ # Test 1: HostDeviceCollector can load pre-collected data
1097+ self .logger .info ("Testing HostDeviceCollector data loading..." )
1098+
1099+ # Set environment variables to point to our mock files
1100+ os .environ ["DEVICE_CONTEXT_FILE" ] = str (device_context_file )
1101+ os .environ ["MSIX_DATA_FILE" ] = str (msix_data_file )
1102+
1103+ collector = HostDeviceCollector ()
1104+
1105+ # Verify data loading
1106+ loaded_context = collector ._load_device_context ()
1107+ loaded_msix = collector ._load_msix_data ()
1108+
1109+ if not loaded_context :
1110+ raise ValueError ("Failed to load device context from file" )
1111+ if not loaded_msix :
1112+ raise ValueError ("Failed to load MSI-X data from file" )
1113+
1114+ # Verify loaded data matches expected
1115+ if loaded_context ["vendor_id" ] != test_device ["vendor_id" ]:
1116+ raise ValueError (f"Vendor ID mismatch: expected { test_device ['vendor_id' ]} , got { loaded_context ['vendor_id' ]} " )
1117+
1118+ self .logger .info ("✓ HostDeviceCollector successfully loaded pre-collected data" )
1119+
1120+ # Test 2: PCILeechGenerator uses preloaded config space instead of VFIO
1121+ self .logger .info ("Testing PCILeechGenerator with preloaded config space..." )
1122+
1123+ # Create realistic config space data
1124+ config_space_data = self ._generate_realistic_config_space (test_device )
1125+
1126+ # Create PCILeechGenerator with preloaded config space
1127+ config = PCILeechGenerationConfig (
1128+ bdf = bdf ,
1129+ board = test_device ["board" ],
1130+ output_dir = self .build_output_dir / "host_collector_test" ,
1131+ preloaded_config_space = config_space_data # This is the key feature
1132+ )
1133+
1134+ generator = PCILeechGenerator (config )
1135+
1136+ # Verify that the generator has the preloaded config space
1137+ if generator ._preloaded_config_space != config_space_data :
1138+ raise ValueError ("Generator did not properly store preloaded config space" )
1139+
1140+ # Test 3: Verify that _analyze_configuration_space uses preloaded data
1141+ self .logger .info ("Testing configuration space analysis with preloaded data..." )
1142+
1143+ # Mock VFIO operations to detect if they're called
1144+ vfio_called = False
1145+
1146+ def mock_vfio_read (* args , ** kwargs ):
1147+ nonlocal vfio_called
1148+ vfio_called = True
1149+ raise Exception ("VFIO should not be called when preloaded data is available" )
1150+
1151+ # Patch VFIO operations
1152+ with patch ("src.vfio_handler.VFIODeviceHandler.read_config_space" , side_effect = mock_vfio_read ):
1153+ with patch ("src.vfio_handler.VFIODeviceHandler.__init__" , side_effect = Exception ("VFIO binding should not occur" )):
1154+ try :
1155+ # This should use preloaded data and NOT call VFIO
1156+ config_space_manager = generator ._analyze_configuration_space ()
1157+
1158+ # Verify VFIO was NOT called
1159+ if vfio_called :
1160+ raise ValueError ("VFIO was called despite preloaded config space being available" )
1161+
1162+ # Verify we got a valid ConfigSpaceManager
1163+ if not config_space_manager :
1164+ raise ValueError ("Configuration space analysis returned None" )
1165+
1166+ # Verify the config space data is correct
1167+ if config_space_manager .get_raw_config_space () != config_space_data :
1168+ raise ValueError ("Config space manager does not contain expected data" )
1169+
1170+ self .logger .info ("✓ Configuration space analysis used preloaded data without VFIO calls" )
1171+
1172+ except Exception as e :
1173+ if "VFIO binding should not occur" in str (e ):
1174+ raise ValueError ("VFIO binding was attempted despite preloaded data being available" )
1175+ else :
1176+ raise
1177+
1178+ # Test 4: Test with environment variables set (container scenario)
1179+ self .logger .info ("Testing container scenario with environment variables..." )
1180+
1181+ # Set environment variables as they would be in container
1182+ os .environ ["PCILEECH_PRELOADED_CONFIG_SPACE" ] = config_space_data .hex ()
1183+ os .environ ["PCILEECH_SKIP_VFIO" ] = "true"
1184+
1185+ # Create generator with environment-based config
1186+ container_config = PCILeechGenerationConfig (
1187+ bdf = bdf ,
1188+ board = test_device ["board" ],
1189+ output_dir = self .build_output_dir / "container_test"
1190+ )
1191+
1192+ container_generator = PCILeechGenerator (container_config )
1193+
1194+ # Verify preloaded config space is set from environment
1195+ if not container_generator ._preloaded_config_space :
1196+ raise ValueError ("Container environment did not set preloaded config space" )
1197+
1198+ # Test 5: Verify MSI-X data integration
1199+ self .logger .info ("Testing MSI-X data integration..." )
1200+
1201+ # Test that MSI-X analysis uses pre-collected data
1202+ msix_analysis_result = generator ._analyze_msix_capabilities ()
1203+
1204+ if msix_analysis_result is None :
1205+ self .logger .warning ("MSI-X analysis returned None, but this may be expected for mock data" )
1206+
1207+ duration = time .time () - start_time
1208+
1209+ # Create summary of what was tested
1210+ test_details = [
1211+ "✓ HostDeviceCollector loaded pre-collected device context" ,
1212+ "✓ HostDeviceCollector loaded pre-collected MSI-X data" ,
1213+ "✓ PCILeechGenerator accepted preloaded config space" ,
1214+ "✓ Configuration space analysis used preloaded data (no VFIO calls)" ,
1215+ "✓ Container environment variables properly configured generator" ,
1216+ "✓ MSI-X capability analysis completed"
1217+ ]
1218+
1219+ self .log_test_result (
1220+ "Host Device Collector" ,
1221+ True ,
1222+ " | " .join (test_details ),
1223+ duration ,
1224+ [str (device_context_file ), str (msix_data_file )]
1225+ )
1226+ return True
1227+
1228+ except Exception as e :
1229+ duration = time .time () - start_time
1230+ self .log_test_result ("Host Device Collector" , False , str (e ), duration )
1231+ return False
1232+ finally :
1233+ # Clean up environment variables
1234+ for env_var in ["DEVICE_CONTEXT_FILE" , "MSIX_DATA_FILE" , "PCILEECH_PRELOADED_CONFIG_SPACE" , "PCILEECH_SKIP_VFIO" ]:
1235+ if env_var in os .environ :
1236+ del os .environ [env_var ]
1237+
10301238 def generate_test_report (self ) -> Dict [str , Any ]:
10311239 """Generate comprehensive test report."""
10321240 total_duration = time .time () - self .start_time
@@ -1104,6 +1312,7 @@ async def run_all_tests(self) -> bool:
11041312 self .test_container_build ,
11051313 self .test_performance_benchmarks ,
11061314 self .test_integration_fallbacks ,
1315+ self .test_host_device_collector ,
11071316 ]
11081317
11091318 # Run tests
0 commit comments