Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ config.h.in~
config.h
config.sub
configure
configure~
compile
depcomp
install-sh
Expand Down Expand Up @@ -33,9 +34,18 @@ tests/tls/*
*.txt
!/tests/test_requirements.txt
__pycache__
*.csv
*.json

# Code coverage with lcov/gcov
*.gcno
*.gcov
*.gcda
*.info

# redis related
*.rdb
*.aof
appendonlydir/
*.conf

22 changes: 22 additions & 0 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,18 @@ int client_group::create_clients(int num)
}

m_clients.push_back(c);

// Add jitter between connection creation (except for the last connection)
if (i < num - 1 && m_config->thread_conn_start_max_jitter_micros > 0) {
unsigned int jitter_range = m_config->thread_conn_start_max_jitter_micros - m_config->thread_conn_start_min_jitter_micros;
unsigned int jitter_micros = m_config->thread_conn_start_min_jitter_micros;

if (jitter_range > 0) {
jitter_micros += rand() % (jitter_range + 1);
}

usleep(jitter_micros);
}
}

return num;
Expand Down Expand Up @@ -714,6 +726,16 @@ unsigned long int client_group::get_duration_usec(void)
return duration;
}

unsigned long int client_group::get_total_connection_errors(void)
{
unsigned long int total_errors = 0;
for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
total_errors += (*i)->get_stats()->get_total_connection_errors();
}

return total_errors;
}

void client_group::merge_run_stats(run_stats* target)
{
assert(target != NULL);
Expand Down
3 changes: 2 additions & 1 deletion client.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,13 @@ class client_group {
struct event_base *get_event_base(void) { return m_base; }
benchmark_config *get_config(void) { return m_config; }
abstract_protocol* get_protocol(void) { return m_protocol; }
object_generator* get_obj_gen(void) { return m_obj_gen; }
object_generator* get_obj_gen(void) { return m_obj_gen; }

unsigned long int get_total_bytes(void);
unsigned long int get_total_ops(void);
unsigned long int get_total_latency(void);
unsigned long int get_duration_usec(void);
unsigned long int get_total_connection_errors(void);

void merge_run_stats(run_stats* target);
};
Expand Down
2 changes: 1 addition & 1 deletion cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ bool cluster_client::connect_shard_connection(shard_connection* sc, char* addres
memcpy(ci.addr_buf, addr_info->ai_addr, addr_info->ai_addrlen);
ci.ci_addr = (struct sockaddr *) ci.addr_buf;
ci.ci_addrlen = addr_info->ai_addrlen;

freeaddrinfo(addr_info);

// call connect
Expand Down Expand Up @@ -497,4 +498,3 @@ void cluster_client::handle_response(unsigned int conn_id, struct timeval timest
// continue with base class
client::handle_response(conn_id, timestamp, request, response);
}

18 changes: 18 additions & 0 deletions memtier_benchmark.1
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,24 @@ Number of concurrent pipelined requests (default: 1)
\fB\-\-reconnect\-interval\fR=\fI\,NUM\/\fR
Number of requests after which re\-connection is performed
.TP
\fB\-\-reconnect\-on\-error\fR
Enable automatic reconnection on connection errors (default: disabled)
.TP
\fB\-\-max\-reconnect\-attempts\fR=\fI\,NUM\/\fR
Maximum number of reconnection attempts, 0 for unlimited (default: 0)
.TP
\fB\-\-reconnect\-backoff\-factor\fR=\fI\,NUM\/\fR
Backoff factor for reconnection delays; must be greater than zero when specified (default: 0, no backoff)
.TP
\fB\-\-connection\-timeout\fR=\fI\,SECS\/\fR
Connection timeout in seconds, 0 to disable (default: 0)
.TP
\fB\-\-thread\-conn\-start\-min\-jitter\-micros\fR=\fI\,NUM\/\fR
Minimum jitter in microseconds between connection creation (default: 0)
.TP
\fB\-\-thread\-conn\-start\-max\-jitter\-micros\fR=\fI\,NUM\/\fR
Maximum jitter in microseconds between connection creation (default: 0)
.TP
\fB\-\-multi\-key\-get\fR=\fI\,NUM\/\fR
Enable multi\-key get commands, up to NUM keys (default: 0)
.TP
Expand Down
164 changes: 159 additions & 5 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static void sigint_handler(int signum)
(void)signum; // unused parameter
g_interrupted = 1;
}

void benchmark_log_file_line(int level, const char *filename, unsigned int line, const char *fmt, ...)
{
if (level > log_level)
Expand Down Expand Up @@ -165,6 +166,9 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"key_stddev = %f\n"
"key_median = %f\n"
"reconnect_interval = %u\n"
"connection_timeout = %u\n"
"thread_conn_start_min_jitter_micros = %u\n"
"thread_conn_start_max_jitter_micros = %u\n"
"multi_key_get = %u\n"
"authenticate = %s\n"
"select-db = %d\n"
Expand Down Expand Up @@ -217,6 +221,9 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->key_stddev,
cfg->key_median,
cfg->reconnect_interval,
cfg->connection_timeout,
cfg->thread_conn_start_min_jitter_micros,
cfg->thread_conn_start_max_jitter_micros,
cfg->multi_key_get,
cfg->authenticate ? cfg->authenticate : "",
cfg->select_db,
Expand Down Expand Up @@ -278,6 +285,9 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
jsonhandler->write_obj("key_median" ,"%f", cfg->key_median);
jsonhandler->write_obj("key_zipf_exp" ,"%f", cfg->key_zipf_exp);
jsonhandler->write_obj("reconnect_interval","%u", cfg->reconnect_interval);
jsonhandler->write_obj("connection_timeout","%u", cfg->connection_timeout);
jsonhandler->write_obj("thread_conn_start_min_jitter_micros","%u", cfg->thread_conn_start_min_jitter_micros);
jsonhandler->write_obj("thread_conn_start_max_jitter_micros","%u", cfg->thread_conn_start_max_jitter_micros);
jsonhandler->write_obj("multi_key_get" ,"%u", cfg->multi_key_get);
jsonhandler->write_obj("authenticate" ,"\"%s\"", cfg->authenticate ? cfg->authenticate : "");
jsonhandler->write_obj("select-db" ,"%d", cfg->select_db);
Expand Down Expand Up @@ -449,6 +459,7 @@ static void config_init_defaults(struct benchmark_config *cfg)
cfg->hdr_prefix = "";
if (!cfg->print_percentiles.is_defined())
cfg->print_percentiles = config_quantiles("50,99,99.9");

#ifdef USE_TLS
if (!cfg->tls_protocols)
cfg->tls_protocols = REDIS_TLS_PROTO_DEFAULT;
Expand Down Expand Up @@ -545,6 +556,12 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_randomize,
o_client_stats,
o_reconnect_interval,
o_reconnect_on_error,
o_max_reconnect_attempts,
o_reconnect_backoff_factor,
o_connection_timeout,
o_thread_conn_start_min_jitter_micros,
o_thread_conn_start_max_jitter_micros,
o_generate_keys,
o_multi_key_get,
o_select_db,
Expand Down Expand Up @@ -623,6 +640,12 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "key-median", 1, 0, o_key_median },
{ "key-zipf-exp", 1, 0, o_key_zipf_exp},
{ "reconnect-interval", 1, 0, o_reconnect_interval },
{ "reconnect-on-error", 0, 0, o_reconnect_on_error },
{ "max-reconnect-attempts", 1, 0, o_max_reconnect_attempts },
{ "reconnect-backoff-factor", 1, 0, o_reconnect_backoff_factor },
{ "connection-timeout", 1, 0, o_connection_timeout },
{ "thread-conn-start-min-jitter-micros", 1, 0, o_thread_conn_start_min_jitter_micros },
{ "thread-conn-start-max-jitter-micros", 1, 0, o_thread_conn_start_max_jitter_micros },
{ "multi-key-get", 1, 0, o_multi_key_get },
{ "authenticate", 1, 0, 'a' },
{ "select-db", 1, 0, o_select_db },
Expand Down Expand Up @@ -933,6 +956,49 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
return -1;
}
break;
case o_reconnect_on_error:
cfg->reconnect_on_error = true;
break;
case o_max_reconnect_attempts:
endptr = NULL;
cfg->max_reconnect_attempts = (unsigned int) strtoul(optarg, &endptr, 10);
if (!endptr || *endptr != '\0') {
fprintf(stderr, "error: max-reconnect-attempts must be a valid number.\n");
return -1;
}
break;
case o_reconnect_backoff_factor:
endptr = NULL;
cfg->reconnect_backoff_factor = strtod(optarg, &endptr);
if (cfg->reconnect_backoff_factor <= 0.0 || !endptr || *endptr != '\0') {
fprintf(stderr, "error: reconnect-backoff-factor must be greater than zero.\n");
return -1;
}
break;
case o_connection_timeout:
endptr = NULL;
cfg->connection_timeout = (unsigned int) strtoul(optarg, &endptr, 10);
if (!endptr || *endptr != '\0') {
fprintf(stderr, "error: connection-timeout must be a valid number.\n");
return -1;
}
break;
case o_thread_conn_start_min_jitter_micros:
endptr = NULL;
cfg->thread_conn_start_min_jitter_micros = (unsigned int) strtoul(optarg, &endptr, 10);
if (!endptr || *endptr != '\0') {
fprintf(stderr, "error: thread-conn-start-min-jitter-micros must be a valid number.\n");
return -1;
}
break;
case o_thread_conn_start_max_jitter_micros:
endptr = NULL;
cfg->thread_conn_start_max_jitter_micros = (unsigned int) strtoul(optarg, &endptr, 10);
if (!endptr || *endptr != '\0') {
fprintf(stderr, "error: thread-conn-start-max-jitter-micros must be a valid number.\n");
return -1;
}
break;
case o_generate_keys:
cfg->generate_keys = 1;
break;
Expand Down Expand Up @@ -1156,6 +1222,12 @@ void usage() {
" --ratio=RATIO Set:Get ratio (default: 1:10)\n"
" --pipeline=NUMBER Number of concurrent pipelined requests (default: 1)\n"
" --reconnect-interval=NUM Number of requests after which re-connection is performed\n"
" --reconnect-on-error Enable automatic reconnection on connection errors (default: disabled)\n"
" --max-reconnect-attempts=NUM Maximum number of reconnection attempts (default: 0, unlimited)\n"
" --reconnect-backoff-factor=NUM Backoff factor for reconnection delays (default: 0, no backoff)\n"
" --connection-timeout=SECS Connection timeout in seconds, 0 to disable (default: 0)\n"
" --thread-conn-start-min-jitter-micros=NUM Minimum jitter in microseconds between connection creation (default: 0)\n"
" --thread-conn-start-max-jitter-micros=NUM Maximum jitter in microseconds between connection creation (default: 0)\n"
" --multi-key-get=NUM Enable multi-key get commands, up to NUM keys (default: 0)\n"
" --select-db=DB DB number to select, when testing a redis server\n"
" --distinct-client-seed Use a different random seed for each client\n"
Expand Down Expand Up @@ -1235,9 +1307,12 @@ struct cg_thread {
abstract_protocol* m_protocol;
pthread_t m_thread;
std::atomic<bool> m_finished; // Atomic to prevent data race between worker thread write and main thread read
bool m_restart_requested;
unsigned int m_restart_count;

cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL),
m_finished(false), m_restart_requested(false), m_restart_count(0)
{
m_protocol = protocol_factory(m_config->protocol);
assert(m_protocol != NULL);
Expand Down Expand Up @@ -1276,13 +1351,57 @@ struct cg_thread {
assert(ret == 0);
}

/*
 * Tear down the current client group, build a fresh one and launch a new
 * worker thread for it.
 *
 * The previous worker thread must already have been joined by the caller.
 *
 * Returns 0 when the new thread was started, -1 when the clients could not
 * be created or prepared, or the non-zero pthread_create() error code when
 * the thread could not be spawned.
 *
 * On every failure path the object is left in a "finished, no restart
 * pending" state, and the attempt is still counted in m_restart_count.
 * A caller that polls m_finished / m_restart_requested will therefore not
 * re-join the already-joined thread, retry without bound, or wait forever
 * for a thread that was never started.
 */
