diff --git a/docs/user-guide/12-replica-hot-key.md b/docs/user-guide/12-replica-hot-key.md new file mode 100644 index 000000000..a3df54fda --- /dev/null +++ b/docs/user-guide/12-replica-hot-key.md @@ -0,0 +1,83 @@ +# 12. Hot Key Replication 기능 + +## 개요 +분산 캐시 환경에서 특정 Key에 대한 요청이 급증하는 경우, Key가 저장된 특정 캐시 노드에만 부하가 집중될 수 있다. +이로 인해 해당 노드의 응답 속도가 저하되거나 장애가 발생하면 전체 서비스에 영향을 줄 수 있다. + +ARCUS Java Client는 이러한 문제를 해결하기 위해 Hot Key Replication(복제) 기능을 제공한다. +이 기능은 클라이언트 레벨에서 하나의 아이템을 여러 개의 복제본으로 생성하여 가능한 한 서로 다른 캐시 노드에 분산 저장하고, +조회 시에는 이 중 하나를 임의로 선택하여 읽어옴으로써 읽기 부하를 효과적으로 분산시킨다. + +## 동작 원리 +Hot Key Replication은 ARCUS의 표준 분산 방식인 Consistent Hashing을 그대로 활용한다. + +1. 저장 (Set): 사용자가 요청한 replicaCount 만큼 복제 키를 생성한다. + - 생성 규칙: 원본키 + # + 인덱스 (예: `hot:item#0`, `hot:item#1`, `hot:item#2`) + - 생성된 복제 키들은 해시 알고리즘에 의해 캐시 클러스터 내 여러 노드로 분산되어 저장된다. + +2. 조회 (Get): 사용자가 조회 요청을 하면, 클라이언트는 0 ~ replicaCount-1 사이의 인덱스 중 하나를 무작위로 선택하여 복제 키를 생성하고 조회한다. +이를 통해 읽기 요청이 여러 노드로 자연스럽게 분산된다. + +## 주의 사항 +- **데이터를 저장할 때 사용한 replicaCount와 조회할 때 사용하는 replicaCount는 반드시 동일해야 한다.** + 서로 다를 경우 존재하지 않는 복제 키를 조회하거나, 일부 데이터를 조회하지 못할 수 있다. +- 복제본 간의 데이터 동기화는 클라이언트의 setReplicas 호출 시점에 이루어진다. + 따라서 갱신 중 아주 짧은 순간이나 일부 노드 저장 실패 시, 복제본 간 데이터가 일치하지 않을 수 있다. +- 현재는 Key-Value (KV) 아이템에 대해서만 지원하며, List, Set, B+Tree 등의 Collection 타입은 지원하지 않는다. + +## 사용법 +### 1. 데이터 저장 (`setReplicas()`) +Hot Key를 여러 노드에 복제하여 저장한다. 내부적으로 `asyncStoreBulk()`를 사용하여 병렬로 저장을 수행한다. + +```java +String key = "product:12345:best_seller"; +int replicaCount = 3; +int expTime = 60; +String value = "This is hot data"; + +Future> future = + client.setReplicas(key, replicaCount, expTime, value); // (1) + +try { + Map result = future.get(); // (2) + + for (Map.Entry entry : result.entrySet()) { + if (!entry.getValue().isSuccess()) { // (3) + System.err.println("Failed to store replica: " + entry.getKey()); + } + } +} catch (Exception e) { + // handle exception + return; +} +``` + +1. `setReplicas`를 호출하여 비동기적으로 replicaCount만큼의 복제본 저장을 시도한다. + 반환된 Future 객체는 각 복제 키별 저장 결과를 담고 있다. +2. `future.get()`을 통해 실제 저장이 완료될 때까지 대기하고 결과를 받아온다. +3. 반환된 Map을 순회하며 실패한 노드가 있는지 확인한다. 분산 환경 특성상 일부 노드 저장에 실패할 수 있으므로, 비즈니스 로직에 따라 재시도 여부를 결정해야 한다. +- `set`연산은 멱등성(Idempotent)을 가지므로, 실패한 키에 대해서 반복적으로 재시도해도 데이터 일관성에 문제가 없다. + +### 2. 비동기 데이터 조회 (`asyncGetFromReplica()`) +재시도 로직 없이, 단순히 임의의 복제본 하나를 비동기로 조회한다. 속도와 부하 분산이 중요하고 재시도가 필요 없는 경우 사용한다. + +```java +GetFuture future = client.asyncGetFromReplica(key, replicaCount); +Object result = future.get(); +``` + +### 3. 데이터 조회 (`getFromReplica()`) +복제된 데이터 중 하나를 조회한다. 이 메서드는 동기 방식으로 동작하며, 내부적으로 **재시도 로직**이 포함되어 있다. +- 재시도 로직: 임의로 선택한 첫 번째 복제본 조회에 실패(null)하거나 에러가 발생하면, 즉시 null을 반환하지 않고 다른 복제본에 대해 조회를 재시도한다. + +```java + +Object result = client.getFromReplica(key, replicaCount); // (1) + +if (result != null) { + System.out.println("Value: " + result); +} +``` + +1. `getFromReplica`를 호출하여 데이터를 조회한다. 임의로 선택한 첫 번째 복제본 조회에 실패하거나 에러가 발생하면, + 즉시 null을 반환하지 않고 다른 복제본에 대해 조회를 재시도한다. diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 31cc8583f..07ab1b824 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -26,12 +26,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -205,6 +207,7 @@ public class ArcusClient extends FrontCacheMemcachedClient implements ArcusClien private static final AtomicInteger CLIENT_ID = new AtomicInteger(1); private static final int MAX_DNS_CACHE_TTL = 300; + private final Random RANDOM = new Random(); private CacheManager cacheManager; @@ -511,6 +514,118 @@ public void gotAttribute(String k, String attr) { return rv; } + /** + * Store a value to multiple replica keys to distribute read traffic. + * The replica keys are generated by appending a suffix (e.g. #0, #1...) to the given base key. + * + * @param key the base key + * @param replicaCount the number of replicas to create + * @param exp the expiration time of the value + * @param value the object to store + * @param tc the transcoder to serialize the value + * + * @return future holding the map of failed keys and their operation status + */ + @Override + public Future> setReplicas(final String key, + final int replicaCount, + final int exp, T value, + final Transcoder tc) { + if (replicaCount <= 1) { + throw new IllegalArgumentException("Replica count must be greater than 1"); + } + + Map items = new HashMap<>(); + for (int i = 0; i < replicaCount; i++) { + String replicaKey = getReplicaKey(key, i); + items.put(replicaKey, value); + } + + return asyncStoreBulk(StoreType.set, items, exp, tc); + } + + @Override + public Future> setReplicas(String key, + int replicaCount, + int exp, + Object value) { + return setReplicas(key, replicaCount, exp, value, transcoder); + } + + /** + * Asynchronously retrieve a value from a random replica key. + * This method selects one random index within the replica count and requests the value + * to distribute read load across multiple nodes. + * + * @param key the base key + * @param replicaCount the number of replicas available + * @param tc the transcoder to deserialize the value + * @return a future holding the fetched value + */ + @Override + public Future asyncGetFromReplica(final String key, + final int replicaCount, + final Transcoder tc) { + if (replicaCount <= 1) { + throw new IllegalArgumentException("Replica count must be greater than 1"); + } + + int randomIndex = RANDOM.nextInt(replicaCount); + String replicaKey = getReplicaKey(key, randomIndex); + + return asyncGet(replicaKey, tc); + } + + @Override + public Future asyncGetFromReplica(String key, int replicaCount) { + return asyncGetFromReplica(key, replicaCount, transcoder); + } + + /** + * Synchronously retrieve a value from replicas. + * This method tries to fetch data from replica keys in a random order until a value is found. + * This provides higher availability than reading from a single key. + * + * @param k the base key + * @param replicaCount the number of replicas available + * @param tc the transcoder to deserialize the value + * @return the fetched value, or null if all replicas fail or do not exist + */ + @Override + public T getFromReplica(final String k, final int replicaCount, final Transcoder tc) { + if (replicaCount <= 1) { + throw new IllegalArgumentException("Replica count must be greater than 1"); + } + + List indexList = new ArrayList<>(); + for (int i = 0; i < replicaCount; i++) { + indexList.add(i); + } + Collections.shuffle(indexList); + + for (int index : indexList) { + String replicaKey = getReplicaKey(k, index); + try { + T value = get(replicaKey, tc); + if (value != null) { + return value; + } + } catch (Exception e) { + getLogger().warn("Exception while getting replica key: " + replicaKey, e); + } + } + return null; + } + + @Override + public Object getFromReplica(String key, int replicaCount) { + return getFromReplica(key, replicaCount, transcoder); + } + + private String getReplicaKey(String key, int index) { + return key + "#" + index; + } + /** * Generic get operation for list items. Public methods for list items call this method. * diff --git a/src/main/java/net/spy/memcached/ArcusClientIF.java b/src/main/java/net/spy/memcached/ArcusClientIF.java index 5ad4f108c..9441723b2 100644 --- a/src/main/java/net/spy/memcached/ArcusClientIF.java +++ b/src/main/java/net/spy/memcached/ArcusClientIF.java @@ -66,6 +66,75 @@ public interface ArcusClientIF extends MemcachedClientIF { */ CollectionFuture asyncGetAttr(final String key); + /** + * Set an item with multiple nodes using a specific transcoder. + * + * @param the expected class of the value + * @param key the base key + * @param replicaCount number of replicas + * @param exp expiration time + * @param value value to store + * @param tc transcoder to encode value + * @return a future indicating the success/failure of each replica store + */ + Future> setReplicas( + String key, int replicaCount, int exp, T value, Transcoder tc); + + /** + * Set an item with multiple nodes to distribute read traffic. + * The item is stored in N different nodes using replica keys (key#0, key#1, ...). + * + * @param key the base key + * @param replicaCount number of replicas + * @param exp expiration time + * @param value value to store + * @return a future indicating the success/failure of each replica store + */ + Future> setReplicas( + String key, int replicaCount, int exp, Object value); + + /** + * Get an item from one of the nodes asynchronously using a specific transcoder. + * + * @param the expected class of the value + * @param key the base key + * @param replicaCount number of replicas + * @param tc transcoder to decode value + * @return a future holding the value + */ + Future asyncGetFromReplica(String key, int replicaCount, Transcoder tc); + + /** + * Get an item from one of the nodes asynchronously. + * Selects a random replica key (key#N) to distribute load. + * + * @param key the base key + * @param replicaCount number of replicas + * @return a future holding the value + */ + Future asyncGetFromReplica(String key, int replicaCount); + + /** + * Get an item from nodes synchronously with internal retry logic using a transcoder. + * + * @param the expected class of the value + * @param key the base key + * @param replicaCount number of replicas + * @param tc transcoder to decode value + * @return the value, or null if all replicas miss + */ + T getFromReplica(String key, int replicaCount, Transcoder tc); + + /** + * Get an item from nodes synchronously with internal retry logic. + * If the selected replica returns null (miss), it tries other replicas + * to prevent false-negative misses due to node failure or eviction. + * + * @param key the base key + * @param replicaCount number of replicas + * @return the value, or null if all replicas miss + */ + Object getFromReplica(String key, int replicaCount); /** * Checks an item membership in a set. diff --git a/src/main/java/net/spy/memcached/ArcusClientPool.java b/src/main/java/net/spy/memcached/ArcusClientPool.java index bec62f40b..5a5134575 100644 --- a/src/main/java/net/spy/memcached/ArcusClientPool.java +++ b/src/main/java/net/spy/memcached/ArcusClientPool.java @@ -496,6 +496,38 @@ public CollectionFuture asyncGetAttr(String key) { return this.getClient().asyncGetAttr(key); } + @Override + public Future> setReplicas( + String key, int replicaCount, int exp, Object value) { + return this.getClient().setReplicas(key, replicaCount, exp, value); + } + + @Override + public Future> setReplicas( + String key, int replicaCount, int exp, T value, Transcoder tc) { + return this.getClient().setReplicas(key, replicaCount, exp, value, tc); + } + + @Override + public Future asyncGetFromReplica(String key, int replicaCount) { + return this.getClient().asyncGetFromReplica(key, replicaCount); + } + + @Override + public Future asyncGetFromReplica(String key, int replicaCount, Transcoder tc) { + return this.getClient().asyncGetFromReplica(key, replicaCount, tc); + } + + @Override + public Object getFromReplica(String key, int replicaCount) { + return this.getClient().getFromReplica(key, replicaCount); + } + + @Override + public T getFromReplica(String key, int replicaCount, Transcoder tc) { + return this.getClient().getFromReplica(key, replicaCount, tc); + } + @Override public CollectionFuture asyncSopExist(String key, T value, Transcoder tc) { diff --git a/src/test/manual/net/spy/memcached/ArcusClientReplicaTest.java b/src/test/manual/net/spy/memcached/ArcusClientReplicaTest.java new file mode 100644 index 000000000..ffc610cc6 --- /dev/null +++ b/src/test/manual/net/spy/memcached/ArcusClientReplicaTest.java @@ -0,0 +1,104 @@ +package net.spy.memcached; + +import java.util.Map; +import java.util.concurrent.Future; + +import net.spy.memcached.collection.BaseIntegrationTest; +import net.spy.memcached.ops.OperationStatus; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public class ArcusClientReplicaTest { + + private ArcusClientPool arcusClientPool; + private static final int POOL_SIZE = 4; + private static final String KEY = "test:replica:key"; + private static final String VALUE = "replica-value"; + private static final int REPLICA_COUNT = 3; + private static final int EXPIRE_TIME = 60; + + @BeforeEach + protected void setUp() throws Exception { + // This test assumes we use ZK + assumeTrue(BaseIntegrationTest.USE_ZK); + + arcusClientPool = ArcusClient.createArcusClientPool( + BaseIntegrationTest.ZK_ADDRESS, + BaseIntegrationTest.SERVICE_CODE, + POOL_SIZE + ); + + arcusClientPool.flush().get(); + } + + @Test + void testReplicaSetAndGet_Sync() throws Exception { + + Future> future = + arcusClientPool.setReplicas(KEY, REPLICA_COUNT, EXPIRE_TIME, VALUE); + Map result = future.get(); + + // result.size(): fail request count + assertEquals(0, result.size()); + + Object gotValue = arcusClientPool.getFromReplica(KEY, REPLICA_COUNT); + assertNotNull(gotValue); + assertEquals(VALUE, gotValue); + + for (int i = 0; i < REPLICA_COUNT; i++) { + Object rValue = arcusClientPool.get(KEY + "#" + i); + assertNotNull(rValue); + assertEquals(VALUE, rValue); + } + } + + @Test + void testReplicaSetAndGet_Async() throws Exception { + Future> future = + arcusClientPool.setReplicas(KEY, REPLICA_COUNT, EXPIRE_TIME, VALUE); + Map result = future.get(); + + assertEquals(0, result.size()); + + Object asyncGotValue = arcusClientPool.asyncGetFromReplica(KEY, REPLICA_COUNT).get(); + + assertNotNull(asyncGotValue); + assertEquals(VALUE, asyncGotValue); + } + + @Test + void testGetFromReplica_Miss() { + Object value = arcusClientPool.getFromReplica(KEY, REPLICA_COUNT); + assertNull(value); + } + + @Test + void testGetFromReplica_PartialHit() throws Exception { + arcusClientPool.set(KEY + "#1", EXPIRE_TIME, VALUE).get(); + Object gotValue = arcusClientPool.getFromReplica(KEY, REPLICA_COUNT); + + assertNotNull(gotValue); + assertEquals(VALUE, gotValue); + } + + @Test + void testReplicaUpdate() throws Exception { + arcusClientPool.setReplicas(KEY, REPLICA_COUNT, EXPIRE_TIME, "initial-value").get(); + + String newValue = "updated-value"; + arcusClientPool.setReplicas(KEY, REPLICA_COUNT, EXPIRE_TIME, newValue).get(); + + Object gotValue = arcusClientPool.getFromReplica(KEY, REPLICA_COUNT); + assertEquals(newValue, gotValue); + + for (int i = 0; i < REPLICA_COUNT; i++) { + assertEquals(newValue, arcusClientPool.get(KEY + "#" + i)); + } + } +}