Skip to content

Commit 3d23f0b

Browse files
authored
Update ws_client.go
1 parent c34a79e commit 3d23f0b

File tree

1 file changed

+30
-49
lines changed

1 file changed

+30
-49
lines changed

qosapi/ws_client.go

Lines changed: 30 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package qosapi
33
import (
44
"encoding/json"
55
"errors"
6-
"fmt"
76
"log"
87
"net/url"
98
"sync"
@@ -99,6 +98,10 @@ func (c *WSClient) readLoop() {
9998
continue
10099
}
101100

101+
if baseResp.Type == "" {
102+
baseResp.Type = baseResp.TP
103+
}
104+
102105
// 处理订阅数据推送
103106
switch baseResp.Type {
104107
case "S":
@@ -107,36 +110,43 @@ func (c *WSClient) readLoop() {
107110
log.Printf("Failed to unmarshal snapshot: %v", err)
108111
continue
109112
}
110-
if cb, ok := c.subscribers["S_"+snapshot.Code]; ok {
111-
cb(snapshot)
113+
if cb, ok := c.subscribers["S"]; ok {
114+
if len(snapshot.Code) > 0 {
115+
cb(snapshot)
116+
}
112117
}
113118
case "T":
114119
var trade WSTrade
115120
if err := json.Unmarshal(message, &trade); err != nil {
116121
log.Printf("Failed to unmarshal trade: %v", err)
117122
continue
118123
}
119-
if cb, ok := c.subscribers["T_"+trade.Code]; ok {
120-
cb(trade)
124+
if cb, ok := c.subscribers["T"]; ok {
125+
if len(trade.Code) > 0 {
126+
cb(trade)
127+
}
121128
}
122129
case "D":
123130
var depth WSDepth
124131
if err := json.Unmarshal(message, &depth); err != nil {
125132
log.Printf("Failed to unmarshal depth: %v", err)
126133
continue
127134
}
128-
if cb, ok := c.subscribers["D_"+depth.Code]; ok {
129-
cb(depth)
135+
if cb, ok := c.subscribers["D"]; ok {
136+
if len(depth.Code) > 0 {
137+
cb(depth)
138+
}
130139
}
131140
case "K":
132141
var kline WSKLine
133142
if err := json.Unmarshal(message, &kline); err != nil {
134143
log.Printf("Failed to unmarshal kline: %v", err)
135144
continue
136145
}
137-
key := fmt.Sprintf("K_%d_%s", kline.KLineType, kline.Code)
138-
if cb, ok := c.subscribers[key]; ok {
139-
cb(kline)
146+
if cb, ok := c.subscribers["K"]; ok {
147+
if len(kline.Code) > 0 {
148+
cb(kline)
149+
}
140150
}
141151
default:
142152
// 处理请求响应
@@ -148,7 +158,7 @@ func (c *WSClient) readLoop() {
148158
if baseResp.Msg != "OK" {
149159
cb(nil, errors.New(baseResp.Msg))
150160
} else {
151-
cb(baseResp.Data, nil)
161+
cb(baseResp, nil)
152162
}
153163
} else {
154164
c.mu.Unlock()
@@ -180,10 +190,8 @@ func (c *WSClient) sendRequest(req WSRequest, callback func(interface{}, error))
180190
// SubscribeSnapshot 订阅实时快照
181191
func (c *WSClient) SubscribeSnapshot(codes []string, callback func(WSSnapshot)) error {
182192
key := "S"
183-
for _, code := range codes {
184-
c.subscribers[key+"_"+code] = func(data interface{}) {
185-
callback(data.(WSSnapshot))
186-
}
193+
c.subscribers[key] = func(data interface{}) {
194+
callback(data.(WSSnapshot))
187195
}
188196

189197
return c.sendRequest(WSRequest{
@@ -194,11 +202,6 @@ func (c *WSClient) SubscribeSnapshot(codes []string, callback func(WSSnapshot))
194202

195203
// UnsubscribeSnapshot 取消订阅实时快照
196204
func (c *WSClient) UnsubscribeSnapshot(codes []string) error {
197-
key := "S"
198-
for _, code := range codes {
199-
delete(c.subscribers, key+"_"+code)
200-
}
201-
202205
return c.sendRequest(WSRequest{
203206
Type: "SC",
204207
Codes: codes,
@@ -208,10 +211,8 @@ func (c *WSClient) UnsubscribeSnapshot(codes []string) error {
208211
// SubscribeTrade 订阅实时逐笔成交
209212
func (c *WSClient) SubscribeTrade(codes []string, callback func(WSTrade)) error {
210213
key := "T"
211-
for _, code := range codes {
212-
c.subscribers[key+"_"+code] = func(data interface{}) {
213-
callback(data.(WSTrade))
214-
}
214+
c.subscribers[key] = func(data interface{}) {
215+
callback(data.(WSTrade))
215216
}
216217

217218
return c.sendRequest(WSRequest{
@@ -222,11 +223,6 @@ func (c *WSClient) SubscribeTrade(codes []string, callback func(WSTrade)) error
222223

223224
// UnsubscribeTrade 取消订阅实时逐笔成交
224225
func (c *WSClient) UnsubscribeTrade(codes []string) error {
225-
key := "T"
226-
for _, code := range codes {
227-
delete(c.subscribers, key+"_"+code)
228-
}
229-
230226
return c.sendRequest(WSRequest{
231227
Type: "TC",
232228
Codes: codes,
@@ -236,10 +232,8 @@ func (c *WSClient) UnsubscribeTrade(codes []string) error {
236232
// SubscribeDepth 订阅实时盘口
237233
func (c *WSClient) SubscribeDepth(codes []string, callback func(WSDepth)) error {
238234
key := "D"
239-
for _, code := range codes {
240-
c.subscribers[key+"_"+code] = func(data interface{}) {
241-
callback(data.(WSDepth))
242-
}
235+
c.subscribers[key] = func(data interface{}) {
236+
callback(data.(WSDepth))
243237
}
244238

245239
return c.sendRequest(WSRequest{
@@ -250,11 +244,6 @@ func (c *WSClient) SubscribeDepth(codes []string, callback func(WSDepth)) error
250244

251245
// UnsubscribeDepth 取消订阅实时盘口
252246
func (c *WSClient) UnsubscribeDepth(codes []string) error {
253-
key := "D"
254-
for _, code := range codes {
255-
delete(c.subscribers, key+"_"+code)
256-
}
257-
258247
return c.sendRequest(WSRequest{
259248
Type: "DC",
260249
Codes: codes,
@@ -263,12 +252,9 @@ func (c *WSClient) UnsubscribeDepth(codes []string) error {
263252

264253
// SubscribeKLine 订阅实时K线
265254
func (c *WSClient) SubscribeKLine(codes []string, klineType int, callback func(WSKLine)) error {
266-
key := fmt.Sprintf("K_%d", klineType)
267-
for _, code := range codes {
268-
fullKey := key + "_" + code
269-
c.subscribers[fullKey] = func(data interface{}) {
270-
callback(data.(WSKLine))
271-
}
255+
key := "K"
256+
c.subscribers[key] = func(data interface{}) {
257+
callback(data.(WSKLine))
272258
}
273259

274260
return c.sendRequest(WSRequest{
@@ -280,11 +266,6 @@ func (c *WSClient) SubscribeKLine(codes []string, klineType int, callback func(W
280266

281267
// UnsubscribeKLine 取消订阅实时K线
282268
func (c *WSClient) UnsubscribeKLine(codes []string, klineType int) error {
283-
key := fmt.Sprintf("K_%d", klineType)
284-
for _, code := range codes {
285-
delete(c.subscribers, key+"_"+code)
286-
}
287-
288269
return c.sendRequest(WSRequest{
289270
Type: "KC",
290271
Codes: codes,

0 commit comments

Comments
 (0)