@@ -30,6 +30,31 @@ This file deals with asynchronous delivery of MQ messages via the MQCTL/MQCB ver
30
30
31
31
extern void MQCALLBACK_Go(MQHCONN, MQMD *, MQGMO *, PMQVOID, MQCBC *);
32
32
extern void MQCALLBACK_C(MQHCONN hc,MQMD *md,MQGMO *gmo,PMQVOID buf,MQCBC *cbc);
33
+
34
+ // These functions deal with stashing the hObj value across callbacks, because
35
+ // the MQ C client will sometimes set hObj=0 for EVENTS (eg qmgr stopping) instead
36
+ // of the registered hObj. That can lead to unexpected callback invocations.
37
+ static void *saveHObj(PMQCBD mqcbd, MQHOBJ hObj) {
38
+ mqcbd->CallbackArea = malloc(sizeof(MQHOBJ));
39
+ if (mqcbd->CallbackArea) {
40
+ memcpy(mqcbd->CallbackArea,&hObj,sizeof(MQHOBJ));
41
+ }
42
+ return mqcbd->CallbackArea;
43
+ }
44
+
45
+ static MQHOBJ getHObj(PMQCBC mqcbc) {
46
+ MQHOBJ ho = 0;
47
+ if (mqcbc->CallbackArea != NULL) {
48
+ memcpy(&ho,mqcbc->CallbackArea,sizeof(MQHOBJ));
49
+ }
50
+ return ho;
51
+ }
52
+
53
+ static void freeHObj(void *p) {
54
+ if (p) {
55
+ free(p);
56
+ }
57
+ }
33
58
*/
34
59
import "C"
35
60
import (
@@ -47,6 +72,7 @@ type MQCB_FUNCTION func(*MQQueueManager, *MQObject, *MQMD, *MQGMO, []byte, *MQCB
47
72
// be passed onwards
48
73
type cbInfo struct {
49
74
hObj * MQObject
75
+ stashedHObj unsafe.Pointer
50
76
callbackFunction MQCB_FUNCTION
51
77
callbackArea interface {}
52
78
connectionArea interface {}
@@ -98,14 +124,28 @@ func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQ
98
124
verb : "MQCALLBACK" ,
99
125
}
100
126
101
- key := makeKey (hConn , mqcbc .Hobj )
127
+ // The MQ C client seems to sometimes use 0 as the hObj for EVENTs even for
128
+ // callbacks that should be going to something registered for a specific queue. This
129
+ // is different from using local bindings and is possibly a bug in the underlying MQ library.
130
+ // To try to work round this, the real hObj has been stashed during the REGISTER phase and if it's
131
+ // available via the C context structures, then we try to use that in
132
+ // an attempt to find the appropriate function.
133
+ passedHo := mqcbc .Hobj
134
+ savedHo := C .getHObj (mqcbc )
135
+ logTrace ("HOs in callback are passed: %d stashed: %d" , passedHo , savedHo )
136
+
137
+ if passedHo == C .MQHO_NONE && savedHo != C .MQHO_NONE {
138
+ passedHo = savedHo
139
+ }
140
+
141
+ key := makeKey (hConn , passedHo )
102
142
mapLock ()
103
143
info , ok := cbMap [key ]
104
144
mapUnlock ()
105
145
106
- // The MQ Client libraries sometimes call us with an EVENT that is
107
- // not associated with a particular hObj.
108
- // The way I've chosen is to find the first entry in
146
+ // With the stashed hObj available, we should now be able to find the
147
+ // correct callback routine from the map. But just in case we can't, we will
148
+ // have a fallback position. The way I've chosen is to find the first entry in
109
149
// the map associated with the hConn and call its registered function with
110
150
// a dummy hObj.
111
151
if ! ok {
@@ -197,6 +237,7 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M
197
237
if f1 != nil {
198
238
f1 (gogmo .OtelOpts , object .qMgr , object , gogmo , true )
199
239
}
240
+
200
241
copyCBDtoC (& mqcbd , gocbd )
201
242
copyMDtoC (& mqmd , gomd )
202
243
copyGMOtoC (& mqgmo , gogmo )
@@ -207,6 +248,13 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M
207
248
// defined here. And that in turn will call the user's callback function
208
249
mqcbd .CallbackFunction = (C .MQPTR )(unsafe .Pointer (C .MQCALLBACK_C ))
209
250
251
+ // See earlier comments about how the hObj is not always passed back. Here's where
252
+ // we stash it via a malloc
253
+ p := C .NULL
254
+ if mqOperation == C .MQOP_REGISTER {
255
+ p = C .saveHObj (& mqcbd , object .hObj )
256
+ }
257
+
210
258
C .MQCB (object .qMgr .hConn , mqOperation , (C .PMQVOID )(unsafe .Pointer (& mqcbd )),
211
259
object .hObj ,
212
260
(C .PMQVOID )(unsafe .Pointer (& mqmd )), (C .PMQVOID )(unsafe .Pointer (& mqgmo )),
@@ -218,6 +266,10 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M
218
266
}
219
267
220
268
if mqcc != C .MQCC_OK {
269
+ // Don't leak these 4 bytes when the register failed
270
+ if mqOperation == C .MQOP_REGISTER {
271
+ C .freeHObj (p )
272
+ }
221
273
traceExitErr ("CB" , 3 , & mqreturn )
222
274
return & mqreturn
223
275
}
@@ -226,11 +278,17 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M
226
278
switch mqOperation {
227
279
case C .MQOP_DEREGISTER :
228
280
mapLock ()
281
+ // ... and this is where the corresponding free should happen
282
+ info , ok := cbMap [key ]
283
+ if ok {
284
+ C .freeHObj (info .stashedHObj )
285
+ }
229
286
delete (cbMap , key )
230
287
mapUnlock ()
231
288
case C .MQOP_REGISTER :
232
289
// Stash the hObj and real function to be called
233
290
info := & cbInfo {hObj : object ,
291
+ stashedHObj : p ,
234
292
callbackFunction : gocbd .CallbackFunction ,
235
293
connectionArea : nil ,
236
294
callbackArea : gocbd .CallbackArea ,
0 commit comments