@@ -112,9 +112,9 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
112112 valueConverter ,
113113 getTableFilter ());
114114 this .ddlChanges = this .ddlParser .getDdlChanges ();
115+ this .parseOnLineSchemaChanges = parseOnLineSchemaChanges ;
115116 this .connectorConfig = connectorConfig ;
116117 filters = connectorConfig .getTableFilters ();
117- this .parseOnLineSchemaChanges = parseOnLineSchemaChanges ;
118118 }
119119
120120 /**
@@ -158,11 +158,13 @@ private boolean isGhostTable(TableId tableId) {
158158 * table to schema change history.
159159 */
160160 private boolean isTableIncluded (String MethodName ,TableId tableId ) {
161- // 先应用原有过滤规则
162161 boolean isIncludedByOriginalFilter = filters .dataCollectionFilter ().isIncluded (tableId );
162+ LOGGER .info ("MethodName: {},Including isIncludedByOriginalFilter table that would otherwise be filtered: {}," +
163+ "isIncludedByOriginalFilter:{}" ,
164+ MethodName , tableId ,isIncludedByOriginalFilter );
163165 if (parseOnLineSchemaChanges ) {
164166 if (!isIncludedByOriginalFilter && isGhostTable (tableId )) {
165- LOGGER .info ("MethodName{},Including gh-ost table that would otherwise be filtered: {}" ,MethodName , tableId );
167+ LOGGER .info ("MethodName: {},Including gh-ost table that would otherwise be filtered: {}" ,MethodName , tableId );
166168 return true ;
167169 }
168170 }
@@ -226,7 +228,10 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
226228 // - or DDLs for monitored objects
227229 if (!databaseHistory .storeOnlyCapturedTables () || isGlobalSetVariableStatement (schemaChange .getDdl (), schemaChange .getDatabase ())
228230 || schemaChange .getTables ().stream ().map (Table ::id ).anyMatch (x ->isTableIncluded ("applySchemaChange" ,x ))) {
229- LOGGER .info ("Recorded DDL statements for database '{}': {}" , schemaChange .getDatabase (), schemaChange .getDdl ());
231+ boolean applySchemaChange = schemaChange .getTables ().stream ().map (Table ::id ).anyMatch (x -> isTableIncluded ("applySchemaChange" , x ));
232+ LOGGER .info ("Recorded DDL statements for database '{}': {},{},{},{}" , schemaChange .getDatabase (),
233+ schemaChange .getDdl ()
234+ ,applySchemaChange ,!databaseHistory .storeOnlyCapturedTables (),isGlobalSetVariableStatement (schemaChange .getDdl (), schemaChange .getDatabase ()));
230235 record (schemaChange , schemaChange .getTableChanges ());
231236 }
232237 }
@@ -239,7 +244,7 @@ public List<SchemaChangeEvent> parseSnapshotDdl(MySqlPartition partition, String
239244
240245 public List <SchemaChangeEvent > parseStreamingDdl (MySqlPartition partition , String ddlStatements , String databaseName ,
241246 MySqlOffsetContext offset , Instant sourceTime ) {
242- LOGGER .debug ("Processing streaming DDL '{}' for database '{}'" , ddlStatements , databaseName );
247+ LOGGER .info ("Processing streaming DDL '{}' for database '{}'" , ddlStatements , databaseName );
243248 return parseDdl (partition , ddlStatements , databaseName , offset , sourceTime , false );
244249 }
245250
@@ -265,10 +270,7 @@ private List<SchemaChangeEvent> parseDdl(MySqlPartition partition, String ddlSta
265270 }
266271 }
267272 // No need to send schema events or store DDL if no table has changed
268- if (!databaseHistory .storeOnlyCapturedTables () || isGlobalSetVariableStatement (ddlStatements , databaseName ) || ddlChanges .anyMatch (event -> {
269- TableId tableId = this .getTableId (event );
270- return tableId != null && isTableIncluded ("parseDdl" ,tableId );
271- })) {
273+ if (!databaseHistory .storeOnlyCapturedTables () || isGlobalSetVariableStatement (ddlStatements , databaseName ) || ddlChanges .anyMatch (filters ,parseOnLineSchemaChanges )){
272274
273275 // We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
274276 // by database. Unfortunately, the databaseName on the event might not be the same database as that
@@ -407,13 +409,12 @@ public boolean storeOnlyCapturedTables() {
407409 * connector's configuration
408410 */
409411 public boolean assignTableNumber (long tableNumber , TableId id ) {
410-
412+ if (!isTableIncluded ("assignTableNumber" ,id )) {
413+ excludeTableIdsByTableNumber .put (tableNumber , id );
414+ return false ;
415+ }
411416 final TableSchema tableSchema = schemaFor (id );
412417 if (tableSchema == null ) {
413- if (isTableIncluded ("assignTableNumber" ,id )) {
414- tableIdsByTableNumber .put (tableNumber , id );
415- return true ;
416- }
417418 excludeTableIdsByTableNumber .put (tableNumber , id );
418419 return false ;
419420 }
0 commit comments