Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions cmd/notifier/config/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[General]
# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true
CheckDuplicates = false

# ExternalMarshaller is used for handling incoming/outcoming api requests
[General.ExternalMarshaller]
Expand All @@ -19,7 +19,7 @@

[WebSocketConnector]
# Enabled will determine if websocket connector will be enabled or not
Enabled = false
Enabled = true

# URL for the WebSocket client/server connection
# This value represents the IP address and port number that the WebSocket client or server will use to establish a connection.
Expand All @@ -46,6 +46,35 @@
# The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned
AcknowledgeTimeoutInSec = 60

[ExternalWebSocketConnector]
# Enabled will determine if websocket connector will be enabled or not
Enabled = true

# URL for the WebSocket client/server connection
# This value represents the IP address and port number that the WebSocket client or server will use to establish a connection.
URL = "localhost:22112"

# This flag describes the mode to start the WebSocket connector. Can be "client" or "server"
Mode = "server"

# Possible values: json, gogo protobuf. Should be compatible with mx-chain-node outport driver config
DataMarshallerType = "json"

# Retry duration (receive/send ack signal) in seconds
RetryDurationInSec = 5

# Signals if in case of data payload processing error, we should send the ack signal or not
BlockingAckOnError = false

# Set to true to drop messages if there is no active WebSocket connection to send to.
DropMessagesIfNoConnection = false

# After a message will be sent it will wait for an ack message if this flag is enabled
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned
AcknowledgeTimeoutInSec = 60

[ConnectorApi]
# Enabled will determine if http connector will be enabled or not.
# It will determine if http connector endpoints will be created.
Expand All @@ -54,7 +83,7 @@

# The address on which the events notifier listens for subscriptions
# It can be specified as "localhost:5000" or only as "5000"
Host = "5000"
Host = "5001"

# Username and Password needed to authorize the connector
# BasicAuth is enabled only for the endpoints with "Auth" flag enabled
Expand Down
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const (

// MessageQueuePublisherType defines a webserver api type using a message queueing service
MessageQueuePublisherType string = "rabbitmq"

// WSPublisherTypeV2 defines a webserver api type using WebSockets without custom filtering and subscriptions
WSPublisherTypeV2 string = "wsv2"
)

