diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 24ad5c00..ffbfa9d9 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -1,7 +1,7 @@ [General] # CheckDuplicates signals if the events received from observers have been already pushed to clients # Requires a redis instance/cluster and should be used when multiple observers push from the same shard - CheckDuplicates = true + CheckDuplicates = false # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] @@ -19,7 +19,7 @@ [WebSocketConnector] # Enabled will determine if websocket connector will be enabled or not - Enabled = false + Enabled = true # URL for the WebSocket client/server connection # This value represents the IP address and port number that the WebSocket client or server will use to establish a connection. @@ -46,6 +46,35 @@ # The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned AcknowledgeTimeoutInSec = 60 +[ExternalWebSocketConnector] + # Enabled will determine if websocket connector will be enabled or not + Enabled = true + + # URL for the WebSocket client/server connection + # This value represents the IP address and port number that the WebSocket client or server will use to establish a connection. + URL = "localhost:22112" + + # This flag describes the mode to start the WebSocket connector. Can be "client" or "server" + Mode = "server" + + # Possible values: json, gogo protobuf. Should be compatible with mx-chain-node outport driver config + DataMarshallerType = "json" + + # Retry duration (receive/send ack signal) in seconds + RetryDurationInSec = 5 + + # Signals if in case of data payload processing error, we should send the ack signal or not + BlockingAckOnError = false + + # Set to true to drop messages if there is no active WebSocket connection to send to. + DropMessagesIfNoConnection = false + + # After a message will be sent it will wait for an ack message if this flag is enabled + WithAcknowledge = true + + # The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned + AcknowledgeTimeoutInSec = 60 + [ConnectorApi] # Enabled will determine if http connector will be enabled or not. # It will determine if http connector endpoints will be created. @@ -54,7 +83,7 @@ # The address on which the events notifier listens for subscriptions # It can be specified as "localhost:5000" or only as "5000" - Host = "5000" + Host = "5001" # Username and Password needed to authorize the connector # BasicAuth is enabled only for the endpoints with "Auth" flag enabled diff --git a/common/constants.go b/common/constants.go index 2908cf83..bd008b5a 100644 --- a/common/constants.go +++ b/common/constants.go @@ -6,6 +6,9 @@ const ( // MessageQueuePublisherType defines a webserver api type using a message queueing service MessageQueuePublisherType string = "rabbitmq" + + // WSPublisherTypeV2 defines a webserver api type using WebSockets without custom filtering and subscriptions + WSPublisherTypeV2 string = "wsv2" ) const ( diff --git a/config/config.go b/config/config.go index b99f17ba..0375df66 100644 --- a/config/config.go +++ b/config/config.go @@ -11,11 +11,12 @@ type Configs struct { // MainConfig defines the config setup based on main config file type MainConfig struct { - General GeneralConfig - WebSocketConnector WebSocketConfig - ConnectorApi ConnectorApiConfig - Redis RedisConfig - RabbitMQ RabbitMQConfig + General GeneralConfig + WebSocketConnector WebSocketConfig + ConnectorApi ConnectorApiConfig + Redis RedisConfig + RabbitMQ RabbitMQConfig + ExternalWebSocketConnector WebSocketConfig } // GeneralConfig maps the general config section diff --git a/factory/hubFactory.go b/factory/hubFactory.go index be94e12f..f863ef3d 100644 --- a/factory/hubFactory.go +++ b/factory/hubFactory.go @@ -15,6 +15,8 @@ func CreateHub(apiType string) (dispatcher.Hub, error) { return &disabled.Hub{}, nil case common.WSPublisherType: return createHub() + case common.WSPublisherTypeV2: + return &disabled.Hub{}, nil default: return nil, common.ErrInvalidAPIType } diff --git a/factory/pubsubFactory.go b/factory/pubsubFactory.go index 5ebb8ce3..ac2367ad 100644 --- a/factory/pubsubFactory.go +++ b/factory/pubsubFactory.go @@ -2,11 +2,13 @@ package factory import ( "github.com/multiversx/mx-chain-core-go/marshal" + marshalFactory "github.com/multiversx/mx-chain-core-go/marshal/factory" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" "github.com/multiversx/mx-chain-notifier-go/dispatcher" "github.com/multiversx/mx-chain-notifier-go/process" "github.com/multiversx/mx-chain-notifier-go/rabbitmq" + "github.com/multiversx/mx-chain-notifier-go/ws" ) // CreatePublisher creates publisher component @@ -21,12 +23,17 @@ func CreatePublisher( return createRabbitMqPublisher(config.RabbitMQ, marshaller) case common.WSPublisherType: return createWSPublisher(commonHub) + case common.WSPublisherTypeV2: + return createWSPublisherV2(config.ExternalWebSocketConnector) default: return nil, common.ErrInvalidAPIType } } -func createRabbitMqPublisher(config config.RabbitMQConfig, marshaller marshal.Marshalizer) (rabbitmq.PublisherService, error) { +func createRabbitMqPublisher( + config config.RabbitMQConfig, + marshaller marshal.Marshalizer, +) (rabbitmq.PublisherService, error) { rabbitClient, err := rabbitmq.NewRabbitMQClient(config.Url) if err != nil { return nil, err @@ -48,3 +55,29 @@ func createRabbitMqPublisher(config config.RabbitMQConfig, marshaller marshal.Ma func createWSPublisher(commonHub dispatcher.Hub) (process.Publisher, error) { return process.NewPublisher(commonHub) } + +func createWSPublisherV2( + config config.WebSocketConfig, +) (process.Publisher, error) { + marshaller, err := marshalFactory.NewMarshalizer(config.DataMarshallerType) + if err != nil { + return nil, err + } + + host, err := createWsHost(config, marshaller) + if err != nil { + return nil, err + } + + args := ws.WSPublisherArgs{ + Marshaller: marshaller, + WSConn: host, + } + + wsPublisher, err := ws.NewWSPublisher(args) + if err != nil { + return nil, err + } + + return process.NewPublisher(wsPublisher) +} diff --git a/factory/wsFactory.go b/factory/wsFactory.go index 4122a1de..a7484209 100644 --- a/factory/wsFactory.go +++ b/factory/wsFactory.go @@ -25,6 +25,8 @@ func CreateWSHandler(apiType string, wsDispatcher dispatcher.Dispatcher, marshal return &disabled.WSHandler{}, nil case common.WSPublisherType: return createWSHandler(wsDispatcher, marshaller) + case common.WSPublisherTypeV2: + return &disabled.WSHandler{}, nil default: return nil, common.ErrInvalidAPIType } diff --git a/ws/publisher.go b/ws/publisher.go new file mode 100644 index 00000000..0d5341da --- /dev/null +++ b/ws/publisher.go @@ -0,0 +1,75 @@ +package ws + +import ( + "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/marshal" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-notifier-go/data" +) + +type WSConnection interface { + Send(payload []byte, topic string) error + Close() error + IsInterfaceNil() bool +} + +var log = logger.GetOrCreate("ws") + +type WSPublisherArgs struct { + Marshaller marshal.Marshalizer + WSConn WSConnection +} + +type wsPublisher struct { + marshaller marshal.Marshalizer + wsConn WSConnection +} + +// NewWSPublisher will create a new instance of websocket publisher +func NewWSPublisher(args WSPublisherArgs) (*wsPublisher, error) { + return &wsPublisher{ + marshaller: args.Marshaller, + wsConn: args.WSConn, + }, nil +} + +// Publish will publish logs and events to websocket clients +func (w *wsPublisher) Publish(events data.BlockEvents) { + eventBytes, err := w.marshaller.Marshal(events) + if err != nil { + log.Error("failure marshalling events", "err", err.Error()) + return + } + + w.wsConn.Send(eventBytes, outport.TopicSaveBlock) +} + +// PublishRevert will publish revert event to websocket clients +func (w *wsPublisher) PublishRevert(revertBlock data.RevertBlock) { +} + +// PublishFinalized will publish finalized event to websocket clients +func (w *wsPublisher) PublishFinalized(finalizedBlock data.FinalizedBlock) { +} + +// PublishTxs will publish txs event to websocket clients +func (w *wsPublisher) PublishTxs(blockTxs data.BlockTxs) { +} + +// PublishScrs will publish scrs event to websocket clients +func (w *wsPublisher) PublishScrs(blockScrs data.BlockScrs) { +} + +// PublishBlockEventsWithOrder will publish block events with order to websocket clients +func (w *wsPublisher) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { +} + +// Close will trigger to close ws publisher +func (w *wsPublisher) Close() error { + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (w *wsPublisher) IsInterfaceNil() bool { + return w == nil +}