diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 415fca52dd6..7d96e1cf1de 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [ xredis_2_ror ] + branches: [ main, deprecated/xredis_2_ror_v1 ] pull_request: - branches: [ xredis_2_ror ] + branches: [ main, deprecated/xredis_2_ror_v1 ] jobs: unit: diff --git a/redis.conf b/redis.conf index 14faaa8fd0d..9276dcbf639 100644 --- a/redis.conf +++ b/redis.conf @@ -2422,9 +2422,9 @@ rocksdb.ratelimiter.rate_per_sec 1024mb # rocksdb.data.max_bytes_for_level_base 512mb # rocksdb.meta.max_bytes_for_level_base 256mb -# Default: 10 +# Default: 8 (data), 10 (meta) # -# rocksdb.data.max_bytes_for_level_multiplier 10 +# rocksdb.data.max_bytes_for_level_multiplier 8 # rocksdb.meta.max_bytes_for_level_multiplier 10 # If true, RocksDB will pick target size of each level dynamically. @@ -2500,9 +2500,9 @@ rocksdb.ratelimiter.rate_per_sec 1024mb # # max_bytes_for_level_multiplier_additional is ignored with this flag on. 
# -# Default: no +# Default: yes (data), no (meta) # -# rocksdb.data.compaction_dynamic_level_bytes no +# rocksdb.data.compaction_dynamic_level_bytes yes # rocksdb.meta.compaction_dynamic_level_bytes no # If suggest_compact_deletion_percentage > 0, using CompactOnDeletionCollector, diff --git a/src/config.c b/src/config.c index 15eac554035..c0298c6df65 100644 --- a/src/config.c +++ b/src/config.c @@ -2612,6 +2612,17 @@ static int updateRocksdbMetaBlobGarbageCollectionForceThresholdPercentage(long l return updateRocksdbCFOptionPersent(META_CF, "blob_garbage_collection_force_threshold", val, err); } +static int updateRocksdbDataLevel0FileNumCompactionTrigger(long long val, long long prev, const char **err) { + UNUSED(prev); + if (!updateRocksdbCFOptionNumber(DATA_CF, "level0_file_num_compaction_trigger", val, err)) return 0; + return updateRocksdbCFOptionNumber(SCORE_CF, "level0_file_num_compaction_trigger", val, err); +} + +static int updateRocksdbMetaLevel0FileNumCompactionTrigger(long long val, long long prev, const char **err) { + UNUSED(prev); + return updateRocksdbCFOptionNumber(META_CF, "level0_file_num_compaction_trigger", val, err); +} + const char *rocksdbCompressionTypeName(int val) { const char *name = configEnumGetNameOrUnknown(rocksdb_compression_enum, val); if (!strcmp(name, "no")) { @@ -2912,7 +2923,7 @@ standardConfig configs[] = { createBoolConfig("rocksdb.enable_pipelined_write", NULL, IMMUTABLE_CONFIG, server.rocksdb_enable_pipelined_write, 0, NULL, NULL), createBoolConfig("rocksdb.data.disable_auto_compactions", "rocksdb.disable_auto_compactions", MODIFIABLE_CONFIG, server.rocksdb_data_disable_auto_compactions, 0, NULL, updateRocksdbDataDisableAutoCompactions), createBoolConfig("rocksdb.meta.disable_auto_compactions", NULL, MODIFIABLE_CONFIG, server.rocksdb_meta_disable_auto_compactions, 0, NULL, updateRocksdbMetaDisableAutoCompactions), - createBoolConfig("rocksdb.data.compaction_dynamic_level_bytes", "rocksdb.compaction_dynamic_level_bytes", IMMUTABLE_CONFIG, 
server.rocksdb_data_compaction_dynamic_level_bytes, 0, NULL, NULL), + createBoolConfig("rocksdb.data.compaction_dynamic_level_bytes", "rocksdb.compaction_dynamic_level_bytes", IMMUTABLE_CONFIG, server.rocksdb_data_compaction_dynamic_level_bytes, 1, NULL, NULL), createBoolConfig("rocksdb.meta.compaction_dynamic_level_bytes", NULL, IMMUTABLE_CONFIG, server.rocksdb_meta_compaction_dynamic_level_bytes, 0, NULL, NULL), createBoolConfig("rocksdb.data.enable_blob_files", "rocksdb.enable_blob_files", MODIFIABLE_CONFIG, server.rocksdb_data_enable_blob_files, 0, NULL, updateRocksdbDataEnableBlobFiles), createBoolConfig("rocksdb.meta.enable_blob_files", NULL, MODIFIABLE_CONFIG, server.rocksdb_meta_enable_blob_files, 0, NULL, updateRocksdbMetaEnableBlobFiles), @@ -3034,7 +3045,7 @@ standardConfig configs[] = { createIntConfig("rocksdb.meta.block_size", NULL, IMMUTABLE_CONFIG, 512, INT_MAX, server.rocksdb_meta_block_size, 8192, INTEGER_CONFIG, NULL, NULL), createIntConfig("rocksdb.data.level0_slowdown_writes_trigger", "rocksdb.level0_slowdown_writes_trigger", MODIFIABLE_CONFIG, 1, INT_MAX, server.rocksdb_data_level0_slowdown_writes_trigger, 20, INTEGER_CONFIG, NULL, updateRocksdbDataLevel0SlowdownWritesTrigger), createIntConfig("rocksdb.meta.level0_slowdown_writes_trigger", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.rocksdb_meta_level0_slowdown_writes_trigger, 20, INTEGER_CONFIG, NULL, updateRocksdbMetaLevel0SlowdownWritesTrigger), - createIntConfig("rocksdb.data.max_bytes_for_level_multiplier", "rocksdb.max_bytes_for_level_multiplier", MODIFIABLE_CONFIG, 1, INT_MAX, server.rocksdb_data_max_bytes_for_level_multiplier, 10, INTEGER_CONFIG, NULL, updateRocksdbDataMaxBytesForLevelMultiplier), + createIntConfig("rocksdb.data.max_bytes_for_level_multiplier", "rocksdb.max_bytes_for_level_multiplier", MODIFIABLE_CONFIG, 1, INT_MAX, server.rocksdb_data_max_bytes_for_level_multiplier, 8, INTEGER_CONFIG, NULL, updateRocksdbDataMaxBytesForLevelMultiplier), 
createIntConfig("rocksdb.meta.max_bytes_for_level_multiplier", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.rocksdb_meta_max_bytes_for_level_multiplier, 10, INTEGER_CONFIG, NULL, updateRocksdbMetaMaxBytesForLevelMultiplier), createIntConfig("rocksdb.data.suggest_compact_deletion_percentage", "rocksdb.suggest_compact_deletion_percentage", IMMUTABLE_CONFIG, 0, 100, server.rocksdb_data_suggest_compact_deletion_percentage, 95, INTEGER_CONFIG, NULL, NULL), createIntConfig("rocksdb.meta.suggest_compact_deletion_percentage", NULL, IMMUTABLE_CONFIG, 0, 100, server.rocksdb_meta_suggest_compact_deletion_percentage, 95, INTEGER_CONFIG, NULL, NULL), @@ -3042,8 +3053,10 @@ standardConfig configs[] = { createIntConfig("rocksdb.WAL_size_limit_MB", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.rocksdb_WAL_size_limit_MB, 16384, INTEGER_CONFIG, NULL, NULL), createIntConfig("rocksdb.data.blob_garbage_collection_age_cutoff_percentage", "rocksdb.blob_garbage_collection_age_cutoff_percentage", MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_data_blob_garbage_collection_age_cutoff_percentage, 5, INTEGER_CONFIG, NULL, updateRocksdbDataBlobGarbageCollectionAgeCutoffPercentage), createIntConfig("rocksdb.meta.blob_garbage_collection_age_cutoff_percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_meta_blob_garbage_collection_age_cutoff_percentage, 5, INTEGER_CONFIG, NULL, updateRocksdbMetaBlobGarbageCollectionAgeCutoffPercentage), - createIntConfig("rocksdb.data.blob_garbage_collection_force_threshold_percentage", "rocksdb.blob_garbage_collection_force_threshold_percentage", MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_data_blob_garbage_collection_force_threshold_percentage, 90, INTEGER_CONFIG, NULL, updateRocksdbDataBlobGarbageCollectionForceThresholdPercentage), + createIntConfig("rocksdb.data.blob_garbage_collection_force_threshold_percentage", "rocksdb.blob_garbage_collection_force_threshold_percentage", MODIFIABLE_CONFIG, 0, INT_MAX, 
server.rocksdb_data_blob_garbage_collection_force_threshold_percentage, 50, INTEGER_CONFIG, NULL, updateRocksdbDataBlobGarbageCollectionForceThresholdPercentage), createIntConfig("rocksdb.meta.blob_garbage_collection_force_threshold_percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_meta_blob_garbage_collection_force_threshold_percentage, 90, INTEGER_CONFIG, NULL, updateRocksdbMetaBlobGarbageCollectionForceThresholdPercentage), + createIntConfig("rocksdb.data.level0_file_num_compaction_trigger", "rocksdb.level0_file_num_compaction_trigger", MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_data_level0_file_num_compaction_trigger, 4, INTEGER_CONFIG, NULL, updateRocksdbDataLevel0FileNumCompactionTrigger), + createIntConfig("rocksdb.meta.level0_file_num_compaction_trigger", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rocksdb_meta_level0_file_num_compaction_trigger, 4, INTEGER_CONFIG, NULL, updateRocksdbMetaLevel0FileNumCompactionTrigger), #endif /* Unsigned int configs */ diff --git a/src/ctrip_swap.c b/src/ctrip_swap.c index 9041be0bfec..26ec1982f21 100644 --- a/src/ctrip_swap.c +++ b/src/ctrip_swap.c @@ -299,15 +299,6 @@ void keyRequestSwapFinished(swapData *data, void *pd, int errcode) { ctx->finished(ctx->c,ctx); } -/* Expired key should delete only if server is master, check expireIfNeeded - * for more details. 
*/ -int keyExpiredAndShouldDelete(redisDb *db, robj *key) { - if (!keyIsExpired(db,key)) return 0; - if (server.masterhost != NULL) return 0; - if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; - return 1; -} - #define NOSWAP_REASON_KEYNOTEXISTS 1 #define NOSWAP_REASON_NOTKEYLEVEL 2 #define NOSWAP_REASON_KEYNOTSUPPORT 3 diff --git a/src/ctrip_swap_cmd.c b/src/ctrip_swap_cmd.c index 4ee5f41a598..5dcee80657b 100644 --- a/src/ctrip_swap_cmd.c +++ b/src/ctrip_swap_cmd.c @@ -1007,6 +1007,9 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { "admin no-script ok-loading fast may-replicate", 0,NULL,NULL,SWAP_NOP,0,0,0,0,0,0,0}, + {"import",importCommand,-2, + "admin use-memory ok-loading fast @dangerous", + 0,NULL,NULL,SWAP_NOP,0,0,0,0,0,0,0}, }; struct SwapDataTypeItem { diff --git a/src/ctrip_swap_data.c b/src/ctrip_swap_data.c index 3ff940819c1..9a34f26e2dd 100644 --- a/src/ctrip_swap_data.c +++ b/src/ctrip_swap_data.c @@ -61,6 +61,8 @@ int swapDataMarkedPropagateExpire(swapData *data) { } static int swapDataExpiredAndShouldDelete(swapData *data) { + if (isImportingExpireDisabled()) return 0; + if (!timestampIsExpired(data->expire)) return 0; if (server.masterhost != NULL) return 0; if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; diff --git a/src/ctrip_swap_expire.c b/src/ctrip_swap_expire.c index fc5e6abe7a6..4550bb39a7d 100644 --- a/src/ctrip_swap_expire.c +++ b/src/ctrip_swap_expire.c @@ -61,11 +61,13 @@ int scanMetaExpireIfNeeded(redisDb *db, scanMeta *meta) { if (server.masterhost != NULL) return 1; if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; - /* Delete the key */ - c = server.swap_expire_clients[db->id]; - key = createStringObject(meta->key,sdslen(meta->key)); - submitExpireClientRequest(c, key, 0); - decrRefCount(key); + if (!isImportingExpireDisabled()) { + /* Delete the key */ + c = server.swap_expire_clients[db->id]; + key = createStringObject(meta->key,sdslen(meta->key)); + submitExpireClientRequest(c, key, 0); + 
decrRefCount(key); + } return 1; } diff --git a/src/ctrip_swap_rocks.c b/src/ctrip_swap_rocks.c index 0bb1588b227..64020efe082 100644 --- a/src/ctrip_swap_rocks.c +++ b/src/ctrip_swap_rocks.c @@ -96,7 +96,6 @@ static int rocksOpen(rocks *rocks) { rocksdb_options_optimize_for_point_lookup(rocks->db_opts, 1); rocksdb_options_set_min_write_buffer_number_to_merge(rocks->db_opts, 2); - rocksdb_options_set_level0_file_num_compaction_trigger(rocks->db_opts, 2); rocksdb_options_set_max_bytes_for_level_base(rocks->db_opts, 256*MB); rocksdb_options_compaction_readahead_size(rocks->db_opts, 2*1024*1024); /* default 0 */ @@ -152,6 +151,7 @@ static int rocksOpen(rocks *rocks) { rocksdb_options_set_max_bytes_for_level_base(rocks->cf_opts[DATA_CF],server.rocksdb_data_max_bytes_for_level_base); rocksdb_options_set_max_bytes_for_level_multiplier(rocks->cf_opts[DATA_CF], server.rocksdb_data_max_bytes_for_level_multiplier); rocksdb_options_set_level_compaction_dynamic_level_bytes(rocks->cf_opts[DATA_CF], server.rocksdb_data_compaction_dynamic_level_bytes); + rocksdb_options_set_level0_file_num_compaction_trigger(rocks->cf_opts[DATA_CF], server.rocksdb_data_level0_file_num_compaction_trigger); if (server.rocksdb_data_suggest_compact_deletion_percentage) { double deletion_ratio = (double)server.rocksdb_data_suggest_compact_deletion_percentage / 100; rocksdb_options_add_compact_on_deletion_collector_factory(rocks->cf_opts[DATA_CF], @@ -189,6 +189,7 @@ static int rocksOpen(rocks *rocks) { rocksdb_options_set_max_bytes_for_level_base(rocks->cf_opts[SCORE_CF],server.rocksdb_data_max_bytes_for_level_base); rocksdb_options_set_max_bytes_for_level_multiplier(rocks->cf_opts[SCORE_CF], server.rocksdb_data_max_bytes_for_level_multiplier); rocksdb_options_set_level_compaction_dynamic_level_bytes(rocks->cf_opts[SCORE_CF], server.rocksdb_data_compaction_dynamic_level_bytes); + rocksdb_options_set_level0_file_num_compaction_trigger(rocks->cf_opts[SCORE_CF], 
server.rocksdb_data_level0_file_num_compaction_trigger); if (server.rocksdb_data_suggest_compact_deletion_percentage) { double deletion_ratio = (double)server.rocksdb_data_suggest_compact_deletion_percentage / 100; rocksdb_options_add_compact_on_deletion_collector_factory(rocks->cf_opts[SCORE_CF], @@ -226,6 +227,7 @@ static int rocksOpen(rocks *rocks) { rocksdb_options_set_max_bytes_for_level_base(rocks->cf_opts[META_CF],server.rocksdb_meta_max_bytes_for_level_base); rocksdb_options_set_max_bytes_for_level_multiplier(rocks->cf_opts[META_CF], server.rocksdb_meta_max_bytes_for_level_multiplier); rocksdb_options_set_level_compaction_dynamic_level_bytes(rocks->cf_opts[META_CF], server.rocksdb_meta_compaction_dynamic_level_bytes); + rocksdb_options_set_level0_file_num_compaction_trigger(rocks->cf_opts[META_CF], server.rocksdb_meta_level0_file_num_compaction_trigger); if (server.rocksdb_meta_suggest_compact_deletion_percentage) { double deletion_ratio = (double)server.rocksdb_meta_suggest_compact_deletion_percentage / 100; rocksdb_options_add_compact_on_deletion_collector_factory(rocks->cf_opts[META_CF], diff --git a/src/ctrip_swap_server.h b/src/ctrip_swap_server.h index 349a0cf7b3b..0729cfeda76 100644 --- a/src/ctrip_swap_server.h +++ b/src/ctrip_swap_server.h @@ -258,6 +258,8 @@ typedef struct swapBatchLimitsConfig { int rocksdb_meta_blob_garbage_collection_age_cutoff_percentage; \ int rocksdb_data_blob_garbage_collection_force_threshold_percentage; \ int rocksdb_meta_blob_garbage_collection_force_threshold_percentage; \ + int rocksdb_data_level0_file_num_compaction_trigger; \ + int rocksdb_meta_level0_file_num_compaction_trigger; \ /* swap block*/ \ struct swapUnblockCtx* swap_dependency_block_ctx; \ /* absent cache */ \ diff --git a/src/ctrip_swap_thread.c b/src/ctrip_swap_thread.c index 0d5a8ca0411..7f486c28cdf 100644 --- a/src/ctrip_swap_thread.c +++ b/src/ctrip_swap_thread.c @@ -47,8 +47,8 @@ void *swapThreadMain (void *arg) { listRewind(thread->pending_reqs, 
&li); processing_reqs = listCreate(); while ((ln = listNext(&li))) { - swapRequest *req = listNodeValue(ln); - listAddNodeHead(processing_reqs, req); + swapRequestBatch *reqs = listNodeValue(ln); + listAddNodeTail(processing_reqs, reqs); listDelNode(thread->pending_reqs, ln); } pthread_mutex_unlock(&thread->lock); diff --git a/src/db.c b/src/db.c index 01a391d00c3..ba8793f3fe2 100644 --- a/src/db.c +++ b/src/db.c @@ -1769,10 +1769,13 @@ int expireIfNeeded(redisDb *db, robj *key) { * have failed over and the new primary will send us the expire. */ if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; - /* Delete the key */ + if (!isImportingExpireDisabled()) { + /* Delete the key */ #ifndef ENABLE_SWAP - deleteExpiredKeyAndPropagate(db,key); + deleteExpiredKeyAndPropagate(db,key); #endif + } + return 1; } diff --git a/src/server.c b/src/server.c index 0eb746f8337..95bc748c052 100644 --- a/src/server.c +++ b/src/server.c @@ -1129,6 +1129,10 @@ struct redisCommand redisCommandTable[] = { {"xslaveof",xslaveofCommand,3, "admin no-script ok-stale", 0,NULL,0,0,0,0,0,0}, + + {"import",importCommand,-2, + "admin use-memory ok-loading fast @dangerous", + 0,NULL,0,0,0,0,0,0}, }; #endif @@ -1896,7 +1900,7 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (server.active_expire_enabled) { + if (server.active_expire_enabled && !isImportingExpireDisabled()) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { @@ -2523,7 +2527,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). 
*/ - if (server.active_expire_enabled && server.masterhost == NULL) + if (server.active_expire_enabled && !isImportingExpireDisabled() && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Unblock all the clients blocked for synchronous replication @@ -3548,6 +3552,8 @@ void InitServerLast() { initThreadedIO(); set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); + server.importing_expire_enabled = 1; + server.importing_end_time = 0; } /* Parse the flags string description 'strflags' and set them to the @@ -4823,6 +4829,115 @@ NULL } } +/* The import command, only recommended to use in target server + * during keys migration. It means time-based importing mode in server. + * By default, some actions will not be executed during this mode, + * which are: + * 1. expire, + + * import [[arg] [value] [opt] ...] + + * subcommand supported: + * import start [ttl] + * import end + * import status + * import set ((ttl ) | ( expire < 1|0 > ) ) + * import get (ttl | expire) + * + * */ + +static inline int isImporting() { + return server.importing_end_time >= server.mstime; +} + +void importCommand(client *c) { + if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { + const char *help[] = { + "start [ttl]", + " importing mode start with seconds of ttl, default ttl as 3600s,", + " expire is default disabled.", + "end", + " importing mode end.", + "status", + " return 1 in importing mode, return 0 if not.", + "set ((ttl ) | ( expire < 1|0 > ) )", + " set some options.", + "get (ttl | expire)", + " get some options.", + NULL}; + addReplyHelp(c, help); + } else if ((c->argc == 2 || c->argc == 3) && !strcasecmp(c->argv[1]->ptr,"start")) { + if (!isImporting()) { + /* if importing mode already off, sub-status will not be inherited, + * which is reset to default value. 
+ */ + server.importing_expire_enabled = 0; + } + + long long ttl; + if (c->argc == 2) { + ttl = 3600; + } else if (c->argc == 3) { + if (getLongLongFromObjectOrReply(c, c->argv[2], &ttl, NULL) != C_OK) { + return; + } + } + server.importing_end_time = mstime() + ttl * 1000; + addReply(c,shared.ok); + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"end")) { + if (!isImporting()) { + addReplyError(c,"Importing mode already ended."); + return; + } + server.importing_end_time = -1; + addReply(c,shared.ok); + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"status")) { + if (isImporting()) { + addReplyLongLong(c, 1); + } else { + addReplyLongLong(c, 0); + } + } else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"get")) { + if (!isImporting()) { + addReplyError(c,"IMPORT GET must be called in importing mode."); + return; + } + + if (!strcasecmp(c->argv[2]->ptr,"ttl")) { + addReplyLongLong(c, (server.importing_end_time - mstime()) / 1000); + } else if (!strcasecmp(c->argv[2]->ptr,"expire")) { + addReplyLongLong(c, (long long)server.importing_expire_enabled); + } else { + addReplyError(c,"Invalid option."); + } + } else if (c->argc == 4 && !strcasecmp(c->argv[1]->ptr,"set")) { + if (!isImporting()) { + addReplyError(c,"IMPORT SET must be called in importing mode."); + return; + } + + if (!strcasecmp(c->argv[2]->ptr,"ttl")) { + long long ttl; + if (getLongLongFromObjectOrReply(c, c->argv[3], &ttl, NULL) != C_OK) { + return; + } + server.importing_end_time = mstime() + ttl * 1000; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr,"expire")) { + server.importing_expire_enabled = (atoi(c->argv[3]->ptr) != 0); + addReply(c,shared.ok); + } else { + addReplyError(c,"Invalid option."); + } + } else { + addReplyError(c,"Invalid subcommand."); + } +} + +int isImportingExpireDisabled() { + return (isImporting() && (server.importing_expire_enabled == 0)); +} + /* Convert an amount of bytes into a human readable string in the form * of 100B, 2G, 100M, 
4K, and so forth. */ void bytesToHuman(char *s, unsigned long long n) { @@ -5288,6 +5403,8 @@ sds genRedisInfoString(const char *section) { atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); + long long importing_ttl = isImporting()? (server.importing_end_time - mstime()):0; + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Stats\r\n" @@ -5328,7 +5445,8 @@ sds genRedisInfoString(const char *section) { "total_reads_processed:%lld\r\n" "total_writes_processed:%lld\r\n" "io_threaded_reads_processed:%lld\r\n" - "io_threaded_writes_processed:%lld\r\n", + "io_threaded_writes_processed:%lld\r\n" + "importing:status=%d,ttl=%lld,expire=%d\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -5366,7 +5484,8 @@ sds genRedisInfoString(const char *section) { stat_total_reads_processed, stat_total_writes_processed, server.stat_io_reads_processed, - server.stat_io_writes_processed); + server.stat_io_writes_processed, + isImporting(),importing_ttl,server.importing_expire_enabled); } /* Replication */ diff --git a/src/server.h b/src/server.h index 9812915e6e3..2788cdf1da2 100644 --- a/src/server.h +++ b/src/server.h @@ -1747,6 +1747,10 @@ struct redisServer { long long gtid_ignored_cmd_count; long long gtid_executed_cmd_count; long long gtid_sync_stat[GTID_SYNC_TYPES]; + + /* importing mode */ + mstime_t importing_end_time; /* in milliseconds */ + int importing_expire_enabled; }; #define MAX_KEYS_BUFFER 256 @@ -2909,6 +2913,10 @@ void stralgoCommand(client *c); void resetCommand(client *c); void failoverCommand(client *c); +/* importing mode */ +void importCommand(client *c); +int isImportingExpireDisabled(); + #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void free(void *ptr) __attribute__ ((deprecated)); diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 
d3fe17c0e6e..c0de2d1a89b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -148,6 +148,7 @@ set ::disk_tests { swap/unit/slowlog swap/unit/scripting swap/unit/monitor + unit/import_mode } set ::all_tests [concat $::gtid_tests $::all_tests] diff --git a/tests/unit/import_mode.tcl b/tests/unit/import_mode.tcl new file mode 100644 index 00000000000..13256078ccd --- /dev/null +++ b/tests/unit/import_mode.tcl @@ -0,0 +1,191 @@ +start_server {tags {"import mode"} overrides {}} { + test "import start end" { + + r import start + assert_equal [r import status] {1} + assert_range [r import get ttl] 3595 3600 + + assert_equal [r import end] {OK} + assert_equal [r import status] {0} + + r import start 120 + assert_equal [r import status] {1} + assert_range [r import get ttl] 115 120 + + r import end + assert_equal [r import status] {0} + + } + + test "import mode restart" { + + r import start + assert_equal [r import status] {1} + + # not yet ending, restarting directing + r import start 120 + assert_range [r import get ttl] 115 120 + + } + + test "import set get " { + + r import start + assert_equal [r import set ttl 300] {OK} + assert_range [r import get ttl] 295 300 + + assert_equal [r import get expire] {0} + + assert_equal [r import set expire 1] {OK} + assert_equal [r import get expire] {1} + + # not ending, restart, options will be inherited + r import start + assert_equal [r import get expire] {1} + + # after ending, then starting, options are default value + r import end + + r import start + assert_equal [r import get expire] {0} + + r import end + } + + test "import passive-expire " { + r flushdb + + r setex key1 1 k + r setex key2 1 k + r setex key3 1 k + + r import start + + + after 1500 + + assert_equal [r get key1] {} + + assert_equal [r dbsize] {3} + + assert_equal [r import set expire 1] {OK} + + assert_equal [r get key1] {} + assert_equal [r get key2] {} + assert_equal [r get key3] {} + + assert_equal [r dbsize] {0} + + assert_equal [r import 
set expire 0] {OK} + + r setex key1 1 k + + after 1500 + assert_equal [r get key1] {} + + assert_equal [r dbsize] {1} + + r import end + } + + test "import active-expire " { + r flushdb + + r import start + + assert_equal [r import set expire 1] {OK} + assert_equal [r import get expire] {1} + + r setex key1 1 k + r setex key2 1 k + r setex key3 1 k + + after 1500 + + assert_equal [r dbsize] {0} + + assert_equal [r get key1] {} + + assert_equal [r import set expire 0] {OK} + + r setex key1 1 k + r setex key2 1 k + r setex key3 1 k + + after 1500 + + assert_equal [r get key1] {} + + assert_equal [r dbsize] {3} + + r import end + } + + test "import expire cold keys " { + r flushdb + + r import start + + r setex key1 1 1 + r swap.evict key1 + wait_key_cold r key1 + + after 1500 + + assert_equal [r dbsize] {1} + assert_equal [r get key1] {} + assert_equal [r dbsize] {1} + r import end + + assert_equal [r get key1] {} + assert_equal [r dbsize] {0} + } + + test "import end works " { + + r flushdb + + r setex key1 1 k + r setex key2 1 k + r setex key3 1 k + + r import start + + after 1500 + assert_equal [r dbsize] {3} + + r import end + + after 1500 + assert_equal [r dbsize] {0} + } + + test "illegal operation " { + + r import start + + assert_equal [r import get expire] {0} + + assert_equal [r import set expire 1] {OK} + + assert_equal [r import set expire no] {OK} + + assert_error "*Invalid option*" {r import get auto-compaction} + + assert_error "*not an integer*" {r import set ttl no} + + assert_error "*Invalid option*" {r import set auto-compaction ok} + + r import end + + assert_error "*already ended*" {r import end} + + assert_error "*IMPORT GET must be*" {r import get expire} + + assert_error "*IMPORT SET must be*" {r import set expire 1} + + assert_error "*not an integer*" {r import start six} + + assert_error "*Invalid subcommand*" {r import set auto-compaction ok no 1} + } +} \ No newline at end of file