Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions docs/user-guide/12-replica-hot-key.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# 12. Hot Key Replication 기능

## 개요
분산 캐시 환경에서 특정 Key에 대한 요청이 급증하는 경우, Key가 저장된 특정 캐시 노드에만 부하가 집중될 수 있다.
이로 인해 해당 노드의 응답 속도가 저하되거나 장애가 발생하면 전체 서비스에 영향을 줄 수 있다.

ARCUS Java Client는 이러한 문제를 해결하기 위해 Hot Key Replication(복제) 기능을 제공한다.
이 기능은 클라이언트 레벨에서 하나의 아이템을 여러 개의 복제본으로 생성하여 가능한 한 서로 다른 캐시 노드에 분산 저장하고,
조회 시에는 이 중 하나를 임의로 선택하여 읽어옴으로써 읽기 부하를 효과적으로 분산시킨다.

## 동작 원리
Hot Key Replication은 ARCUS의 표준 분산 방식인 Consistent Hashing을 그대로 활용한다.

1. 저장 (Set): 사용자가 요청한 replicaCount 만큼 복제 키를 생성한다.
- 생성 규칙: 원본키 + # + 인덱스 (예: `hot:item#0`, `hot:item#1`, `hot:item#2`)
- 생성된 복제 키들은 해시 알고리즘에 의해 캐시 클러스터 내 여러 노드로 분산되어 저장된다.

2. 조회 (Get): 사용자가 조회 요청을 하면, 클라이언트는 0 ~ replicaCount-1 사이의 인덱스 중 하나를 무작위로 선택하여 복제 키를 생성하고 조회한다.
이를 통해 읽기 요청이 여러 노드로 자연스럽게 분산된다.

## 주의 사항
- **데이터를 저장할 때 사용한 replicaCount와 조회할 때 사용하는 replicaCount는 반드시 동일해야 한다.**
서로 다를 경우 존재하지 않는 복제 키를 조회하거나, 일부 데이터를 조회하지 못할 수 있다.
- 복제본 간의 데이터 동기화는 클라이언트의 setReplicas 호출 시점에 이루어진다.
따라서 갱신 중 아주 짧은 순간이나 일부 노드 저장 실패 시, 복제본 간 데이터가 일치하지 않을 수 있다.
- 현재는 Key-Value (KV) 아이템에 대해서만 지원하며, List, Set, B+Tree 등의 Collection 타입은 지원하지 않는다.

## 사용법
### 1. 데이터 저장 (`setReplicas()`)
Hot Key를 여러 노드에 복제하여 저장한다. 내부적으로 `asyncStoreBulk()`를 사용하여 병렬로 저장을 수행한다.

```java
String key = "product:12345:best_seller";
int replicaCount = 3;
int expTime = 60;
String value = "This is hot data";

Future<Map<String, OperationStatus>> future =
client.setReplicas(key, replicaCount, expTime, value); // (1)

try {
Map<String, OperationStatus> result = future.get(); // (2)

for (Map.Entry <String, OperationStatus> entry : result.entrySet()) {
if (!entry.getValue().isSuccess()) { // (3)
System.err.println("Failed to store replica: " + entry.getKey());
}
}
} catch (Exception e) {
// handle exception
return;
}
```

1. `setReplicas`를 호출하여 비동기적으로 replicaCount만큼의 복제본 저장을 시도한다.
반환된 Future 객체는 각 복제 키별 저장 결과를 담고 있다.
2. `future.get()`을 통해 실제 저장이 완료될 때까지 대기하고 결과를 받아온다.
3. 반환된 Map을 순회하며 실패한 노드가 있는지 확인한다. 분산 환경 특성상 일부 노드 저장에 실패할 수 있으므로, 비즈니스 로직에 따라 재시도 여부를 결정해야 한다.
- `set`연산은 멱등성(Idempotent)을 가지므로, 실패한 키에 대해서 반복적으로 재시도해도 데이터 일관성에 문제가 없다.

### 2. 비동기 데이터 조회 (`asyncGetFromReplica()`)
재시도 로직 없이, 단순히 임의의 복제본 하나를 비동기로 조회한다. 속도와 부하 분산이 중요하고 재시도가 필요 없는 경우 사용한다.

