Skip to content

Commit 82b82c8

Browse files
authored
Merge pull request #401 from basho/mas-i1788-reconcileapi
Mas i1788 reconcileapi
2 parents ea4fcd0 + da907c6 commit 82b82c8

File tree

2 files changed

+78
-6
lines changed

2 files changed

+78
-6
lines changed

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
]}.
1818

1919
{deps, [
20-
{riak_pb, {git, "https://github.com/basho/riak_pb", {tag, "3.0.4"}}}
20+
{riak_pb, {git, "https://github.com/basho/riak_pb", {branch, "develop-3.0"}}}
2121
]}.
2222

2323
{edoc_opts, [

src/riakc_pb_socket.erl

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
set_client_id/2, set_client_id/3,
5050
get_server_info/1, get_server_info/2,
5151
get/3, get/4, get/5,
52-
fetch/2,
52+
fetch/2, fetch/3, push/3,
5353
put/2, put/3, put/4,
5454
delete/3, delete/4, delete/5,
5555
delete_vclock/4, delete_vclock/5, delete_vclock/6,
@@ -338,14 +338,45 @@ get(Pid, Bucket, Key, Options, Timeout) ->
338338
Req = get_options(Options, #rpbgetreq{type =T, bucket = B, key = Key}),
339339
call_infinity(Pid, {req, Req, Timeout}).
340340

341+
341342
%% @doc Fetch replicated objects from a queue
342343
-spec fetch(pid(), binary()) ->
343344
{ok, queue_empty}|
344-
{ok|crc_wonky, {deleted, term(), binary()}|binary()}.
345+
{ok|crc_wonky,
346+
{deleted, term(), binary()}|binary()}.
345347
fetch(Pid, QueueName) ->
346-
Req = #rpbfetchreq{queuename = QueueName},
348+
fetch(Pid, QueueName, internal).
349+
350+
%% @doc Fetch with specific format may also return segment ID and hash
351+
-spec fetch(pid(), binary(), internal|internal_aaehash) ->
352+
{ok, queue_empty}|
353+
{ok|crc_wonky,
354+
{deleted, term(), binary()}|
355+
binary()|
356+
{deleted, term(), binary(), non_neg_integer(), non_neg_integer()}|
357+
{binary(), non_neg_integer(), non_neg_integer()}}.
358+
fetch(Pid, QueueName, ObjectFormat)
359+
when ObjectFormat =:= internal; ObjectFormat =:= internal_aaehash ->
360+
Req = #rpbfetchreq{queuename = QueueName,
361+
encoding = erlang:atom_to_binary(ObjectFormat, utf8)},
347362
call_infinity(Pid, {req, Req, default_timeout(get_timeout)}).
348363

364+
%% @doc Push to nextgenrepl replication queue a list of Buckets/Keys/Clocks
365+
%% where the push will occur if the object is fetched an is currently at that
366+
%% clock
367+
-spec push(pid(),
368+
binary(),
369+
[{riakc_obj:bucket(), riakc_obj:key(), riakc_obj:vclock()}]) ->
370+
{error, term()}|{ok, iolist()}.
371+
push(Pid, QueueName, BucketKeyClockList) ->
372+
KeysValue = lists:map(fun make_keyvalue/1, BucketKeyClockList),
373+
Req = #rpbpushreq{queuename = QueueName, keys_value = KeysValue},
374+
call_infinity(Pid, {req, Req, default_timeout(get_timeout)}).
375+
376+
make_keyvalue({{T, B}, K, C}) ->
377+
#rpbkeysvalue{type = T, bucket = B, key = K, value = C};
378+
make_keyvalue({B, K, C}) ->
379+
#rpbkeysvalue{bucket = B, key = K, value = C}.
349380

350381
%% @doc Put the metadata/value in the object under bucket/key
351382
%% @equiv put(Pid, Obj, [])
@@ -2382,14 +2413,55 @@ process_response(#request{msg = #rpbfetchreq{}},
23822413
#rpbfetchresp{deleted = true,
23832414
crc_check = CRC,
23842415
replencoded_object = ObjBin,
2385-
deleted_vclock = VclockBin}, State) ->
2416+
deleted_vclock = VclockBin,
2417+
segment_id = undefined,
2418+
segment_hash = undefined}, State) ->
23862419
{reply,
23872420
{crc_check(CRC,ObjBin), {deleted, VclockBin, ObjBin}},
23882421
State};
2422+
process_response(#request{msg = #rpbfetchreq{}},
2423+
#rpbfetchresp{deleted = true,
2424+
crc_check = CRC,
2425+
replencoded_object = ObjBin,
2426+
deleted_vclock = VclockBin,
2427+
segment_id = SegID,
2428+
segment_hash = SegHash}, State) ->
2429+
{reply,
2430+
{crc_check(CRC,ObjBin), {deleted, VclockBin, ObjBin, SegID, SegHash}},
2431+
State};
23892432
process_response(#request{msg = #rpbfetchreq{}},
23902433
#rpbfetchresp{crc_check = CRC,
2391-
replencoded_object = ObjBin}, State) ->
2434+
replencoded_object = ObjBin,
2435+
segment_id = undefined,
2436+
segment_hash = undefined}, State) ->
23922437
{reply, {crc_check(CRC,ObjBin), ObjBin}, State};
2438+
process_response(#request{msg = #rpbfetchreq{}},
2439+
#rpbfetchresp{crc_check = CRC,
2440+
replencoded_object = ObjBin,
2441+
segment_id = SegID,
2442+
segment_hash = SegHash}, State) ->
2443+
{reply, {crc_check(CRC,ObjBin), {ObjBin, SegID, SegHash}}, State};
2444+
2445+
%% rpbpushreq
2446+
process_response(#request{msg = #rpbpushreq{queuename = Q}},
2447+
#rpbpushresp{queue_exists = true,
2448+
queuename = Q,
2449+
foldq_length = FL,
2450+
fsync_length = FSL,
2451+
realt_length = RTL}, State) ->
2452+
{reply,
2453+
{ok,
2454+
iolist_to_binary(
2455+
io_lib:format("Queue ~s: ~w ~w ~w", [Q, FL, FSL, RTL]))},
2456+
State};
2457+
process_response(#request{msg = #rpbpushreq{queuename = Q}},
2458+
#rpbpushresp{queue_exists = false}, State) ->
2459+
{reply,
2460+
{ok,
2461+
iolist_to_binary(io_lib:format("No queue ~s", [Q]))},
2462+
State};
2463+
2464+
23932465

23942466
%% rpbputreq
23952467
process_response(#request{msg = #rpbputreq{}},

0 commit comments

Comments
 (0)