Skip to content

Commit e3950cf

Browse files
committed
memory mode: prefix swap related struct members, function names with swap_.
1 parent 7b937d8 commit e3950cf

40 files changed

+278
-294
lines changed

src/blocked.c

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@
6565
#include "latency.h"
6666
#include "monotonic.h"
6767

68+
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto
6869
#ifdef ENABLE_SWAP
69-
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto, list* swap_wrong_type_error_keys);
70-
#else
71-
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
70+
, list* swap_wrong_type_error_keys
7271
#endif
72+
);
7373
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
7474

7575
/* This structure represents the blocked key information that we store
@@ -272,11 +272,11 @@ void disconnectAllBlockedClients(void) {
272272
/* Helper function for handleClientsBlockedOnKeys(). This function is called
273273
* when there may be clients blocked on a list key, and there may be new
274274
* data to fetch (the key is ready). */
275+
void serveClientsBlockedOnListKey(robj *o, readyList *rl
275276
#ifdef ENABLE_SWAP
276-
void serveClientsBlockedOnListKey(robj *o, readyList *rl, list* swap_wrong_type_error_keys) {
277-
#else
278-
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
277+
, list* swap_wrong_type_error_keys
279278
#endif
279+
) {
280280
/* We serve clients in the same order they blocked for
281281
* this key, from the first blocked to the last. */
282282
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
@@ -299,7 +299,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
299299
int wherefrom = receiver->bpop.listpos.wherefrom;
300300
int whereto = receiver->bpop.listpos.whereto;
301301
#ifdef ENABLE_SWAP
302-
robj *value = ctripListTypePop(o, wherefrom, rl->db, rl->key);
302+
robj *value = swapListTypePop(o, wherefrom, rl->db, rl->key);
303303
#else
304304
robj *value = listTypePop(o, wherefrom);
305305
#endif
@@ -314,16 +314,16 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
314314
elapsedStart(&replyTimer);
315315
if (serveClientBlockedOnList(receiver,
316316
rl->key,dstkey,rl->db,value,
317+
wherefrom, whereto
317318
#ifdef ENABLE_SWAP
318-
wherefrom, whereto, swap_wrong_type_error_keys) == C_ERR)
319-
#else
320-
wherefrom, whereto) == C_ERR)
319+
, swap_wrong_type_error_keys
321320
#endif
321+
) == C_ERR)
322322
{
323323
/* If we failed serving the client we need
324324
* to also undo the POP operation. */
325325
#ifdef ENABLE_SWAP
326-
ctripListTypePush(o,value,wherefrom, rl->db, rl->key);
326+
swapListTypePush(o,value,wherefrom, rl->db, rl->key);
327327
#else
328328
listTypePush(o,value,wherefrom);
329329
#endif
@@ -340,8 +340,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
340340
}
341341