int restart(void)
{
    // Consume the restart request and count this attempt before anything
    // can fail, so a failed attempt is never silently retried.
    m_restart_requested = false;
    m_restart_count++;

    // Replace the old client group (deleting NULL is a no-op).
    delete m_cg;
    m_cg = new client_group(m_config, m_protocol, m_obj_gen);

    // Prepare new clients; m_finished is still true on these failure paths.
    if (m_cg->create_clients(m_config->clients) < (int) m_config->clients)
        return -1;
    if (m_cg->prepare() < 0)
        return -1;

    // Must be cleared before the thread exists: the worker sets it to true
    // when it ends, and clearing it afterwards could overwrite that.
    m_finished = false;

    int ret = pthread_create(&m_thread, NULL, cg_thread_start, (void *)this);
    if (ret != 0) {
        // No worker is running, so report the thread as finished again.
        m_finished = true;
    }

    return ret;
}

};

/*
 * pthread entry point for a cg_thread worker.
 *
 * Runs the thread's client group exactly once. A restart is requested when
 * the run ends with at least one recorded connection error, or when an
 * exception escapes run(). Exceptions are logged and swallowed so they
 * never propagate out of the thread function.
 *
 * m_finished is written last on every path: it is the atomic flag that
 * other threads poll, so m_restart_requested (a plain bool) must already
 * hold its final value by the time m_finished becomes true.
 *
 * t is the cg_thread* passed to pthread_create(); it is returned unchanged.
 */