const (
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ type Configs struct {

// MainConfig defines the config setup based on main config file
type MainConfig struct {
General GeneralConfig
WebSocketConnector WebSocketConfig
ConnectorApi ConnectorApiConfig
Redis RedisConfig
RabbitMQ RabbitMQConfig
General GeneralConfig
WebSocketConnector WebSocketConfig
ConnectorApi ConnectorApiConfig
Redis RedisConfig
RabbitMQ RabbitMQConfig
ExternalWebSocketConnector WebSocketConfig
}

// GeneralConfig maps the general config section
Expand Down
2 changes: 2 additions & 0 deletions factory/hubFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ func CreateHub(apiType string) (dispatcher.Hub, error) {
return &disabled.Hub{}, nil
case common.WSPublisherType:
return createHub()
case common.WSPublisherTypeV2:
return &disabled.Hub{}, nil
default:
return nil, common.ErrInvalidAPIType
}
Expand Down
35 changes: 34 additions & 1 deletion factory/pubsubFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package factory

import (
"github.com/multiversx/mx-chain-core-go/marshal"
marshalFactory "github.com/multiversx/mx-chain-core-go/marshal/factory"
"github.com/multiversx/mx-chain-notifier-go/common"
"github.com/multiversx/mx-chain-notifier-go/config"
"github.com/multiversx/mx-chain-notifier-go/dispatcher"
"github.com/multiversx/mx-chain-notifier-go/process"
"github.com/multiversx/mx-chain-notifier-go/rabbitmq"
"github.com/multiversx/mx-chain-notifier-go/ws"
)

// CreatePublisher creates publisher component
Expand All @@ -21,12 +23,17 @@ func CreatePublisher(
return createRabbitMqPublisher(config.RabbitMQ, marshaller)
case common.WSPublisherType:
return createWSPublisher(commonHub)
case common.WSPublisherTypeV2:
return createWSPublisherV2(config.ExternalWebSocketConnector)
default:
return nil, common.ErrInvalidAPIType
}
}

func createRabbitMqPublisher(config config.RabbitMQConfig, marshaller marshal.Marshalizer) (rabbitmq.PublisherService, error) {
func createRabbitMqPublisher(
config config.RabbitMQConfig,
marshaller marshal.Marshalizer,
) (rabbitmq.PublisherService, error) {
rabbitClient, err := rabbitmq.NewRabbitMQClient(config.Url)
if err != nil {
return nil, err
Expand All @@ -48,3 +55,29 @@ func createRabbitMqPublisher(config config.RabbitMQConfig, marshaller marshal.Ma
func createWSPublisher(commonHub dispatcher.Hub) (process.Publisher, error) {
return process.NewPublisher(commonHub)
}

func createWSPublisherV2(
config config.WebSocketConfig,
) (process.Publisher, error) {
marshaller, err := marshalFactory.NewMarshalizer(config.DataMarshallerType)
if err != nil {
return nil, err
}

host, err := createWsHost(config, marshaller)
if err != nil {
return nil, err
}

args := ws.WSPublisherArgs{
Marshaller: marshaller,
WSConn: host,
}

wsPublisher, err := ws.NewWSPublisher(args)
if err != nil {
return nil, err
}

return process.NewPublisher(wsPublisher)
}
2 changes: 2 additions & 0 deletions factory/wsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func CreateWSHandler(apiType string, wsDispatcher dispatcher.Dispatcher, marshal
return &disabled.WSHandler{}, nil
case common.WSPublisherType:
return createWSHandler(wsDispatcher, marshaller)
case common.WSPublisherTypeV2:
return &disabled.WSHandler{}, nil
default:
return nil, common.ErrInvalidAPIType
}
Expand Down
75 changes: 75 additions & 0 deletions ws/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ws

import (
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-notifier-go/data"
)

type WSConnection interface {
Send(payload []byte, topic string) error
Close() error
IsInterfaceNil() bool
}

var log = logger.GetOrCreate("ws")

type WSPublisherArgs struct {
Marshaller marshal.Marshalizer
WSConn WSConnection
}

type wsPublisher struct {
marshaller marshal.Marshalizer
wsConn WSConnection
}

// NewWSPublisher will create a new instance of websocket publisher
func NewWSPublisher(args WSPublisherArgs) (*wsPublisher, error) {
return &wsPublisher{
marshaller: args.Marshaller,
wsConn: args.WSConn,
}, nil
}

// Publish will publish logs and events to websocket clients
func (w *wsPublisher) Publish(events data.BlockEvents) {
eventBytes, err := w.marshaller.Marshal(events)
if err != nil {
log.Error("failure marshalling events", "err", err.Error())
return
}

w.wsConn.Send(eventBytes, outport.TopicSaveBlock)
}

// PublishRevert will publish revert event to websocket clients
func (w *wsPublisher) PublishRevert(revertBlock data.RevertBlock) {
}

// PublishFinalized will publish finalized event to websocket clients
func (w *wsPublisher) PublishFinalized(finalizedBlock data.FinalizedBlock) {
}

// PublishTxs will publish txs event to websocket clients
func (w *wsPublisher) PublishTxs(blockTxs data.BlockTxs) {
}

// PublishScrs will publish scrs event to websocket clients
func (w *wsPublisher) PublishScrs(blockScrs data.BlockScrs) {
}

// PublishBlockEventsWithOrder will publish block events with order to websocket clients
func (w *wsPublisher) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
}

// Close will trigger to close ws publisher
func (w *wsPublisher) Close() error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (w *wsPublisher) IsInterfaceNil() bool {
return w == nil
}
Loading