diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 1744ea4af..43bf25f94 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -818,126 +818,6 @@ public void complete() { return rv; } - /** - * Generic pipelined insert operation for collection items. - * Public methods for collection items call this method. - * - * @param key collection item's key - * @param insert operation parameters (values, attributes, and so on) - * @return future holding the success/failure codes of individual operations and their index - */ - CollectionFuture> asyncCollectionPipedInsert( - final String key, final CollectionPipedInsert insert) { - - if (insert.getItemCount() == 0) { - throw new IllegalArgumentException( - "The number of piped operations must be larger than 0."); - } - if (insert.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - throw new IllegalArgumentException( - "The number of piped operations must not exceed a maximum of " - + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); - } - - final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = - new CollectionFuture>(latch, operationTimeout); - - Operation op = opFact.collectionPipedInsert(key, insert, - new CollectionPipedInsertOperation.Callback() { - private final Map result = - new TreeMap(); - - public void receivedStatus(OperationStatus status) { - CollectionOperationStatus cstatus; - - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - getLogger().warn("Unhandled state: " + status); - cstatus = new CollectionOperationStatus(status); - } - rv.set(result, cstatus); - } - - public void complete() { - latch.countDown(); - } - - public void gotStatus(Integer index, OperationStatus status) { - if (status instanceof CollectionOperationStatus) { - result.put(index, (CollectionOperationStatus) status); - } else { - result.put(index, new CollectionOperationStatus(status)); - } - } - }); - - rv.setOperation(op); - addOp(key, op); - return rv; - } - - /** - * Generic pipelined update operation for collection items. - * Public methods for collection items call this method. - * - * @param key collection item's key - * @param update operation parameters (values and so on) - * @return future holding the success/failure codes of individual operations and their index - */ - CollectionFuture> asyncCollectionPipedUpdate( - final String key, final CollectionPipedUpdate update) { - - if (update.getItemCount() == 0) { - throw new IllegalArgumentException( - "The number of piped operations must be larger than 0."); - } - if (update.getItemCount() > CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) { - throw new IllegalArgumentException( - "The number of piped operations must not exceed a maximum of " - + CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT + "."); - } - - final CountDownLatch latch = new CountDownLatch(1); - final CollectionFuture> rv = - new CollectionFuture>(latch, operationTimeout); - - Operation op = opFact.collectionPipedUpdate(key, update, - new CollectionPipedUpdateOperation.Callback() { - private final Map result = - new TreeMap(); - - public void receivedStatus(OperationStatus status) { - CollectionOperationStatus cstatus; - - if (status instanceof CollectionOperationStatus) { - cstatus = (CollectionOperationStatus) status; - } else { - getLogger().warn("Unhandled state: " + status); - cstatus = new CollectionOperationStatus(status); - } - rv.set(result, cstatus); - } - - public void complete() { - latch.countDown(); - } - - public void gotStatus(Integer index, OperationStatus status) { - if (status instanceof CollectionOperationStatus) { - result.put(index, (CollectionOperationStatus) status); - } else { - result.put(index, new CollectionOperationStatus(status)); - } - } - }); - - rv.setOperation(op); - addOp(key, op); - return rv; - } - /** * Generic pipelined update operation for collection items. * Public methods for collection items call this method. @@ -1875,6 +1755,13 @@ public CollectionFuture> asyncBopPipedIn return asyncBopPipedInsertBulk(key, elements, attributesForCreate, collectionTranscoder); } + @Override + public CollectionFuture> asyncBopPipedInsertBulk( + String key, List> elements, + CollectionAttributes attributesForCreate) { + return asyncBopPipedInsertBulk(key, elements, attributesForCreate, collectionTranscoder); + } + @Override public CollectionFuture> asyncMopPipedInsertBulk( String key, Map elements, @@ -1900,85 +1787,124 @@ public CollectionFuture> asyncSopPipedIn public CollectionFuture> asyncBopPipedInsertBulk( String key, Map elements, CollectionAttributes attributesForCreate, Transcoder tc) { + + if (elements.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> insertList = new ArrayList>(); + if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - BTreePipedInsert insert = new BTreePipedInsert(key, elements, attributesForCreate, tc); - return asyncCollectionPipedInsert(key, insert); + insertList.add(new BTreePipedInsert(key, elements, attributesForCreate, tc)); } else { - List> insertList = new ArrayList>(); - PartitionedMap list = new PartitionedMap( elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); + for (Map elementMap : list) { + insertList.add(new BTreePipedInsert(key, elementMap, attributesForCreate, tc)); + } + } + return asyncCollectionPipedInsert(key, insertList); + } + + @Override + public CollectionFuture> asyncBopPipedInsertBulk( + String key, List> elements, + CollectionAttributes attributesForCreate, Transcoder tc) { + + if (elements.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> insertList = new ArrayList>(); - for (int i = 0; i < list.size(); i++) { - insertList.add(new BTreePipedInsert(key, list.get(i), attributesForCreate, tc)); + if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { + insertList.add(new ByteArraysBTreePipedInsert(key, elements, attributesForCreate, tc)); + } else { + PartitionedList> list = new PartitionedList>( + elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); + for (List> elementList : list) { + insertList.add(new ByteArraysBTreePipedInsert(key, elementList, attributesForCreate, tc)); } - return asyncCollectionPipedInsert(key, insertList); } + return asyncCollectionPipedInsert(key, insertList); } @Override public CollectionFuture> asyncMopPipedInsertBulk( String key, Map elements, CollectionAttributes attributesForCreate, Transcoder tc) { + + if (elements.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + for (Map.Entry checkMKey : elements.entrySet()) { validateMKey(checkMKey.getKey()); } + + List> insertList = new ArrayList>(); + if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - MapPipedInsert insert = new MapPipedInsert(key, elements, attributesForCreate, tc); - return asyncCollectionPipedInsert(key, insert); + insertList.add(new MapPipedInsert(key, elements, attributesForCreate, tc)); } else { - List> insertList = new ArrayList>(); PartitionedMap list = new PartitionedMap( elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); - - for (int i = 0; i < list.size(); i++) { - insertList.add(new MapPipedInsert(key, list.get(i), attributesForCreate, tc)); + for (Map elementMap : list) { + insertList.add(new MapPipedInsert(key, elementMap, attributesForCreate, tc)); } - return asyncCollectionPipedInsert(key, insertList); } + return asyncCollectionPipedInsert(key, insertList); } @Override public CollectionFuture> asyncLopPipedInsertBulk( String key, int index, List valueList, CollectionAttributes attributesForCreate, Transcoder tc) { + + if (valueList.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> insertList = new ArrayList>(); + if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - ListPipedInsert insert = new ListPipedInsert(key, index, valueList, attributesForCreate, tc); - return asyncCollectionPipedInsert(key, insert); + insertList.add(new ListPipedInsert(key, index, valueList, attributesForCreate, tc)); } else { PartitionedList list = new PartitionedList(valueList, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); - - List> insertList = new ArrayList>( - list.size()); - - for (int i = 0; i < list.size(); i++) { - insertList.add(new ListPipedInsert(key, index, list.get(i), attributesForCreate, tc)); + for (List elementList : list) { + insertList.add(new ListPipedInsert(key, index, elementList, attributesForCreate, tc)); } - return asyncCollectionPipedInsert(key, insertList); } + return asyncCollectionPipedInsert(key, insertList); } @Override public CollectionFuture> asyncSopPipedInsertBulk( String key, List valueList, CollectionAttributes attributesForCreate, Transcoder tc) { + + if (valueList.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> insertList = new ArrayList>(); + if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - SetPipedInsert insert = new SetPipedInsert(key, valueList, attributesForCreate, tc); - return asyncCollectionPipedInsert(key, insert); + insertList.add(new SetPipedInsert(key, valueList, attributesForCreate, tc)); } else { PartitionedList list = new PartitionedList(valueList, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); - - List> insertList = new ArrayList>( - list.size()); - - for (int i = 0; i < list.size(); i++) { - insertList.add(new SetPipedInsert(key, list.get(i), attributesForCreate, tc)); + for (List elementList : list) { + insertList.add(new SetPipedInsert(key, elementList, attributesForCreate, tc)); } - - return asyncCollectionPipedInsert(key, insertList); } + return asyncCollectionPipedInsert(key, insertList); } @Override @@ -2876,23 +2802,23 @@ public CollectionFuture> asyncBopPipedUp public CollectionFuture> asyncBopPipedUpdateBulk( String key, List> elements, Transcoder tc) { + if (elements.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + + List> updateList = new ArrayList>(); + if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) { - CollectionPipedUpdate collectionPipedUpdate = new BTreePipedUpdate( - key, elements, tc); - return asyncCollectionPipedUpdate(key, collectionPipedUpdate); + updateList.add(new BTreePipedUpdate(key, elements, tc)); } else { PartitionedList> list = new PartitionedList>( elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT); - - List> collectionPipedUpdateList = - new ArrayList>(list.size()); - - for (int i = 0; i < list.size(); i++) { - collectionPipedUpdateList.add(new BTreePipedUpdate(key, list.get(i), tc)); + for (List> elementList : list) { + updateList.add(new BTreePipedUpdate(key, elementList, tc)); } - - return asyncCollectionPipedUpdate(key, collectionPipedUpdateList); } + return asyncCollectionPipedUpdate(key, updateList); } @Override @@ -2905,26 +2831,28 @@ public CollectionFuture> asyncMopPipedUp public CollectionFuture> asyncMopPipedUpdateBulk( String key, Map elements, Transcoder tc) { + if (elements.isEmpty()) { + throw new IllegalArgumentException( + "The number of piped operations must be larger than 0."); + } + for (Map.Entry checkMKey : elements.entrySet()) { validateMKey(checkMKey.getKey()); } - if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) { - CollectionPipedUpdate collectionPipedUpdate = new MapPipedUpdate( - key, elements, tc); - return asyncCollectionPipedUpdate(key, collectionPipedUpdate); + + List> updateList = new ArrayList>(); + + if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { + updateList.add(new MapPipedUpdate(key, elements, tc)); } else { PartitionedMap list = new PartitionedMap( elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT); - List> collectionPipedUpdateList = - new ArrayList>(list.size()); - - for (int i = 0; i < list.size(); i++) { - collectionPipedUpdateList.add(new MapPipedUpdate(key, list.get(i), tc)); + for (Map elementMap : list) { + updateList.add(new MapPipedUpdate(key, elementMap, tc)); } - - return asyncCollectionPipedUpdate(key, collectionPipedUpdateList); } + return asyncCollectionPipedUpdate(key, updateList); } @Override @@ -3718,35 +3646,6 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } - @Override - public CollectionFuture> asyncBopPipedInsertBulk( - String key, List> elements, - CollectionAttributes attributesForCreate) { - return asyncBopPipedInsertBulk(key, elements, attributesForCreate, collectionTranscoder); - } - - @Override - public CollectionFuture> asyncBopPipedInsertBulk( - String key, List> elements, - CollectionAttributes attributesForCreate, Transcoder tc) { - if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { - CollectionPipedInsert insert = new ByteArraysBTreePipedInsert(key, elements, attributesForCreate, tc); - return asyncCollectionPipedInsert(key, insert); - } else { - PartitionedList> list = new PartitionedList>( - elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); - - List> insertList = new ArrayList>( - list.size()); - - for (int i = 0; i < list.size(); i++) { - insertList.add(new ByteArraysBTreePipedInsert(key, list.get(i), attributesForCreate, tc)); - } - - return asyncCollectionPipedInsert(key, insertList); - } - } - @Override public SMGetFuture>> asyncBopSortMergeGet( List keyList, byte[] from, byte[] to,