Skip to content

Commit 6587d33

Browse files
committed
[feat & fix] add importing mode && fix func of swapThreadMain
1. Aimed at controling of some action during importing. 2. See details in Command IMPORT. 3. fixed of list adding node in swapThreadMain.
1 parent 7cb2ec0 commit 6587d33

File tree

11 files changed

+344
-24
lines changed

11 files changed

+344
-24
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: CI
22

33
on:
44
push:
5-
branches: [ xredis_2_ror ]
5+
branches: [ main,deprecated/xredis_2_ror_v1 ]
66
pull_request:
7-
branches: [ xredis_2_ror ]
7+
branches: [ main,deprecated/xredis_2_ror_v1 ]
88

99
jobs:
1010
unit:

src/ctrip_swap.c

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,6 @@ void keyRequestSwapFinished(swapData *data, void *pd, int errcode) {
299299
ctx->finished(ctx->c,ctx);
300300
}
301301

302-
/* Expired key should delete only if server is master, check expireIfNeeded
303-
* for more details. */
304-
int keyExpiredAndShouldDelete(redisDb *db, robj *key) {
305-
if (!keyIsExpired(db,key)) return 0;
306-
if (server.masterhost != NULL) return 0;
307-
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
308-
return 1;
309-
}
310-
311302
#define NOSWAP_REASON_KEYNOTEXISTS 1
312303
#define NOSWAP_REASON_NOTKEYLEVEL 2
313304
#define NOSWAP_REASON_KEYNOTSUPPORT 3

src/ctrip_swap_cmd.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,9 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = {
10071007
"admin no-script ok-loading fast may-replicate",
10081008
0,NULL,NULL,SWAP_NOP,0,0,0,0,0,0,0},
10091009

1010+
{"import",importCommand,-2,
1011+
"admin use-memory ok-loading fast @dangerous",
1012+
0,NULL,NULL,SWAP_NOP,0,0,0,0,0,0,0},
10101013
};
10111014

