3
3
4
4
module Stub where
5
5
6
-
6
+ -- import Data.Int (fromIntegral)
7
7
import Data.Bifunctor
8
8
import Data.ByteString as BS
9
9
import Data.Text
@@ -23,7 +23,6 @@ import Data.Vector as Vector
23
23
, (!)
24
24
)
25
25
import qualified Data.ByteString.Lazy as LBS
26
- import Data.IORef (readIORef , newIORef , modifyIORef )
27
26
import Control.Monad.Except (ExceptT (.. ), runExceptT )
28
27
29
28
import qualified Peer.ChaincodeShim as Pb
@@ -39,6 +38,7 @@ import Interfaces
39
38
import Messages
40
39
import Types
41
40
41
+ import Debug.Trace
42
42
-- NOTE: When support for concurrency transaction is added, this function will no longer be required
43
43
-- as the stub function will block and listen for responses over a channel when the code is concurrent
44
44
listenForResponse :: StreamRecv Pb. ChaincodeMessage -> IO (Either Error ByteString )
@@ -125,47 +125,41 @@ instance ChaincodeStubInterface DefaultChaincodeStub where
125
125
-- TODO: Implement better error handling/checks etc
126
126
-- getStateByRange :: ccs -> Text -> Text -> IO (Either Error StateQueryIterator)
127
127
getStateByRange ccs startKey endKey =
128
- let payload = getStateByRangePayload startKey endKey
129
- message = buildChaincodeMessage GET_STATE_BY_RANGE payload (txId ccs) (channelId ccs)
130
- -- ExceptT is a monad transformer that allows us to compose these by binding over IO Either
131
- bsToSqi :: ByteString -> ExceptT Error IO StateQueryIterator
132
- bsToSqi bs =
133
- let eeaQueryResponse = parse (decodeMessage (FieldNumber 1 )) bs :: Either ParseError Pb. QueryResponse
134
- in
135
- case eeaQueryResponse of
136
- -- TODO: refactor out pattern matching, e.g. using >>= or <*>
137
- Left err -> ExceptT $ pure $ Left $ DecodeError err
138
- Right queryResponse -> ExceptT $ do
139
- -- queryResponse and currentLoc are IORefs as they need to be mutated
140
- -- as a part of the next() function
141
- queryResponseIORef <- newIORef queryResponse
142
- currentLocIORef <- newIORef 0
143
- pure $ Right StateQueryIterator {
144
- sqiChaincodeStub = ccs
145
- , sqiChannelId = getChannelId ccs
146
- , sqiTxId = getTxId ccs
147
- , sqiResponse = queryResponseIORef
148
- , sqiCurrentLoc = currentLocIORef
149
- }
128
+ let payload = getStateByRangePayload startKey endKey Nothing
129
+ message = buildChaincodeMessage GET_STATE_BY_RANGE payload (txId ccs) (channelId ccs)
150
130
in do
151
131
e <- (sendStream ccs) message
152
132
case e of
153
133
Left err -> error (" Error while streaming: " ++ show err)
154
134
Right _ -> pure ()
155
- runExceptT $ ExceptT (listenForResponse (recvStream ccs)) >>= bsToSqi
135
+ runExceptT $ ExceptT (listenForResponse (recvStream ccs)) >>= ( bsToSqi ccs)
156
136
157
137
-- TODO: We need to implement this so we can test the fetchNextQueryResult functionality
158
- -- getStateByRangeWithPagination :: ccs -> String -> String -> Int32 -> String -> Either Error (StateQueryIterator, Pb.QueryResponseMetadata)
159
- getStateByRangeWithPagination ccs startKey endKey pageSize bookmark = pure $ Left $ Error " Not implemented"
138
+ -- getStateByRangeWithPagination :: ccs -> Text -> Text -> Int -> Text -> IO (Either Error (StateQueryIterator, Pb.QueryResponseMetadata))
139
+ getStateByRangeWithPagination ccs startKey endKey pageSize bookmark =
140
+ let metadata = Pb. QueryMetadata {
141
+ Pb. queryMetadataPageSize = fromIntegral pageSize
142
+ , Pb. queryMetadataBookmark = TL. fromStrict bookmark
143
+ }
144
+ payload = (trace " Building getStateByRangeWithPagination payload" ) getStateByRangePayload startKey endKey $ Just metadata
145
+ message = buildChaincodeMessage GET_STATE_BY_RANGE payload (txId ccs) (channelId ccs)
146
+ in do
147
+ e <- (sendStream ccs) message
148
+ case e of
149
+ Left err -> error (" Error while streaming: " ++ show err)
150
+ Right _ -> pure ()
151
+ runExceptT $ ExceptT (listenForResponse (recvStream ccs)) >>= (bsToSqiAndMeta ccs)
152
+
160
153
161
154
-- TODO : implement all these interface functions
162
155
instance StateQueryIteratorInterface StateQueryIterator where
163
156
-- TODO: remove the IO from this function (possibly with the State monad)
164
157
-- hasNext :: sqi -> IO Bool
165
158
hasNext sqi = do
166
159
queryResponse <- readIORef $ sqiResponse sqi
167
- currentLoc <- readIORef $ sqiCurrentLoc sqi
168
- pure $ currentLoc < Prelude. length (Pb. queryResponseResults queryResponse) || (Pb. queryResponseHasMore queryResponse)
160
+ currentLoc <- (trace $ " Query response: " ++ show queryResponse) readIORef $ sqiCurrentLoc sqi
161
+ pure $ (currentLoc < Prelude. length (Pb. queryResponseResults queryResponse))
162
+ || (Pb. queryResponseHasMore queryResponse)
169
163
-- close :: sqi -> IO (Maybe Error)
170
164
close _ = pure Nothing
171
165
-- next :: sqi -> IO (Either Error Pb.KV)
@@ -176,6 +170,53 @@ instance StateQueryIteratorInterface StateQueryIterator where
176
170
Right queryResultBytes -> pure $ first DecodeError (parse (decodeMessage (FieldNumber 1 )) (Pb. queryResultBytesResultBytes queryResultBytes) :: Either ParseError Pb. KV )
177
171
178
172
173
+ -- ExceptT is a monad transformer that allows us to compose these by binding over IO Either
174
+ bsToSqi :: DefaultChaincodeStub -> ByteString -> ExceptT Error IO StateQueryIterator
175
+ bsToSqi ccs bs =
176
+ let eeaQueryResponse = parse (decodeMessage (FieldNumber 1 )) bs :: Either ParseError Pb. QueryResponse
177
+ in
178
+ case eeaQueryResponse of
179
+ -- TODO: refactor out pattern matching, e.g. using >>= or <*>
180
+ Left err -> ExceptT $ pure $ Left $ DecodeError err
181
+ Right queryResponse -> ExceptT $ do
182
+ -- queryResponse and currentLoc are IORefs as they need to be mutated
183
+ -- as a part of the next() function
184
+ queryResponseIORef <- newIORef queryResponse
185
+ currentLocIORef <- newIORef 0
186
+ pure $ Right StateQueryIterator {
187
+ sqiChaincodeStub = ccs
188
+ , sqiChannelId = getChannelId ccs
189
+ , sqiTxId = getTxId ccs
190
+ , sqiResponse = queryResponseIORef
191
+ , sqiCurrentLoc = currentLocIORef
192
+ }
193
+
194
+ -- ExceptT is a monad transformer that allows us to compose these by binding over IO Either
195
+ bsToSqiAndMeta :: DefaultChaincodeStub -> ByteString -> ExceptT Error IO (StateQueryIterator , Pb. QueryResponseMetadata )
196
+ bsToSqiAndMeta ccs bs =
197
+ let eeaQueryResponse = parse (decodeMessage (FieldNumber 1 )) bs :: Either ParseError Pb. QueryResponse
198
+ in
199
+ case eeaQueryResponse of
200
+ -- TODO: refactor out pattern matching, e.g. using >>= or <*>
201
+ Left err -> ExceptT $ pure $ Left $ DecodeError err
202
+ Right queryResponse ->
203
+ let eeMetadata = parse (decodeMessage (FieldNumber 1 )) (Pb. queryResponseMetadata queryResponse) :: Either ParseError Pb. QueryResponseMetadata
204
+ in
205
+ case eeMetadata of
206
+ Left err -> ExceptT $ pure $ Left $ DecodeError err
207
+ Right metadata -> (trace $ " Metadata from bsToSqiAndMeta: " ++ show metadata) ExceptT $ do
208
+ -- queryResponse and currentLoc are IORefs as they need to be mutated
209
+ -- as a part of the next() function
210
+ queryResponseIORef <- newIORef queryResponse
211
+ currentLocIORef <- newIORef 0
212
+ pure $ Right (StateQueryIterator {
213
+ sqiChaincodeStub = ccs
214
+ , sqiChannelId = getChannelId ccs
215
+ , sqiTxId = getTxId ccs
216
+ , sqiResponse = queryResponseIORef
217
+ , sqiCurrentLoc = currentLocIORef
218
+ }, metadata)
219
+
179
220
nextResult :: StateQueryIterator -> IO (Either Error Pb. QueryResultBytes )
180
221
nextResult sqi = do
181
222
currentLoc <- readIORef $ sqiCurrentLoc sqi
@@ -187,10 +228,10 @@ nextResult sqi = do
187
228
modifyIORef (sqiCurrentLoc sqi) (+ 1 )
188
229
if ((currentLoc + 1 ) == Prelude. length (Pb. queryResponseResults $ queryResponse)) then
189
230
do
190
- fetchNextQueryResult sqi
231
+ (trace " Fetching next query result from the peer " ) fetchNextQueryResult sqi
191
232
queryResult
192
233
else
193
- queryResult
234
+ (trace " Returning local query result " ) queryResult
194
235
else pure $ Left $ Error " Invalid iterator state"
195
236
196
237
0 commit comments