diff --git a/canal/canal.go b/canal/canal.go index 1ba0c5165..132e7d2e7 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -463,7 +463,12 @@ func (c *Canal) prepareSyncer() error { Dialer: c.cfg.Dialer, Localhost: c.cfg.Localhost, EventCacheCount: c.cfg.EventCacheCount, - RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error { + } + + if c.cfg.RowsEventDecodeFunc != nil { + cfg.RowsEventDecodeFunc = c.cfg.RowsEventDecodeFunc + } else { + cfg.RowsEventDecodeFunc = func(event *replication.RowsEvent, data []byte) error { pos, err := event.DecodeHeader(data) if err != nil { return err @@ -475,7 +480,7 @@ func (c *Canal) prepareSyncer() error { } return event.DecodeData(pos, data) - }, + } } if strings.Contains(c.cfg.Addr, "/") { diff --git a/canal/config.go b/canal/config.go index 16692287d..6c7e435d6 100644 --- a/canal/config.go +++ b/canal/config.go @@ -14,6 +14,7 @@ import ( "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" ) type DumpConfig struct { @@ -71,6 +72,9 @@ type Config struct { IncludeTableRegex []string `toml:"include_table_regex"` ExcludeTableRegex []string `toml:"exclude_table_regex"` + // Allows to specify a custom function to decode RowsEvent. This can be used to skip decoding of some events. + RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error + // discard row event without table meta DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`