Skip to content

Commit 6cec813

Browse files
committed
refactoring
1 parent 796cb3d commit 6cec813

23 files changed

+738
-272
lines changed

commands.go

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package cqrs
22

33
import (
4-
"log"
54
"reflect"
65
"time"
7-
8-
"github.com/pborman/uuid"
96
)
107

118
// Command represents an actor intention to alter the state of the system
@@ -20,39 +17,60 @@ type Command struct {
2017
// CreateCommand is a helper for creating a new command object with populated default properties
2118
func CreateCommand(body interface{}) Command {
2219
commandType := reflect.TypeOf(body)
23-
return Command{uuid.New(), uuid.New(), commandType.String(), time.Now(), body}
20+
return Command{MessageID: "mid:" + NewUUIDString(),
21+
CorrelationID: "cid:" + NewUUIDString(),
22+
CommandType: commandType.String(),
23+
Created: time.Now(),
24+
25+
Body: body}
2426
}
2527

2628
// CreateCommandWithCorrelationID is a helper for creating a new command object with populated default properties
2729
func CreateCommandWithCorrelationID(body interface{}, correlationID string) Command {
2830
commandType := reflect.TypeOf(body)
29-
return Command{uuid.New(), correlationID, commandType.String(), time.Now(), body}
31+
return Command{MessageID: "mid:" + NewUUIDString(),
32+
CorrelationID: correlationID,
33+
CommandType: commandType.String(),
34+
Created: time.Now(),
35+
Body: body}
3036
}
3137

3238
// CommandPublisher is responsilbe for publishing commands
3339
type CommandPublisher interface {
34-
Publish(Command) error
40+
PublishCommands([]Command) error
3541
}
3642

3743
// CommandReceiver is responsible for receiving commands
3844
type CommandReceiver interface {
3945
ReceiveCommands(CommandReceiverOptions) error
4046
}
4147

48+
// CommandBus ...
49+
type CommandBus interface {
50+
CommandReceiver
51+
CommandPublisher
52+
}
53+
4254
// CommandDispatchManager is responsible for coordinating receiving messages from command receivers and dispatching them to the command dispatcher.
4355
type CommandDispatchManager struct {
4456
commandDispatcher *MapBasedCommandDispatcher
4557
typeRegistry TypeRegistry
4658
receiver CommandReceiver
4759
}
4860

61+
// CommandDispatcher the internal command dispatcher
62+
func (m *CommandDispatchManager) CommandDispatcher() CommandDispatcher {
63+
return m.commandDispatcher
64+
}
65+
4966
// CommandReceiverOptions is an initalization structure to communicate to and from a command receiver go routine
5067
type CommandReceiverOptions struct {
5168
TypeRegistry TypeRegistry
5269
Close chan chan error
5370
Error chan error
54-
ReceiveCommand chan CommandTransactedAccept
71+
ReceiveCommand CommandHandler
5572
Exclusive bool
73+
ListenerCount int
5674
}
5775

5876
// CommandTransactedAccept is the message routed from a command receiver to the command manager.
@@ -106,17 +124,21 @@ func (m *MapBasedCommandDispatcher) DispatchCommand(command Command) error {
106124
if handlers, ok := m.registry[bodyType]; ok {
107125
for _, handler := range handlers {
108126
if err := handler(command); err != nil {
127+
metricsCommandsFailed.WithLabelValues(command.CommandType).Inc()
109128
return err
110129
}
111130
}
112131
}
113132

114133
for _, handler := range m.globalHandlers {
115134
if err := handler(command); err != nil {
135+
metricsCommandsFailed.WithLabelValues(command.CommandType).Inc()
116136
return err
117137
}
118138
}
119139

140+
metricsCommandsDispatched.WithLabelValues(command.CommandType).Inc()
141+
120142
return nil
121143
}
122144

@@ -137,46 +159,48 @@ func (m *CommandDispatchManager) RegisterGlobalHandler(handler CommandHandler) {
137159
}
138160

139161
// Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests
140-
func (m *CommandDispatchManager) Listen(stop <-chan bool, exclusive bool) error {
162+
func (m *CommandDispatchManager) Listen(stop <-chan bool, exclusive bool, listenerCount int) error {
141163
// Create communication channels
142164
//
143165
// for closing the queue listener,
144166
closeChannel := make(chan chan error)
145167
// receiving errors from the listener thread (go routine)
146168
errorChannel := make(chan error)
147-
// and receiving commands from the queue
148-
receiveCommandChannel := make(chan CommandTransactedAccept)
169+
170+
// Command received channel receives a result with a channel to respond to, signifying successful processing of the message.
171+
// This should eventually call a command handler. See cqrs.NewVersionedCommandDispatcher()
172+
receiveCommandHandler := func(command Command) error {
173+
PackageLogger().Debugf("CommandDispatchManager.DispatchCommand: %v", command.CorrelationID)
174+
err := m.commandDispatcher.DispatchCommand(command)
175+
if err != nil {
176+
PackageLogger().Debugf("Error dispatching command: %v", err)
177+
}
178+
179+
return err
180+
}
149181

150182
// Start receiving commands by passing these channels to the worker thread (go routine)
151-
options := CommandReceiverOptions{m.typeRegistry, closeChannel, errorChannel, receiveCommandChannel, exclusive}
183+
options := CommandReceiverOptions{m.typeRegistry, closeChannel, errorChannel, receiveCommandHandler, exclusive, listenerCount}
152184
if err := m.receiver.ReceiveCommands(options); err != nil {
153185
return err
154186
}
187+
go func() {
188+
for {
189+
// Wait on multiple channels using the select control flow.
190+
select {
191+
case <-stop:
192+
PackageLogger().Debugf("CommandDispatchManager.Stopping")
193+
closeSignal := make(chan error)
194+
closeChannel <- closeSignal
195+
PackageLogger().Debugf("CommandDispatchManager.Stopped")
196+
<-closeSignal
197+
// Receiving on this channel signifys an error has occured worker processor side
198+
case err := <-errorChannel:
199+
PackageLogger().Debugf("CommandDispatchManager.ErrorReceived: %s", err)
155200

156-
for {
157-
// Wait on multiple channels using the select control flow.
158-
select {
159-
// Command received channel receives a result with a channel to respond to, signifying successful processing of the message.
160-
// This should eventually call a command handler. See cqrs.NewVersionedCommandDispatcher()
161-
case command := <-receiveCommandChannel:
162-
log.Println("CommandDispatchManager.DispatchCommand: ", command.Command)
163-
if err := m.commandDispatcher.DispatchCommand(command.Command); err != nil {
164-
log.Println("Error dispatching command: ", err)
165-
command.ProcessedSuccessfully <- false
166-
} else {
167-
command.ProcessedSuccessfully <- true
168-
log.Println("CommandDispatchManager.DispatchSuccessful")
169201
}
170-
case <-stop:
171-
log.Println("CommandDispatchManager.Stopping")
172-
closeSignal := make(chan error)
173-
closeChannel <- closeSignal
174-
defer log.Println("CommandDispatchManager.Stopped")
175-
return <-closeSignal
176-
// Receiving on this channel signifys an error has occured worker processor side
177-
case err := <-errorChannel:
178-
log.Println("CommandDispatchManager.ErrorReceived: ", err)
179-
return err
180202
}
181-
}
203+
}()
204+
205+
return nil
182206
}

commands_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package cqrs_test
22

33
import (
4-
"log"
54
"testing"
65

76
"github.com/andrewwebber/cqrs"
@@ -15,12 +14,15 @@ func TestCommandDispatcher(t *testing.T) {
1514
dispatcher := cqrs.NewMapBasedCommandDispatcher()
1615
success := false
1716
dispatcher.RegisterCommandHandler(SampleMessageCommand{}, func(command cqrs.Command) error {
18-
log.Println("Received Command : ", command.Body.(SampleMessageCommand).Message)
17+
cqrs.PackageLogger().Debugf("Received Command : ", command.Body.(SampleMessageCommand).Message)
1918
success = true
2019
return nil
2120
})
2221

23-
dispatcher.DispatchCommand(cqrs.Command{Body: SampleMessageCommand{"Hello world"}})
22+
err := dispatcher.DispatchCommand(cqrs.Command{Body: SampleMessageCommand{"Hello world"}})
23+
if err != nil {
24+
t.Fatal(err)
25+
}
2426
if !success {
2527
t.Fatal("Expected success")
2628
}

correlation.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,29 @@ package cqrs
22

33
import (
44
"time"
5-
6-
"github.com/pborman/uuid"
75
)
86

9-
const CQRSErrorEventType = "cqrs.CQRSErrorEvent"
7+
// CQRSErrorEventType ...
8+
const CQRSErrorEventType = "cqrs.ErrorEvent"
109

11-
// CQRSErrorEvent is a generic event raised within the CQRS framework
12-
type CQRSErrorEvent struct {
10+
// ErrorEvent is a generic event raised within the CQRS framework
11+
type ErrorEvent struct {
1312
Message string
1413
}
1514

15+
// DeliverCQRSError will deliver a CQRS error
1616
func DeliverCQRSError(correlationID string, err error, repo EventSourcingRepository) {
17-
repo.GetEventStreamRepository().SaveIntegrationEvent(VersionedEvent{
18-
ID: uuid.New(),
17+
err = repo.GetEventStreamRepository().SaveIntegrationEvent(VersionedEvent{
18+
ID: "ve:" + NewUUIDString(),
1919
CorrelationID: correlationID,
2020
SourceID: "",
2121
Version: 0,
2222
EventType: CQRSErrorEventType,
2323
Created: time.Now(),
24-
Event: CQRSErrorEvent{Message: err.Error()}})
24+
25+
Event: ErrorEvent{Message: err.Error()}})
26+
27+
if err != nil {
28+
PackageLogger().Debugf("ERROR saving integration event: %v\n", err)
29+
}
2530
}

cqrs-testentities_test.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package cqrs_test
33
import (
44
"errors"
55
"fmt"
6-
"log"
6+
"reflect"
77

88
"github.com/andrewwebber/cqrs"
9+
910
"golang.org/x/crypto/bcrypt"
1011
)
1112

@@ -72,6 +73,28 @@ type Account struct {
7273
Balance float64
7374
}
7475

76+
func (account *Account) CopyFrom(source interface{}) {
77+
78+
account.FirstName = reflect.Indirect(reflect.ValueOf(source)).FieldByName("FirstName").Interface().(string)
79+
account.LastName = reflect.Indirect(reflect.ValueOf(source)).FieldByName("LastName").Interface().(string)
80+
account.EmailAddress = reflect.Indirect(reflect.ValueOf(source)).FieldByName("EmailAddress").Interface().(string)
81+
account.PasswordHash = reflect.Indirect(reflect.ValueOf(source)).FieldByName("PasswordHash").Interface().([]byte)
82+
account.Balance = reflect.Indirect(reflect.ValueOf(source)).FieldByName("Balance").Interface().(float64)
83+
84+
/*cqrs.PackageLogger().Debugf("valueOfSource", valueOfSource)
85+
fieldValue := valueOfSource
86+
cqrs.PackageLogger().Debugf("fieldValue", fieldValue)
87+
fieldValueInterface := fieldValue
88+
cqrs.PackageLogger().Debugf("fieldValueInterface", fieldValueInterface)
89+
firstName := fieldValueInterface.(string)
90+
cqrs.PackageLogger().Debugf("firstName", firstName)*/
91+
92+
/*account.LastName = reflect.Indirect(reflect.ValueOf(source).FieldByName("LastName")).Interface().(string)
93+
account.EmailAddress = reflect.Indirect(reflect.ValueOf(source).FieldByName("EmailAddress")).Interface().(string)
94+
account.PasswordHash = reflect.Indirect(reflect.ValueOf(source).FieldByName("PasswordHash")).Interface().([]byte)
95+
account.Balance = reflect.Indirect(reflect.ValueOf(source).FieldByName("FirstName")).Interface().(float64)*/
96+
}
97+
7598
func (account *Account) String() string {
7699
return fmt.Sprintf("Account %s with Email Address %s has balance %f", account.ID(), account.EmailAddress, account.Balance)
77100
}
@@ -89,10 +112,20 @@ func NewAccountFromHistory(id string, repository cqrs.EventSourcingRepository) (
89112
account := new(Account)
90113
account.EventSourceBased = cqrs.NewEventSourceBasedWithID(account, id)
91114

92-
if error := repository.Get(id, account); error != nil {
93-
return account, error
115+
snapshot, err := repository.GetSnapshot(id)
116+
if err == nil {
117+
cqrs.PackageLogger().Debugf("Loaded snapshot: %+v", snapshot)
118+
account.SetVersion(snapshot.Version())
119+
account.CopyFrom(snapshot)
120+
cqrs.PackageLogger().Debugf("Updated account: %+v ", account)
94121
}
95122

123+
if err := repository.Get(id, account); err != nil {
124+
return nil, err
125+
}
126+
127+
cqrs.PackageLogger().Debugf("Loaded account: %+v", account)
128+
96129
return account, nil
97130
}
98131

@@ -134,7 +167,7 @@ func (account *Account) ChangePassword(newPassword string) error {
134167

135168
hashedPassword, err := GetHashForPassword(newPassword)
136169
if err != nil {
137-
panic(err)
170+
return (err)
138171
}
139172

140173
account.Update(PasswordChangedEvent{hashedPassword})
@@ -147,7 +180,7 @@ func GetHashForPassword(password string) ([]byte, error) {
147180
// Hashing the password with the cost of 10
148181
hashedPassword, err := bcrypt.GenerateFromPassword(passwordBytes, 10)
149182
if err != nil {
150-
log.Println("Error getting password hash: ", err)
183+
cqrs.PackageLogger().Debugf("Error getting password hash: ", err)
151184
return nil, err
152185
}
153186

0 commit comments

Comments
 (0)