static void* cg_thread_start(void *t)
{
    cg_thread* thread = (cg_thread*) t;

    try {
        thread->m_cg->run();

        // The run ended with connection errors recorded: ask for a restart.
        if (thread->m_cg->get_total_connection_errors() > 0) {
            benchmark_error_log("Thread %u finished due to connection failures, requesting restart.\n", thread->m_thread_id);
            thread->m_restart_requested = true;
        }
    } catch (const std::exception& e) {
        benchmark_error_log("Thread %u caught exception: %s\n", thread->m_thread_id, e.what());
        thread->m_restart_requested = true;
    } catch (...) {
        benchmark_error_log("Thread %u caught unknown exception\n", thread->m_thread_id);
        thread->m_restart_requested = true;
    }

    // Publish completion only after the restart decision has been stored.
    thread->m_finished = true;

    return t;
}
Expand Down Expand Up @@ -1364,14 +1483,32 @@ run_stats run_benchmark(int run_id, benchmark_config* cfg, object_generator* obj
unsigned long int duration = 0;
unsigned int thread_counter = 0;
unsigned long int total_latency = 0;
unsigned long int total_connection_errors = 0;

for (std::vector<cg_thread*>::iterator i = threads.begin(); i != threads.end(); i++) {
// Check if thread needs restart
if ((*i)->m_finished && (*i)->m_restart_requested && (*i)->m_restart_count < 5) {
benchmark_error_log("Restarting thread %u (restart #%u)...\n",
(*i)->m_thread_id, (*i)->m_restart_count + 1);

// Join the failed thread first
(*i)->join();

// Attempt to restart
if ((*i)->restart() == 0) {
benchmark_error_log("Thread %u restarted successfully.\n", (*i)->m_thread_id);
} else {
benchmark_error_log("Failed to restart thread %u.\n", (*i)->m_thread_id);
}
}

if (!(*i)->m_finished)
active_threads++;

total_ops += (*i)->m_cg->get_total_ops();
total_bytes += (*i)->m_cg->get_total_bytes();
total_latency += (*i)->m_cg->get_total_latency();
total_connection_errors += (*i)->m_cg->get_total_connection_errors();
thread_counter++;
float factor = ((float)(thread_counter - 1) / thread_counter);
duration = factor * duration + (float)(*i)->m_cg->get_duration_usec() / thread_counter ;
Expand Down Expand Up @@ -1410,8 +1547,14 @@ run_stats run_benchmark(int run_id, benchmark_config* cfg, object_generator* obj
else
progress = 100.0 * (duration / 1000000.0)/cfg->test_time;

fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
run_id, progress, (unsigned int) (duration / 1000000), active_threads, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
// Only show connection errors if there are any (backwards compatible output)
if (total_connection_errors > 0) {
fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads %2u conns %lu conn errors: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
run_id, progress, (unsigned int) (duration / 1000000), active_threads, cfg->clients, total_connection_errors, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
} else {
fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads %2u conns: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
run_id, progress, (unsigned int) (duration / 1000000), active_threads, cfg->clients, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
}
} while (active_threads > 0);

fprintf(stderr, "\n\n");
Expand Down Expand Up @@ -1569,6 +1712,14 @@ int main(int argc, char *argv[])
}

config_init_defaults(&cfg);

// Validate jitter parameters
if (cfg.thread_conn_start_min_jitter_micros > cfg.thread_conn_start_max_jitter_micros) {
fprintf(stderr, "error: thread-conn-start-min-jitter-micros (%u) cannot be greater than thread-conn-start-max-jitter-micros (%u).\n",
cfg.thread_conn_start_min_jitter_micros, cfg.thread_conn_start_max_jitter_micros);
exit(1);
}

log_level = cfg.debug;
if (cfg.show_config) {
fprintf(stderr, "============== Configuration values: ==============\n");
Expand Down Expand Up @@ -1981,6 +2132,9 @@ int main(int argc, char *argv[])
}

if (jsonhandler != NULL) {
// Log message for saving JSON file
fprintf(stderr, "Saving JSON output file: %s\n", cfg.json_out_file);

// closing the JSON
delete jsonhandler;
}
Expand Down
Loading
Loading