342342
#ifdef ENABLE_SWAP
343-
objectMeta *om = lookupMeta(rl->db, rl->key);
344-
if (ctripListTypeLength(o, om) == 0) {
343+
if (swapListTypeLength(o, lookupMeta(rl->db, rl->key)) == 0) {
345344
#else
346345
if (listTypeLength(o) == 0) {
347346
#endif

src/config.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2998,7 +2998,7 @@ standardConfig configs[] = {
29982998
createIntConfig("ctrip-monitor-port", NULL, IMMUTABLE_CONFIG, 0, 65535, server.ctrip_monitor_port, 6380, INTEGER_CONFIG, NULL, NULL), /* Monitor TCP port. */
29992999
createIntConfig("swap-slow-expire-effort", NULL, MODIFIABLE_CONFIG, -10, 10, server.swap_slow_expire_effort, -5, INTEGER_CONFIG, NULL, NULL),
30003000
createIntConfig("swap-debug-evict-keys", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, server.swap_debug_evict_keys, 0, INTEGER_CONFIG, NULL, NULL),
3001-
createIntConfig("ps-parallism-rdb", NULL, MODIFIABLE_CONFIG, 4, 16384, server.ps_parallism_rdb, 32, INTEGER_CONFIG, NULL, NULL),
3001+
createIntConfig("swap-ps-parallism-rdb", "ps-parallism-rdb", MODIFIABLE_CONFIG, 4, 16384, server.swap_ps_parallism_rdb, 32, INTEGER_CONFIG, NULL, NULL),
30023002
createIntConfig("swap-evict-step-max-subkeys", NULL, MODIFIABLE_CONFIG, 0, 65536, server.swap_evict_step_max_subkeys, 1024, INTEGER_CONFIG, NULL, NULL),
30033003
createIntConfig("swap-debug-rio-delay-micro", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, server.swap_debug_rio_delay_micro, 0, INTEGER_CONFIG, NULL, NULL),
30043004
createIntConfig("swap-threads", NULL, IMMUTABLE_CONFIG, 4, 64, server.swap_threads_num, 4, INTEGER_CONFIG, NULL, NULL),

src/ctrip_swap.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -545,14 +545,14 @@ void swapMutexopCommand(client *c) {
545545
}
546546

547547
int lockGlobalAndExec(clientKeyRequestFinished locked_op, uint64_t exclude_mark) {
548-
if (exclude_mark && server.req_submitted&exclude_mark) {
548+
if (exclude_mark && server.swap_req_submitted&exclude_mark) {
549549
return 0;
550550
}
551551
/* add flag before submit request otherwise when
552552
* global lock no block, flag may be del just after submit */
553-
server.req_submitted |= exclude_mark;
553+
server.swap_req_submitted |= exclude_mark;
554554

555-
client *c = server.mutex_client;
555+
client *c = server.swap_mutex_client;
556556
getKeyRequestsResult result = GET_KEYREQUESTS_RESULT_INIT;
557557
getKeyRequestsPrepareResult(&result,1);
558558
getKeyRequestsAppendSubkeyResult(&result,REQUEST_LEVEL_SVR,NULL,0,NULL,

src/ctrip_swap.h

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ void swapPersistCtxPersistKeys(swapPersistCtx *ctx);
409409
sds genSwapPersistInfoString(sds info);
410410
void swapPersistKeyRequestFinished(swapPersistCtx *ctx, int dbid, robj *key, uint64_t persist_version);
411411
void loadDataFromDisk(void);
412-
void ctripLoadDataFromDisk(void);
412+
void swap_loadDataFromDisk(void);
413413
int submitEvictClientRequest(client *c, robj *key, int persist_keep, uint64_t persist_version);
414414

415415
#define setObjectPersistKeep(o) do { \
@@ -864,7 +864,7 @@ void startSwapRewind(swap_rewind_type rewind_type);
864864
void endSwapRewind(void);
865865
void freeClientSwapCmdTrace(client *c);
866866

867-
/* see server.req_submitted */
867+
/* see server.swap_req_submitted */
868868
#define REQ_SUBMITTED_NONE 0
869869
#define REQ_SUBMITTED_BGSAVE (1ULL<<0)
870870
#define REQ_SUBMITTED_REPL_START (1ULL<<1)
@@ -927,7 +927,7 @@ int swapDataSetupSet(swapData *d, OUT void **datactx);
927927

928928
unsigned long swap_setTypeSize(const objectMeta *meta, const robj *o);
929929
unsigned long swap_setTypeSizeLookup(redisDb *db, robj *key, const robj *o);
930-
void ctrip_scardCommand(client *c);
930+
void swap_scardCommand(client *c);
931931

932932
/* List */
933933
typedef struct listSwapData {
@@ -964,10 +964,10 @@ void clientArgRewritesRestore(client *c);
964964
void clientArgRewrite(client *c, argRewriteRequest arg_req, MOVE robj *new_arg);
965965

966966

967-
long ctripListTypeLength(robj *list, objectMeta *object_meta);
968-
void ctripListTypePush(robj *subject, robj *value, int where, redisDb *db, robj *key);
969-
robj *ctripListTypePop(robj *subject, int where, redisDb *db, robj *key);
970-
void ctripListMetaDelRange(redisDb *db, robj *key, long ltrim, long rtrim);
967+
long swapListTypeLength(robj *list, objectMeta *object_meta);
968+
void swapListTypePush(robj *subject, robj *value, int where, redisDb *db, robj *key);
969+
robj *swapListTypePop(robj *subject, int where, redisDb *db, robj *key);
970+
void swapListMetaDelRange(redisDb *db, robj *key, long ltrim, long rtrim);
971971
/* zset */
972972
typedef struct zsetSwapData {
973973
swapData sd;
@@ -1712,25 +1712,25 @@ typedef struct swapEvictKeysCtx {
17121712
int ended;
17131713
} swapEvictKeysCtx;
17141714

1715-
void ctrip_startEvictionTimeProc(void);
1716-
size_t ctrip_getMemoryToFree(size_t mem_used);
1717-
void ctrip_performEvictionStart(swapEvictKeysCtx *sectx);
1718-
int ctrip_performEvictionLoopStartShouldBreak(swapEvictKeysCtx *sectx);
1715+
void swap_startEvictionTimeProc(void);
1716+
size_t swap_getMemoryToFree(size_t mem_used);
1717+
void swap_performEvictionStart(swapEvictKeysCtx *sectx);
1718+
int swap_performEvictionLoopStartShouldBreak(swapEvictKeysCtx *sectx);
17191719
size_t performEvictionSwapSelectedKey(swapEvictKeysCtx *sectx, redisDb *db, robj *keyobj);
1720-
int ctrip_performEvictionLoopCheckShouldBreak(swapEvictKeysCtx *sectx);
1721-
void ctrip_performEvictionEnd(swapEvictKeysCtx *sectx);
1722-
static inline int ctrip_performEvictionLoopCheckInterval(int keys_freed) {
1720+
int swap_performEvictionLoopCheckShouldBreak(swapEvictKeysCtx *sectx);
1721+
void swap_performEvictionEnd(swapEvictKeysCtx *sectx);
1722+
static inline int swap_performEvictionLoopCheckInterval(int keys_freed) {
17231723
return keys_freed % server.swap_evict_loop_check_interval == 0;
17241724
}
17251725
/* used memory in disk swap mode */
17261726
size_t coldFiltersUsedMemory(void); /* cuckoo filter not counted in maxmemory */
1727-
static inline size_t ctrip_getUsedMemory(void) {
1727+
static inline size_t swap_getUsedMemory(void) {
17281728
int swap_inprogress_memory;
17291729
atomicGet(server.swap_inprogress_memory, swap_inprogress_memory);
17301730
return zmalloc_used_memory() - swap_inprogress_memory -
17311731
coldFiltersUsedMemory() - swapPersistCtxUsedMemory(server.swap_persist_ctx);
17321732
}
1733-
static inline int ctrip_evictionTimeProcGetDelayMillis(void) {
1733+
static inline int swap_evictionTimeProcGetDelayMillis(void) {
17341734
if (swapEvictionReachedInprogressLimit()) return 1;
17351735
else return 0;
17361736
}
@@ -2302,7 +2302,7 @@ typedef struct rdbSaveRocksStats {
23022302
} rdbSaveRocksStats;
23032303

23042304
/* rdb save */
2305-
int rdbSaveKeyHeader(rio *rdb, robj *key, robj *evict, unsigned char rdbtype, long long expiretime);
2305+
int rdbSaveKeyHeader(rio *rdb, robj *key, robj *o, unsigned char rdbtype, long long expiretime);
23062306
int rdbKeySaveHotExtensionInit(rdbKeySaveData *keydata, redisDb *db, sds keystr);
23072307
int rdbKeySaveWarmColdInit(rdbKeySaveData *keydata, redisDb *db, decodedResult *dr);
23082308
void rdbKeySaveDataDeinit(rdbKeySaveData *keydata);
@@ -2324,7 +2324,7 @@ int bitmapSaveInit(rdbKeySaveData *save, uint64_t version, const char *extend, s
23242324
#define RDB_LOAD_BATCH_COUNT 50
23252325
#define RDB_LOAD_BATCH_CAPACITY (4*1024*1024)
23262326

2327-
typedef struct ctripRdbLoadCtx {
2327+
typedef struct swapRdbLoadObjectCtx {
23282328
size_t errors;
23292329
size_t idx;
23302330
struct {
@@ -2336,10 +2336,10 @@ typedef struct ctripRdbLoadCtx {
23362336
sds *rawkeys;
23372337
sds *rawvals;
23382338
} batch;
2339-
} ctripRdbLoadCtx;
2339+
} swapRdbLoadObjectCtx;
23402340

2341-
void evictStartLoading(void);
2342-
void evictStopLoading(int success);
2341+
void swapStartLoading(void);
2342+
void swapStopLoading(int success);
23432343

23442344
struct rdbKeyLoadData;
23452345

@@ -2374,7 +2374,7 @@ static inline sds rdbVerbatimNew(unsigned char rdbtype) {
23742374

23752375
int rdbLoadStringVerbatim(rio *rdb, sds *verbatim);
23762376
int rdbLoadHashFieldsVerbatim(rio *rdb, unsigned long long len, sds *verbatim);
2377-
int ctripRdbLoadObject(int rdbtype, rio *rdb, redisDb *db, sds key, long long expire, long long now, rdbKeyLoadData *keydata);
2377+
int swapRdbLoadObject(int rdbtype, rio *rdb, redisDb *db, sds key, long long expire, long long now, rdbKeyLoadData *keydata);
23782378
robj *rdbKeyLoadGetObject(rdbKeyLoadData *keydata);
23792379
int rdbKeyLoadDataInit(rdbKeyLoadData *keydata, int rdbtype, redisDb *db, sds key, long long expire, long long now);
23802380
void rdbKeyLoadDataDeinit(rdbKeyLoadData *keydata);
@@ -2610,11 +2610,22 @@ void swapDebugCommand(client *c);
26102610
void swapExpiredCommand(client *c);
26112611
const char *strObjectType(int type);
26122612
int timestampIsExpired(mstime_t expire);
2613-
size_t ctripDbSize(redisDb *db);
2613+
size_t swap_dbSize(redisDb *db);
26142614
long get_dir_size(char *dirname);
26152615
robj *swapRandomKey(redisDb *db, metaScanResult *metas);
26162616
void dbPauseRehash(redisDb *db);
26172617
void dbResumeRehash(redisDb *db);
2618+
int debugGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
2619+
void commandProcessed(client *c);
2620+
ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
2621+
void trackInstantaneousMetric(int metric, long long current_reading);
2622+
long long getInstantaneousMetric(int metric);
2623+
void swapInitServerConfig(void);
2624+
void swapInitServer(void);
2625+
void freeClientsInDeferedQueue(void);
2626+
void swap_replicationStartPendingFork(void);
2627+
void debugSwapOutCommand(client *c);
2628+
size_t swapobjectComputeSize(robj *val, int samples, objectMeta *object_meta);
26182629

26192630
void notifyKeyspaceEventDirty(int type, char *event, robj *key, int dbid, ...);
26202631
void notifyKeyspaceEventDirtyKey(int type, char *event, robj *key, int dbid);

src/ctrip_swap_async.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ int asyncCompleteQueueInit() {
124124
return -1;
125125
}
126126

127-
server.CQ = cq;
127+
server.swap_CQ = cq;
128128
return 0;
129129
}
130130

@@ -137,7 +137,7 @@ void asyncCompleteQueueDeinit(asyncCompleteQueue *cq) {
137137

138138
void asyncSwapRequestNotifyCallback(swapRequestBatch *reqs, void *pd) {
139139
UNUSED(pd);
140-
asyncCompleteQueueAppend(server.CQ, reqs);
140+
asyncCompleteQueueAppend(server.swap_CQ, reqs);
141141
}
142142

143143
void asyncCompleteQueueAppend(asyncCompleteQueue *cq, swapRequestBatch *reqs) {
@@ -164,9 +164,9 @@ static int asyncCompleteQueueDrained() {
164164
int drained = 1;
165165

166166
if (!swapThreadsDrained()) return 0;
167-
pthread_mutex_lock(&server.CQ->lock);
168-
if (listLength(server.CQ->complete_queue)) drained = 0;
169-
pthread_mutex_unlock(&server.CQ->lock);
167+
pthread_mutex_lock(&server.swap_CQ->lock);
168+
if (listLength(server.swap_CQ->complete_queue)) drained = 0;
169+
pthread_mutex_unlock(&server.swap_CQ->lock);
170170

171171
return drained;
172172
}
@@ -176,7 +176,7 @@ int asyncCompleteQueueDrain(mstime_t time_limit) {
176176
mstime_t start = mstime();
177177

178178
while (!asyncCompleteQueueDrained()) {
179-
asyncCompleteQueueProcess(server.CQ);
179+
asyncCompleteQueueProcess(server.swap_CQ);
180180

181181
if (time_limit >= 0 && mstime() - start > time_limit) {
182182
result = -1;

src/ctrip_swap_blocked.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ int serveClientsBlockedOnListKeyWithoutTargetKey(robj *o, readyList *rl) {
210210
}
211211
int wherefrom = receiver->bpop.listpos.wherefrom;
212212
int whereto = receiver->bpop.listpos.whereto;
213-
robj *value = ctripListTypePop(o, wherefrom, rl->db, rl->key);
213+
robj *value = swapListTypePop(o, wherefrom, rl->db, rl->key);
214214
if (value) {
215215
/* Protect receiver->bpop.target, that will be
216216
* freed by the next unblockClient()
@@ -225,7 +225,7 @@ int serveClientsBlockedOnListKeyWithoutTargetKey(robj *o, readyList *rl) {
225225
{
226226
/* If we failed serving the client we need
227227
* to also undo the POP operation. */
228-
ctripListTypePush(o,value,wherefrom, rl->db, rl->key);
228+
swapListTypePush(o,value,wherefrom, rl->db, rl->key);
229229
}
230230
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
231231
unblockClient(receiver);
@@ -240,7 +240,7 @@ int serveClientsBlockedOnListKeyWithoutTargetKey(robj *o, readyList *rl) {
240240
}
241241
end:
242242
om = lookupMeta(rl->db, rl->key);
243-
if (ctripListTypeLength(o, om) == 0) {
243+
if (swapListTypeLength(o, om) == 0) {
244244
dbDelete(rl->db,rl->key);
245245
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
246246
exists_list_blocked_with_target_key = 0;

src/ctrip_swap_cmd.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = {
226226
"read-only fast @set @swap_set",
227227
0,NULL,getKeyRequestSmembers,SWAP_IN,0,1,1,1,0,0,0},
228228

229-
{"scard",ctrip_scardCommand,2,
229+
{"scard",swap_scardCommand,2,
230230
"read-only fast @set @swap_set",
231231
0,NULL,NULL,SWAP_IN,SWAP_IN_META,1,1,1,0,0,0},
232232

src/ctrip_swap_compact.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ static unsigned char metaVersionFilterFilt(void* mvfilter_, int level, int cf, c
112112
if (state == FILTER_STATE_CLOSE) return 0;
113113
/* Since release 6.0, with compaction filter enabled, RocksDB always invoke filtering for any key,
114114
* even if it knows it will make a snapshot not repeatable. */
115-
atomicGet(server.inflight_snapshot, inflight_snapshot);
115+
atomicGet(server.rocksdb_inflight_snapshot, inflight_snapshot);
116116
if (inflight_snapshot > 0) return 0;
117117

118118
updateCompactionFiltScanCount(cf);

src/ctrip_swap_debug.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@
2828

2929
#include "ctrip_swap.h"
3030

31+
int debugGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
32+
int *keys = NULL;
33+
UNUSED(cmd);
34+
if (argc == 3 && !strcasecmp(argv[1]->ptr,"object")) {
35+
keys = getKeysPrepareResult(result,1);
36+
result->numkeys = 1;
37+
keys[0] = 2;
38+
} else if (argc >= 3 && !strcasecmp(argv[1]->ptr,"digest-value")) {
39+
keys = getKeysPrepareResult(result,argc-2);
40+
result->numkeys = argc-2;
41+
for (int i = 2; i < argc; i++) keys[i-2] = i;
42+
} else {
43+
keys = getKeysPrepareResult(result,1);
44+
result->numkeys = 0;
45+
}
46+
return result->numkeys;
47+
}
48+
3149
static sds debugRioGet(int cf, sds rawkey) {
3250
sds rawval;
3351
RIO _rio, *rio = &_rio;

0 commit comments

Comments
 (0)