1+ /*
2+ * Copyright Debezium Authors.
3+ *
4+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+ */
6+ package io .debezium .relational .ddl ;
7+
8+ import java .util .ArrayList ;
9+ import java .util .HashSet ;
10+ import java .util .List ;
11+ import java .util .Set ;
12+ import java .util .function .Predicate ;
13+
14+ import io .debezium .annotation .NotThreadSafe ;
15+ import io .debezium .relational .RelationalTableFilters ;
16+ import io .debezium .relational .TableId ;
17+ import io .debezium .relational .Tables .TableFilter ;
18+
19+ /**
20+ * Copied from Debezium project.
21+ * A {@link DdlParserListener} that accumulates changes, allowing them to be consumed in the same order by database.
22+ *
23+ * @author Randall Hauch
24+ */
25+ @ NotThreadSafe
26+ public class DdlChanges implements DdlParserListener {
27+
28+ private final String terminator ;
29+ private final List <Event > events = new ArrayList <>();
30+ private final Set <String > databaseNames = new HashSet <>();
31+
32+ /**
33+ * Create a new changes object with ';' as the terminator token.
34+ */
35+ public DdlChanges () {
36+ this (null );
37+ }
38+
39+ /**
40+ * Create a new changes object with the designated terminator token.
41+ *
42+ * @param terminator the token used to terminate each statement; may be null
43+ */
44+ public DdlChanges (String terminator ) {
45+ this .terminator = terminator != null ? terminator : ";" ;
46+ }
47+
48+ /**
49+ * Clear all accumulated changes.
50+ *
51+ * @return this object for method chaining; never null
52+ */
53+ public DdlChanges reset () {
54+ events .clear ();
55+ databaseNames .clear ();
56+ return this ;
57+ }
58+
59+ @ Override
60+ public void handle (Event event ) {
61+ events .add (event );
62+ databaseNames .add (getDatabase (event ));
63+ }
64+
65+ /**
66+ * Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
67+ * but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
68+ * @param consumer the consumer
69+ */
70+ public void groupStatementStringsByDatabase (DatabaseStatementStringConsumer consumer ) {
71+ groupEventsByDatabase ((DatabaseEventConsumer ) (dbName , eventList ) -> {
72+ final StringBuilder statements = new StringBuilder ();
73+ final Set <TableId > tables = new HashSet <>();
74+ eventList .forEach (event -> {
75+ statements .append (event .statement ());
76+ statements .append (terminator );
77+ addTable (tables , event );
78+ });
79+ consumer .consume (dbName , tables , statements .toString ());
80+ });
81+ }
82+
83+ private void addTable (final Set <TableId > tables , Event event ) {
84+ if (event instanceof TableEvent ) {
85+ tables .add (((TableEvent ) event ).tableId ());
86+ }
87+ }
88+
89+ /**
90+ * Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
91+ * but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
92+ * @param consumer the consumer
93+ */
94+ public void groupStatementsByDatabase (DatabaseStatementConsumer consumer ) {
95+ groupEventsByDatabase ((DatabaseEventConsumer ) (dbName , eventList ) -> {
96+ List <String > statements = new ArrayList <>();
97+ final Set <TableId > tables = new HashSet <>();
98+ eventList .forEach (event -> {
99+ statements .add (event .statement ());
100+ addTable (tables , event );
101+ });
102+ consumer .consume (dbName , tables , statements );
103+ });
104+ }
105+
106+ /**
107+ * Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
108+ * but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
109+ * @param consumer the consumer
110+ */
111+ public void groupEventsByDatabase (DatabaseEventConsumer consumer ) {
112+ if (isEmpty ()) {
113+ return ;
114+ }
115+ if (databaseNames .size () <= 1 ) {
116+ consumer .consume (databaseNames .iterator ().next (), events );
117+ return ;
118+ }
119+ List <Event > dbEvents = new ArrayList <>();
120+ String currentDatabase = null ;
121+ for (Event event : events ) {
122+ String dbName = getDatabase (event );
123+ if (currentDatabase == null || dbName .equals (currentDatabase )) {
124+ currentDatabase = dbName ;
125+ // Accumulate the statement ...
126+ dbEvents .add (event );
127+ }
128+ else {
129+ // Submit the statements ...
130+ consumer .consume (currentDatabase , dbEvents );
131+ }
132+ }
133+ }
134+
135+ /**
136+ * Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
137+ * but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
138+ * @param consumer the consumer
139+ */
140+ public void getEventsByDatabase (DatabaseEventConsumer consumer ) {
141+ if (isEmpty ()) {
142+ return ;
143+ }
144+ if (databaseNames .size () <= 1 ) {
145+ consumer .consume (databaseNames .iterator ().next (), events );
146+ return ;
147+ }
148+ List <Event > dbEvents = new ArrayList <>();
149+ String currentDatabase = null ;
150+ for (Event event : events ) {
151+ String dbName = getDatabase (event );
152+ if (currentDatabase == null || dbName .equals (currentDatabase )) {
153+ currentDatabase = dbName ;
154+ // Accumulate the statement ...
155+ dbEvents .add (event );
156+ }
157+ else {
158+ // Submit the statements ...
159+ consumer .consume (currentDatabase , dbEvents );
160+ dbEvents = new ArrayList <>();
161+ currentDatabase = dbName ;
162+ // Accumulate the statement ...
163+ dbEvents .add (event );
164+ }
165+ }
166+ if (!dbEvents .isEmpty ()) {
167+ consumer .consume (currentDatabase , dbEvents );
168+ }
169+ }
170+
171+ protected String getDatabase (Event event ) {
172+ switch (event .type ()) {
173+ case CREATE_TABLE :
174+ case ALTER_TABLE :
175+ case DROP_TABLE :
176+ case TRUNCATE_TABLE :
177+ TableEvent tableEvent = (TableEvent ) event ;
178+ return tableEvent .tableId ().catalog ();
179+ case CREATE_INDEX :
180+ case DROP_INDEX :
181+ TableIndexEvent tableIndexEvent = (TableIndexEvent ) event ;
182+ return tableIndexEvent .tableId ().catalog ();
183+ case CREATE_DATABASE :
184+ case ALTER_DATABASE :
185+ case DROP_DATABASE :
186+ case USE_DATABASE :
187+ DatabaseEvent dbEvent = (DatabaseEvent ) event ;
188+ return dbEvent .databaseName ();
189+ case SET_VARIABLE :
190+ SetVariableEvent varEvent = (SetVariableEvent ) event ;
191+ return varEvent .databaseName ().orElse ("" );
192+ }
193+ assert false : "Should never happen" ;
194+ return null ;
195+ }
196+
197+ public boolean isEmpty () {
198+ return events .isEmpty ();
199+ }
200+
201+ public boolean applyToMoreDatabasesThan (String name ) {
202+ return databaseNames .contains (name ) ? databaseNames .size () > 1 : databaseNames .size () > 0 ;
203+ }
204+
205+ @ Override
206+ public String toString () {
207+ return events .toString ();
208+ }
209+
210+ public static interface DatabaseEventConsumer {
211+ void consume (String databaseName , List <Event > events );
212+ }
213+
214+ public static interface DatabaseStatementConsumer {
215+ void consume (String databaseName , Set <TableId > tableList , List <String > ddlStatements );
216+ }
217+
218+ public static interface DatabaseStatementStringConsumer {
219+ void consume (String databaseName , Set <TableId > tableList , String ddlStatements );
220+ }
221+
222+ /**
223+ * @return true if any event stored is one of
224+ * <ul>
225+ * <li>database-wide events and affects included/excluded database</li>
226+ * <li>table related events and the table is included</li>
227+ * <li>events that set a variable and either affects included database or is a system-wide variable</li>
228+ * <ul>
229+ */
230+ @ Deprecated
231+ public boolean anyMatch (Predicate <String > databaseFilter , Predicate <TableId > tableFilter ) {
232+ return events .stream ().anyMatch (event -> (event instanceof DatabaseEvent ) && databaseFilter .test (((DatabaseEvent ) event ).databaseName ())
233+ || (event instanceof TableEvent ) && tableFilter .test (((TableEvent ) event ).tableId ())
234+ || (event instanceof SetVariableEvent ) && (!((SetVariableEvent ) event ).databaseName ().isPresent ()
235+ || databaseFilter .test (((SetVariableEvent ) event ).databaseName ().get ())));
236+ }
237+
238+ /**
239+ * @return true if any event stored is one of
240+ * <ul>
241+ * <li>database-wide events and affects included/excluded database</li>
242+ * <li>table related events and the table is included</li>
243+ * <li>events that set a variable and either affects included database or is a system-wide variable</li>
244+ * <ul>
245+ */
246+ // TODO javadoc
247+ public boolean anyMatch (RelationalTableFilters filters ) {
248+ Predicate <String > databaseFilter = filters .databaseFilter ();
249+ TableFilter tableFilter = filters .dataCollectionFilter ();
250+ return events .stream ().anyMatch (event -> (event instanceof DatabaseEvent ) && databaseFilter .test (((DatabaseEvent ) event ).databaseName ())
251+ || (event instanceof TableEvent ) && tableFilter .isIncluded (((TableEvent ) event ).tableId ())
252+ || (event instanceof SetVariableEvent ) && (!((SetVariableEvent ) event ).databaseName ().isPresent ()
253+ || databaseFilter .test (((SetVariableEvent ) event ).databaseName ().get ())));
254+ }
255+
256+ public boolean anyMatch (Predicate <Event > predicate ) {
257+ return events .stream ().anyMatch (predicate );
258+ }
259+
260+ }
0 commit comments