10121015
struct SwapDataTypeItem {

src/ctrip_swap_data.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ int swapDataMarkedPropagateExpire(swapData *data) {
6161
}
6262

6363
static int swapDataExpiredAndShouldDelete(swapData *data) {
64+
if (isImportingExpireDisabled()) return 0;
65+
6466
if (!timestampIsExpired(data->expire)) return 0;
6567
if (server.masterhost != NULL) return 0;
6668
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;

src/ctrip_swap_expire.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ int scanMetaExpireIfNeeded(redisDb *db, scanMeta *meta) {
6161
if (server.masterhost != NULL) return 1;
6262
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
6363

64-
/* Delete the key */
65-
c = server.swap_expire_clients[db->id];
66-
key = createStringObject(meta->key,sdslen(meta->key));
67-
submitExpireClientRequest(c, key, 0);
68-
decrRefCount(key);
64+
if (!isImportingExpireDisabled()) {
65+
/* Delete the key */
66+
c = server.swap_expire_clients[db->id];
67+
key = createStringObject(meta->key,sdslen(meta->key));
68+
submitExpireClientRequest(c, key, 0);
69+
decrRefCount(key);
70+
}
6971

7072
return 1;
7173
}

src/ctrip_swap_thread.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ void *swapThreadMain (void *arg) {
4747
listRewind(thread->pending_reqs, &li);
4848
processing_reqs = listCreate();
4949
while ((ln = listNext(&li))) {
50-
swapRequest *req = listNodeValue(ln);
51-
listAddNodeHead(processing_reqs, req);
50+
swapRequestBatch *reqs = listNodeValue(ln);
51+
listAddNodeTail(processing_reqs, reqs);
5252
listDelNode(thread->pending_reqs, ln);
5353
}
5454
pthread_mutex_unlock(&thread->lock);

src/db.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,10 +1769,13 @@ int expireIfNeeded(redisDb *db, robj *key) {
17691769
* have failed over and the new primary will send us the expire. */
17701770
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
17711771

1772-
/* Delete the key */
1772+
if (!isImportingExpireDisabled()) {
1773+
/* Delete the key */
17731774
#ifndef ENABLE_SWAP
1774-
deleteExpiredKeyAndPropagate(db,key);
1775+
deleteExpiredKeyAndPropagate(db,key);
17751776
#endif
1777+
}
1778+
17761779
return 1;
17771780
}
17781781

src/server.c

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,10 @@ struct redisCommand redisCommandTable[] = {
11291129
{"xslaveof",xslaveofCommand,3,
11301130
"admin no-script ok-stale",
11311131
0,NULL,0,0,0,0,0,0},
1132+
1133+
{"import",importCommand,-2,
1134+
"admin use-memory ok-loading fast @dangerous",
1135+
0,NULL,0,0,0,0,0,0},
11321136
};
11331137
#endif
11341138

@@ -1896,7 +1900,7 @@ void clientsCron(void) {
18961900
void databasesCron(void) {
18971901
/* Expire keys by random sampling. Not required for slaves
18981902
* as master will synthesize DELs for us. */
1899-
if (server.active_expire_enabled) {
1903+
if (server.active_expire_enabled && !isImportingExpireDisabled()) {
19001904
if (iAmMaster()) {
19011905
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
19021906
} else {
@@ -2523,7 +2527,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
25232527

25242528
/* Run a fast expire cycle (the called function will return
25252529
* ASAP if a fast cycle is not needed). */
2526-
if (server.active_expire_enabled && server.masterhost == NULL)
2530+
if (server.active_expire_enabled && !isImportingExpireDisabled() && server.masterhost == NULL)
25272531
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
25282532

25292533
/* Unblock all the clients blocked for synchronous replication
@@ -3548,6 +3552,8 @@ void InitServerLast() {
35483552
initThreadedIO();
35493553
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
35503554
server.initial_memory_usage = zmalloc_used_memory();
3555+
server.importing_expire_enabled = 1;
3556+
server.importing_end_time = 0;
35513557
}
35523558

35533559
/* Parse the flags string description 'strflags' and set them to the
@@ -4823,6 +4829,115 @@ NULL
48234829
}
48244830
}
48254831

4832+
/* The import command, only recommended to use in target server
4833+
* during keys migration. It means time-based importing mode in server.
4834+
* As default, Some action will not be executed during this mode,
4835+
* which are:
4836+
* 1. expire,
4837+
4838+
* import <subcommand> [[arg] [value] [opt] ...]
4839+
4840+
* subcommand supported:
4841+
* import start [ttl]
4842+
* import end
4843+
* imoort status
4844+
* import set ((ttl <seconds>) | ( expire < 1|0 > ) )
4845+
* import get (ttl | expire)
4846+
*
4847+
* */
4848+
4849+
static inline int isImporting() {
4850+
return server.importing_end_time >= server.mstime;
4851+
}
4852+
4853+
void importCommand(client *c) {
4854+
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
4855+
const char *help[] = {
4856+
"start [ttl]",
4857+
" importing mode start with seconds of ttl, default ttl as 60s,",
4858+
" expire is default disabled.",
4859+
"end",
4860+
" importing mode end.",
4861+
"status",
4862+
" return 1 in importing mode, return 0 if not.",
4863+
"set ((ttl <seconds>) | ( expire < 1|0 > > ) )",
4864+
" set some options.",
4865+
"get (ttl | expire)",
4866+
" get some options.",
4867+
NULL};
4868+
addReplyHelp(c, help);
4869+
} else if ((c->argc == 2 || c->argc == 3) && !strcasecmp(c->argv[1]->ptr,"start")) {
4870+
if (!isImporting()) {
4871+
/* if importing mode already off, sub-status will not be inherited,
4872+
* which is reset to default value.
4873+
*/
4874+
server.importing_expire_enabled = 0;
4875+
}
4876+
4877+
long long ttl;
4878+
if (c->argc == 2) {
4879+
ttl = 3600;
4880+
} else if (c->argc == 3) {
4881+
if (getLongLongFromObjectOrReply(c, c->argv[2], &ttl, NULL) != C_OK) {
4882+
return;
4883+
}
4884+
}
4885+
server.importing_end_time = mstime() + ttl * 1000;
4886+
addReply(c,shared.ok);
4887+
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"end")) {
4888+
if (!isImporting()) {
4889+
addReplyError(c,"Importing mode already ended.");
4890+
return;
4891+
}
4892+
server.importing_end_time = -1;
4893+
addReply(c,shared.ok);
4894+
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"status")) {
4895+
if (isImporting()) {
4896+
addReplyLongLong(c, 1);
4897+
} else {
4898+
addReplyLongLong(c, 0);
4899+
}
4900+
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"get")) {
4901+
if (!isImporting()) {
4902+
addReplyError(c,"IMPORT GET must be called in importing mode.");
4903+
return;
4904+
}
4905+
4906+
if (!strcasecmp(c->argv[2]->ptr,"ttl")) {
4907+
addReplyLongLong(c, (server.importing_end_time - mstime()) / 1000);
4908+
} else if (!strcasecmp(c->argv[2]->ptr,"expire")) {
4909+
addReplyLongLong(c, (long long)server.importing_expire_enabled);
4910+
} else {
4911+
addReplyError(c,"Invalid option.");
4912+
}
4913+
} else if (c->argc == 4 && !strcasecmp(c->argv[1]->ptr,"set")) {
4914+
if (!isImporting()) {
4915+
addReplyError(c,"IMPORT SET must be called in importing mode.");
4916+
return;
4917+
}
4918+
4919+
if (!strcasecmp(c->argv[2]->ptr,"ttl")) {
4920+
long long ttl;
4921+
if (getLongLongFromObjectOrReply(c, c->argv[3], &ttl, NULL) != C_OK) {
4922+
return;
4923+
}
4924+
server.importing_end_time = mstime() + ttl * 1000;
4925+
addReply(c,shared.ok);
4926+
} else if (!strcasecmp(c->argv[2]->ptr,"expire")) {
4927+
server.importing_expire_enabled = (atoi(c->argv[3]->ptr) != 0);
4928+
addReply(c,shared.ok);
4929+
} else {
4930+
addReplyError(c,"Invalid option.");
4931+
}
4932+
} else {
4933+
addReplyError(c,"Invalid subcommand.");
4934+
}
4935+
}
4936+
4937+
int isImportingExpireDisabled() {
4938+
return (isImporting() && (server.importing_expire_enabled == 0));
4939+
}
4940+
48264941
/* Convert an amount of bytes into a human readable string in the form
48274942
* of 100B, 2G, 100M, 4K, and so forth. */
48284943
void bytesToHuman(char *s, unsigned long long n) {
@@ -5288,6 +5403,8 @@ sds genRedisInfoString(const char *section) {
52885403
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
52895404
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
52905405

5406+
long long importing_ttl = isImporting()? (server.importing_end_time - mstime()):0;
5407+
52915408
if (sections++) info = sdscat(info,"\r\n");
52925409
info = sdscatprintf(info,
52935410
"# Stats\r\n"
@@ -5328,7 +5445,8 @@ sds genRedisInfoString(const char *section) {
53285445
"total_reads_processed:%lld\r\n"
53295446
"total_writes_processed:%lld\r\n"
53305447
"io_threaded_reads_processed:%lld\r\n"
5331-
"io_threaded_writes_processed:%lld\r\n",
5448+
"io_threaded_writes_processed:%lld\r\n"
5449+
"importing:status=%d,ttl=%lld,expire=%d\r\n",
53325450
server.stat_numconnections,
53335451
server.stat_numcommands,
53345452
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -5366,7 +5484,8 @@ sds genRedisInfoString(const char *section) {
53665484
stat_total_reads_processed,
53675485
stat_total_writes_processed,
53685486
server.stat_io_reads_processed,
5369-
server.stat_io_writes_processed);
5487+
server.stat_io_writes_processed,
5488+
isImporting(),importing_ttl,server.importing_expire_enabled);
53705489
}
53715490

53725491
/* Replication */

src/server.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,6 +1747,10 @@ struct redisServer {
17471747
long long gtid_ignored_cmd_count;
17481748
long long gtid_executed_cmd_count;
17491749
long long gtid_sync_stat[GTID_SYNC_TYPES];
1750+
1751+
/* importing mode */
1752+
mstime_t importing_end_time; /* in milliseconds */
1753+
int importing_expire_enabled;
17501754
};
17511755

17521756
#define MAX_KEYS_BUFFER 256
@@ -2909,6 +2913,10 @@ void stralgoCommand(client *c);
29092913
void resetCommand(client *c);
29102914
void failoverCommand(client *c);
29112915

2916+
/* importing mode */
2917+
void importCommand(client *c);
2918+
int isImportingExpireDisabled();
2919+
29122920
#if defined(__GNUC__)
29132921
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
29142922
void free(void *ptr) __attribute__ ((deprecated));

tests/test_helper.tcl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ set ::disk_tests {
148148
swap/unit/slowlog
149149
swap/unit/scripting
150150
swap/unit/monitor
151+
unit/import_mode
151152
}
152153

153154
set ::all_tests [concat $::gtid_tests $::all_tests]

0 commit comments

Comments
 (0)