Skip to content

Commit c982eeb

Browse files
committed
adjust the maintainer manager handle message
1 parent d31429c commit c982eeb

File tree

1 file changed

+19
-41
lines changed

1 file changed

+19
-41
lines changed

maintainer/maintainer_manager.go

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ type Manager struct {
5656
pdClock pdutil.Clock
5757
regionCache *tikv.RegionCache
5858

59-
// msgCh is used to cache messages from coordinator
60-
msgCh chan *messaging.TargetMessage
61-
6259
taskScheduler threadpool.ThreadPool
6360
}
6461

@@ -95,16 +92,11 @@ func NewMaintainerManager(selfNode *node.Info,
9592
// recvMessages is the message handler for maintainer manager
9693
func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
9794
switch msg.Type {
98-
// Coordinator related messages
99-
case messaging.TypeAddMaintainerRequest,
100-
messaging.TypeRemoveMaintainerRequest,
101-
messaging.TypeCoordinatorBootstrapRequest:
102-
select {
103-
case <-ctx.Done():
104-
return ctx.Err()
105-
case m.msgCh <- msg:
106-
}
107-
return nil
95+
case messaging.TypeCoordinatorBootstrapRequest:
96+
log.Info("received coordinator bootstrap request", zap.String("from", msg.From.String()))
97+
m.onCoordinatorBootstrapRequest(msg)
98+
case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest:
99+
m.onDispatchMaintainerRequest(msg)
108100
// receive bootstrap response message from the dispatcher manager
109101
case messaging.TypeMaintainerBootstrapResponse:
110102
req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse)
@@ -140,8 +132,6 @@ func (m *Manager) Run(ctx context.Context) error {
140132
select {
141133
case <-ctx.Done():
142134
return ctx.Err()
143-
case msg := <-m.msgCh:
144-
m.handleMessage(msg)
145135
case <-ticker.C:
146136
// 1. try to send heartbeat to coordinator
147137
m.sendHeartbeat()
@@ -274,24 +264,33 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart
274264

275265
func (m *Manager) onDispatchMaintainerRequest(
276266
msg *messaging.TargetMessage,
277-
) *heartbeatpb.MaintainerStatus {
267+
) {
268+
if !m.isBootstrap() {
269+
return
270+
}
278271
if m.coordinatorID != msg.From {
279272
log.Warn("ignore invalid coordinator id",
280273
zap.Any("request", msg),
281274
zap.Any("coordinatorID", m.coordinatorID),
282275
zap.Stringer("from", msg.From))
283-
return nil
284276
}
277+
var status *heartbeatpb.MaintainerStatus
285278
switch msg.Type {
286279
case messaging.TypeAddMaintainerRequest:
287280
req := msg.Message[0].(*heartbeatpb.AddMaintainerRequest)
288-
return m.onAddMaintainerRequest(req)
281+
status = m.onAddMaintainerRequest(req)
289282
case messaging.TypeRemoveMaintainerRequest:
290-
return m.onRemoveMaintainerRequest(msg)
283+
status = m.onRemoveMaintainerRequest(msg)
291284
default:
292285
log.Warn("unknown message type", zap.Any("message", msg.Message))
293286
}
294-
return nil
287+
if status == nil {
288+
return
289+
}
290+
response := &heartbeatpb.MaintainerHeartbeat{
291+
Statuses: []*heartbeatpb.MaintainerStatus{status},
292+
}
293+
m.sendMessages(response)
295294
}
296295

297296
func (m *Manager) sendHeartbeat() {
@@ -314,27 +313,6 @@ func (m *Manager) sendHeartbeat() {
314313
}
315314
}
316315

317-
func (m *Manager) handleMessage(msg *messaging.TargetMessage) {
318-
switch msg.Type {
319-
case messaging.TypeCoordinatorBootstrapRequest:
320-
log.Info("received coordinator bootstrap request", zap.String("from", msg.From.String()))
321-
m.onCoordinatorBootstrapRequest(msg)
322-
case messaging.TypeAddMaintainerRequest,
323-
messaging.TypeRemoveMaintainerRequest:
324-
if m.isBootstrap() {
325-
status := m.onDispatchMaintainerRequest(msg)
326-
if status == nil {
327-
return
328-
}
329-
response := &heartbeatpb.MaintainerHeartbeat{
330-
Statuses: []*heartbeatpb.MaintainerStatus{status},
331-
}
332-
m.sendMessages(response)
333-
}
334-
default:
335-
}
336-
}
337-
338316
func (m *Manager) dispatcherMaintainerMessage(
339317
ctx context.Context, changefeed common.ChangeFeedID, msg *messaging.TargetMessage,
340318
) error {

0 commit comments

Comments
 (0)