```java
GetFuture<Object> future = client.asyncGetFromReplica(key, replicaCount);
Object result = future.get();
```

### 3. 데이터 조회 (`getFromReplica()`)
복제된 데이터 중 하나를 조회한다. 이 메서드는 동기 방식으로 동작하며, 내부적으로 **재시도 로직**이 포함되어 있다.
- 재시도 로직: 임의로 선택한 첫 번째 복제본 조회에 실패(null)하거나 에러가 발생하면, 즉시 null을 반환하지 않고 다른 복제본에 대해 조회를 재시도한다.

```java

Object result = client.getFromReplica(key, replicaCount); // (1)

if (result != null) {
System.out.println("Value: " + result);
}
```

1. `getFromReplica`를 호출하여 데이터를 조회한다. 임의로 선택한 첫 번째 복제본 조회에 실패하거나 에러가 발생하면,
즉시 null을 반환하지 않고 다른 복제본에 대해 조회를 재시도한다.
115 changes: 115 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -205,6 +207,7 @@ public class ArcusClient extends FrontCacheMemcachedClient implements ArcusClien
private static final AtomicInteger CLIENT_ID = new AtomicInteger(1);

private static final int MAX_DNS_CACHE_TTL = 300;
private final Random RANDOM = new Random();

private CacheManager cacheManager;

Expand Down Expand Up @@ -511,6 +514,118 @@ public void gotAttribute(String k, String attr) {
return rv;
}

/**
* Store a value to multiple replica keys to distribute read traffic.
* The replica keys are generated by appending a suffix (e.g. #0, #1...) to the given base key.
*
* @param key the base key
* @param replicaCount the number of replicas to create
* @param exp the expiration time of the value
* @param value the object to store
* @param tc the transcoder to serialize the value
*
* @return future holding the map of failed keys and their operation status
*/
@Override
public <T> Future<Map<String, OperationStatus>> setReplicas(final String key,
final int replicaCount,
final int exp, T value,
final Transcoder<T> tc) {
if (replicaCount <= 1) {
throw new IllegalArgumentException("Replica count must be greater than 1");
}

Map<String, T> items = new HashMap<>();
for (int i = 0; i < replicaCount; i++) {
String replicaKey = getReplicaKey(key, i);
items.put(replicaKey, value);
}

return asyncStoreBulk(StoreType.set, items, exp, tc);
}

@Override
public Future<Map<String, OperationStatus>> setReplicas(String key,
int replicaCount,
int exp,
Object value) {
return setReplicas(key, replicaCount, exp, value, transcoder);
}

/**
* Asynchronously retrieve a value from a random replica key.
* This method selects one random index within the replica count and requests the value
* to distribute read load across multiple nodes.
*
* @param key the base key
* @param replicaCount the number of replicas available
* @param tc the transcoder to deserialize the value
* @return a future holding the fetched value
*/
@Override
public <T> Future<T> asyncGetFromReplica(final String key,
final int replicaCount,
final Transcoder<T> tc) {
if (replicaCount <= 1) {
throw new IllegalArgumentException("Replica count must be greater than 1");
}

int randomIndex = RANDOM.nextInt(replicaCount);
String replicaKey = getReplicaKey(key, randomIndex);

return asyncGet(replicaKey, tc);
}

@Override
public Future<Object> asyncGetFromReplica(String key, int replicaCount) {
return asyncGetFromReplica(key, replicaCount, transcoder);
}

/**
* Synchronously retrieve a value from replicas.
* This method tries to fetch data from replica keys in a random order until a value is found.
* This provides higher availability than reading from a single key.
*
* @param k the base key
* @param replicaCount the number of replicas available
* @param tc the transcoder to deserialize the value
* @return the fetched value, or null if all replicas fail or do not exist
*/
@Override
public <T> T getFromReplica(final String k, final int replicaCount, final Transcoder<T> tc) {
if (replicaCount <= 1) {
throw new IllegalArgumentException("Replica count must be greater than 1");
}

List<Integer> indexList = new ArrayList<>();
for (int i = 0; i < replicaCount; i++) {
indexList.add(i);
}
Collections.shuffle(indexList);

for (int index : indexList) {
String replicaKey = getReplicaKey(k, index);
try {
T value = get(replicaKey, tc);
if (value != null) {
return value;
}
} catch (Exception e) {
getLogger().warn("Exception while getting replica key: " + replicaKey, e);
}
}
return null;
}

@Override
public Object getFromReplica(String key, int replicaCount) {
return getFromReplica(key, replicaCount, transcoder);
}

private String getReplicaKey(String key, int index) {
return key + "#" + index;
}

/**
* Generic get operation for list items. Public methods for list items call this method.
*
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,75 @@ public interface ArcusClientIF extends MemcachedClientIF {
*/
CollectionFuture<CollectionAttributes> asyncGetAttr(final String key);

/**
* Set an item with multiple nodes using a specific transcoder.
*
* @param <T> the expected class of the value
* @param key the base key
* @param replicaCount number of replicas
* @param exp expiration time
* @param value value to store
* @param tc transcoder to encode value
* @return a future indicating the success/failure of each replica store
*/
<T> Future<Map<String, OperationStatus>> setReplicas(
String key, int replicaCount, int exp, T value, Transcoder<T> tc);

/**
* Set an item with multiple nodes to distribute read traffic.
* The item is stored in N different nodes using replica keys (key#0, key#1, ...).
*
* @param key the base key
* @param replicaCount number of replicas
* @param exp expiration time
* @param value value to store
* @return a future indicating the success/failure of each replica store
*/
Future<Map<String, OperationStatus>> setReplicas(
String key, int replicaCount, int exp, Object value);

/**
* Get an item from one of the nodes asynchronously using a specific transcoder.
*
* @param <T> the expected class of the value
* @param key the base key
* @param replicaCount number of replicas
* @param tc transcoder to decode value
* @return a future holding the value
*/
<T> Future<T> asyncGetFromReplica(String key, int replicaCount, Transcoder<T> tc);

/**
* Get an item from one of the nodes asynchronously.
* Selects a random replica key (key#N) to distribute load.
*
* @param key the base key
* @param replicaCount number of replicas
* @return a future holding the value
*/
Future<Object> asyncGetFromReplica(String key, int replicaCount);

/**
* Get an item from nodes synchronously with internal retry logic using a transcoder.
*
* @param <T> the expected class of the value
* @param key the base key
* @param replicaCount number of replicas
* @param tc transcoder to decode value
* @return the value, or null if all replicas miss
*/
<T> T getFromReplica(String key, int replicaCount, Transcoder<T> tc);

/**
* Get an item from nodes synchronously with internal retry logic.
* If the selected replica returns null (miss), it tries other replicas
* to prevent false-negative misses due to node failure or eviction.
*
* @param key the base key
* @param replicaCount number of replicas
* @return the value, or null if all replicas miss
*/
Object getFromReplica(String key, int replicaCount);

/**
* Checks an item membership in a set.
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,38 @@ public CollectionFuture<CollectionAttributes> asyncGetAttr(String key) {
return this.getClient().asyncGetAttr(key);
}

@Override
public Future<Map<String, OperationStatus>> setReplicas(
String key, int replicaCount, int exp, Object value) {
return this.getClient().setReplicas(key, replicaCount, exp, value);
}

@Override
public <T> Future<Map<String, OperationStatus>> setReplicas(
String key, int replicaCount, int exp, T value, Transcoder<T> tc) {
return this.getClient().setReplicas(key, replicaCount, exp, value, tc);
}

@Override
public Future<Object> asyncGetFromReplica(String key, int replicaCount) {
return this.getClient().asyncGetFromReplica(key, replicaCount);
}

@Override
public <T> Future<T> asyncGetFromReplica(String key, int replicaCount, Transcoder<T> tc) {
return this.getClient().asyncGetFromReplica(key, replicaCount, tc);
}

@Override
public Object getFromReplica(String key, int replicaCount) {
return this.getClient().getFromReplica(key, replicaCount);
}

@Override
public <T> T getFromReplica(String key, int replicaCount, Transcoder<T> tc) {
return this.getClient().getFromReplica(key, replicaCount, tc);
}

@Override
public <T> CollectionFuture<Boolean> asyncSopExist(String key, T value,
Transcoder<T> tc) {
Expand Down
Loading