From cd28af3447f7e648d537d94a2398aada00d3995f Mon Sep 17 00:00:00 2001 From: Deukyeon Hwang Date: Wed, 10 Aug 2022 14:44:48 -0700 Subject: [PATCH 01/15] Adding transactional_splinterdb into YCSB - Each operation takes `Transaction`. - `DoTransaction(Transaction *)` runs the given number of operations. And the number of operations is set by the property, `opspertransaction`. --- core/client.h | 93 ++++++----- core/core_workload.cc | 5 + core/core_workload.h | 6 +- core/db.h | 108 +++++++++++++ core/transaction.h | 12 ++ db/db_factory.cc | 3 + db/transactional_splinter_db.cc | 267 ++++++++++++++++++++++++++++++++ db/transactional_splinter_db.h | 112 ++++++++++++++ 8 files changed, 565 insertions(+), 41 deletions(-) create mode 100644 core/transaction.h create mode 100644 db/transactional_splinter_db.cc create mode 100644 db/transactional_splinter_db.h diff --git a/core/client.h b/core/client.h index 2a97aa2c..4b249dd9 100644 --- a/core/client.h +++ b/core/client.h @@ -30,11 +30,11 @@ class Client { protected: - virtual int TransactionRead(); - virtual int TransactionReadModifyWrite(); - virtual int TransactionScan(); - virtual int TransactionUpdate(); - virtual int TransactionInsert(); + virtual int TransactionRead(Transaction *txn); + virtual int TransactionReadModifyWrite(Transaction *txn); + virtual int TransactionScan(Transaction *txn); + virtual int TransactionUpdate(Transaction *txn); + virtual int TransactionInsert(Transaction *txn); DB &db_; CoreWorkload &workload_; @@ -45,48 +45,61 @@ class Client { inline bool Client::DoInsert() { workload_.NextSequenceKey(key); workload_.UpdateValues(pairs); - return (db_.Insert(workload_.NextTable(), key, pairs) == DB::kOK); + int status = -1; + Transaction *txn = NULL; + do { + db_.Begin(&txn); + status = db_.Insert(txn, workload_.NextTable(), key, pairs); + } while (db_.Commit(&txn) == DB::kErrorConflict); + return (status == DB::kOK); } inline bool Client::DoTransaction() { int status = -1; - switch (workload_.NextOperation()) { - case READ: - status = TransactionRead(); - break; - case UPDATE: - status = TransactionUpdate(); - break; - case INSERT: - status = TransactionInsert(); - break; - case SCAN: - status = TransactionScan(); - break; - case READMODIFYWRITE: - status = TransactionReadModifyWrite(); - break; - default: - throw utils::Exception("Operation request is not recognized!"); - } - assert(status >= 0); + Transaction *txn = NULL; + do { + db_.Begin(&txn); + + for (int i = 0; i < workload_.ops_per_transaction(); ++i) { + switch (workload_.NextOperation()) { + case READ: + status = TransactionRead(txn); + break; + case UPDATE: + status = TransactionUpdate(txn); + break; + case INSERT: + status = TransactionInsert(txn); + break; + case SCAN: + status = TransactionScan(txn); + break; + case READMODIFYWRITE: + status = TransactionReadModifyWrite(txn); + break; + default: + throw utils::Exception("Operation request is not recognized!"); + } + assert(status >= 0); + } + } while (db_.Commit(&txn) == DB::kErrorConflict); return (status == DB::kOK); } -inline int Client::TransactionRead() { +inline int Client::TransactionRead(Transaction *txn) { const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Read(table, key, &fields, result); + return db_.Read(txn, table, key, &fields, result); } else { - return db_.Read(table, key, NULL, result); + return db_.Read(txn, table, key, NULL, result); } } -inline int Client::TransactionReadModifyWrite() { +inline int Client::TransactionReadModifyWrite(Transaction *txn) { const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); std::vector result; @@ -94,9 +107,9 @@ inline int Client::TransactionReadModifyWrite() { if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - db_.Read(table, key, &fields, result); + db_.Read(txn, table, key, &fields, result); } else { - db_.Read(table, key, NULL, result); + db_.Read(txn, table, key, NULL, result); } std::vector values; @@ -105,10 +118,10 @@ inline int Client::TransactionReadModifyWrite() { } else { workload_.BuildUpdate(values); } - return db_.Update(table, key, values); + return db_.Update(txn, table, key, values); } -inline int Client::TransactionScan() { +inline int Client::TransactionScan(Transaction *txn) { const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); int len = workload_.NextScanLength(); @@ -116,13 +129,13 @@ inline int Client::TransactionScan() { if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Scan(table, key, len, &fields, result); + return db_.Scan(txn, table, key, len, &fields, result); } else { - return db_.Scan(table, key, len, NULL, result); + return db_.Scan(txn, table, key, len, NULL, result); } } -inline int Client::TransactionUpdate() { +inline int Client::TransactionUpdate(Transaction *txn) { const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); std::vector values; @@ -131,15 +144,15 @@ inline int Client::TransactionUpdate() { } else { workload_.BuildUpdate(values); } - return db_.Update(table, key, values); + return db_.Update(txn, table, key, values); } -inline int Client::TransactionInsert() { +inline int Client::TransactionInsert(Transaction *txn) { const std::string &table = workload_.NextTable(); workload_.NextSequenceKey(key); std::vector values; workload_.BuildValues(values); - return db_.Insert(table, key, values); + return db_.Insert(txn, table, key, values); } } // ycsbc diff --git a/core/core_workload.cc b/core/core_workload.cc index f5567d72..6ccb2dac 100644 --- a/core/core_workload.cc +++ b/core/core_workload.cc @@ -76,6 +76,9 @@ const string CoreWorkload::INSERT_START_DEFAULT = "0"; const string CoreWorkload::RECORD_COUNT_PROPERTY = "recordcount"; const string CoreWorkload::OPERATION_COUNT_PROPERTY = "operationcount"; +const string CoreWorkload::OPS_PER_TRANSACTION_PROPERTY = "opspertransaction"; +const string CoreWorkload::OPS_PER_TRANSACTION_DEFAULT = "1"; + void CoreWorkload::InitLoadWorkload(const utils::Properties &p, unsigned int nthreads, unsigned int this_thread, BatchedCounterGenerator *key_generator) { table_name_ = p.GetProperty(TABLENAME_PROPERTY,TABLENAME_DEFAULT); @@ -178,6 +181,8 @@ void CoreWorkload::InitRunWorkload(const utils::Properties &p, unsigned int nthr } //batch_size_ = 1; + + ops_per_transaction_ = std::stoi(p.GetProperty(OPS_PER_TRANSACTION_PROPERTY, OPS_PER_TRANSACTION_DEFAULT)); } ycsbc::Generator *CoreWorkload::GetFieldLenGenerator( diff --git a/core/core_workload.h b/core/core_workload.h index c8bfe923..2e363917 100644 --- a/core/core_workload.h +++ b/core/core_workload.h @@ -141,6 +141,9 @@ class CoreWorkload { static const std::string RECORD_COUNT_PROPERTY; static const std::string OPERATION_COUNT_PROPERTY; + static const std::string OPS_PER_TRANSACTION_PROPERTY; + static const std::string OPS_PER_TRANSACTION_DEFAULT; + /// /// Initialize the scenario. /// Called once, in the main client thread, before any operations are started. @@ -164,6 +167,7 @@ class CoreWorkload { bool read_all_fields() const { return read_all_fields_; } bool write_all_fields() const { return write_all_fields_; } + int ops_per_transaction() const { return ops_per_transaction_; } CoreWorkload() : generator_(), @@ -214,7 +218,7 @@ class CoreWorkload { bool ordered_inserts_; size_t record_count_; int zero_padding_; - + int ops_per_transaction_; std::uniform_int_distribution uniform_letter_dist_; }; diff --git a/core/db.h b/core/db.h index 27a2a83f..f26182b3 100644 --- a/core/db.h +++ b/core/db.h @@ -12,6 +12,8 @@ #include #include +#include "transaction.h" + namespace ycsbc { class DB { @@ -20,6 +22,7 @@ class DB { static const int kOK = 0; static const int kErrorNoData = 1; static const int kErrorConflict = 2; + static const int kErrorNotSupport = 3; /// /// Initializes any state for accessing this DB. /// Called once per DB client (thread); there is a single DB instance globally. @@ -91,6 +94,111 @@ class DB { virtual int Delete(const std::string &table, const std::string &key) = 0; virtual ~DB() { } + + virtual void + Begin(Transaction **txn) + {} + + virtual int + Commit(Transaction **txn) + { + return 0; + } + + /// + /// Reads a record from the database. + /// Field/value pairs from the result are stored in a vector. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to read. + /// @param fields The list of fields to read, or NULL for all of them. + /// @param result A vector of field/value pairs for the result. + /// @return Zero on success, or a non-zero error code on error/record-miss. + /// + virtual int + Read(Transaction *txn, + const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) + { + return Read(table, key, fields, result); + } + /// + /// Performs a range scan for a set of records in the database. + /// Field/value pairs from the result are stored in a vector. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the first record to read. + /// @param record_count The number of records to read. + /// @param fields The list of fields to read, or NULL for all of them. + /// @param result A vector of vector, where each vector contains field/value + /// pairs for one record + /// @return Zero on success, or a non-zero error code on error. + /// + virtual int + Scan(Transaction *txn, + const std::string &table, + const std::string &key, + int record_count, + const std::vector *fields, + std::vector> &result) + { + return Scan(table, key, record_count, fields, result); + } + + /// + /// Updates a record in the database. + /// Field/value pairs in the specified vector are written to the record, + /// overwriting any existing values with the same field names. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to write. + /// @param values A vector of field/value pairs to update in the record. + /// @return Zero on success, a non-zero error code on error. + /// + virtual int + Update(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) + { + return Update(table, key, values); + } + /// + /// Inserts a record into the database. + /// Field/value pairs in the specified vector are written into the record. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to insert. + /// @param values A vector of field/value pairs to insert in the record. + /// @return Zero on success, a non-zero error code on error. + /// + virtual int + Insert(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) + { + return Insert(table, key, values); + } + /// + /// Deletes a record from the database. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to delete. + /// @return Zero on success, a non-zero error code on error. + /// + virtual int + Delete(Transaction *txn, const std::string &table, const std::string &key) + { + return Delete(table, key); + } }; } // ycsbc diff --git a/core/transaction.h b/core/transaction.h new file mode 100644 index 00000000..f7376d50 --- /dev/null +++ b/core/transaction.h @@ -0,0 +1,12 @@ +#ifndef YCSB_C_TRANSACTION_H_ +#define YCSB_C_TRANSACTION_H_ + +namespace ycsbc { + +class Transaction { +public: +}; + +} // namespace ycsbc + +#endif // YCSB_C_TRANSACTION_H_ diff --git a/db/db_factory.cc b/db/db_factory.cc index 7ccd1bb9..062741c6 100644 --- a/db/db_factory.cc +++ b/db/db_factory.cc @@ -15,6 +15,7 @@ #include "db/tbb_rand_db.h" #include "db/tbb_scan_db.h" #include "db/splinter_db.h" +#include "db/transactional_splinter_db.h" #include "db/rocks_db.h" using namespace std; @@ -35,6 +36,8 @@ DB* DBFactory::CreateDB(utils::Properties &props, bool preloaded) { return new RocksDB(props, preloaded); } else if (props["dbname"] == "splinterdb") { return new SplinterDB(props, preloaded); + } else if (props["dbname"] == "transactional_splinterdb") { + return new TransactionalSplinterDB(props, preloaded); } else if (props["dbname"] == "tbb_rand") { assert(!preloaded); return new TbbRandDB; diff --git a/db/transactional_splinter_db.cc b/db/transactional_splinter_db.cc new file mode 100644 index 00000000..163832c8 --- /dev/null +++ b/db/transactional_splinter_db.cc @@ -0,0 +1,267 @@ +// +// splinter_db.cc +// YCSB-C +// +// Created by Rob Johnson on 3/20/2022. +// Copyright (c) 2022 VMware. +// + +#include "db/transactional_splinter_db.h" +extern "C" { +#include "splinterdb/default_data_config.h" +} + +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +TransactionalSplinterDB::TransactionalSplinterDB(utils::Properties &props, + bool preloaded) +{ + cout << "This is TransacionalSplinterDB\n"; + + uint64_t max_key_size = props.GetIntProperty("splinterdb.max_key_size"); + + default_data_config_init(max_key_size, &data_cfg); + splinterdb_cfg.filename = props.GetProperty("splinterdb.filename").c_str(); + splinterdb_cfg.cache_size = + props.GetIntProperty("splinterdb.cache_size_mb") * 1024 * 1024; + splinterdb_cfg.disk_size = + props.GetIntProperty("splinterdb.disk_size_gb") * 1024 * 1024 * 1024; + splinterdb_cfg.data_cfg = &data_cfg; + splinterdb_cfg.heap_handle = NULL; + splinterdb_cfg.heap_id = NULL; + splinterdb_cfg.page_size = props.GetIntProperty("splinterdb.page_size"); + splinterdb_cfg.extent_size = props.GetIntProperty("splinterdb.extent_size"); + splinterdb_cfg.io_flags = props.GetIntProperty("splinterdb.io_flags"); + splinterdb_cfg.io_perms = props.GetIntProperty("splinterdb.io_perms"); + splinterdb_cfg.io_async_queue_depth = + props.GetIntProperty("splinterdb.io_async_queue_depth"); + splinterdb_cfg.cache_use_stats = + props.GetIntProperty("splinterdb.cache_use_stats"); + splinterdb_cfg.cache_logfile = + props.GetProperty("splinterdb.cache_logfile").c_str(); + splinterdb_cfg.btree_rough_count_height = + props.GetIntProperty("splinterdb.btree_rough_count_height"); + splinterdb_cfg.filter_remainder_size = + props.GetIntProperty("splinterdb.filter_remainder_size"); + splinterdb_cfg.filter_index_size = + props.GetIntProperty("splinterdb.filter_index_size"); + splinterdb_cfg.use_log = props.GetIntProperty("splinterdb.use_log"); + splinterdb_cfg.memtable_capacity = + props.GetIntProperty("splinterdb.memtable_capacity"); + splinterdb_cfg.fanout = props.GetIntProperty("splinterdb.fanout"); + splinterdb_cfg.max_branches_per_node = + props.GetIntProperty("splinterdb.max_branches_per_node"); + splinterdb_cfg.use_stats = props.GetIntProperty("splinterdb.use_stats"); + splinterdb_cfg.reclaim_threshold = + props.GetIntProperty("splinterdb.reclaim_threshold"); + + if (preloaded) { + assert(!transactional_splinterdb_open(&splinterdb_cfg, &spl)); + } else { + assert(!transactional_splinterdb_create(&splinterdb_cfg, &spl)); + } +} + +TransactionalSplinterDB::~TransactionalSplinterDB() +{ + transactional_splinterdb_close(&spl); +} + +void +TransactionalSplinterDB::Init() +{ + transactional_splinterdb_register_thread(spl); +} + +void +TransactionalSplinterDB::Close() +{ + transactional_splinterdb_deregister_thread(spl); +} + +int +TransactionalSplinterDB::Read(Transaction *txn, + const string &table, + const string &key, + const vector *fields, + vector &result) +{ + assert(txn != NULL); + + splinterdb_lookup_result lookup_result; + transactional_splinterdb_lookup_result_init(spl, &lookup_result, 0, NULL); + slice key_slice = slice_create(key.size(), key.c_str()); + // cout << "lookup " << key << endl; + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_lookup( + spl, txn_handle, key_slice, &lookup_result)); + if (!splinterdb_lookup_found(&lookup_result)) { + cout << "FAILED lookup " << key << endl; + assert(0); + } + // cout << "done lookup " << key << endl; + splinterdb_lookup_result_deinit(&lookup_result); + return DB::kOK; +} + +int +TransactionalSplinterDB::Scan(Transaction *txn, + const string &table, + const string &key, + int len, + const vector *fields, + vector> &result) +{ + assert(txn != NULL); + assert(fields == NULL); + + return DB::kErrorNotSupport; +} + +int +TransactionalSplinterDB::Update(Transaction *txn, + const string &table, + const string &key, + vector &values) +{ + return Insert(txn, table, key, values); +} + +int +TransactionalSplinterDB::Insert(Transaction *txn, + const string &table, + const string &key, + vector &values) +{ + assert(txn != NULL); + assert(values.size() == 1); + + std::string val = values[0].second; + slice key_slice = slice_create(key.size(), key.c_str()); + slice val_slice = slice_create(val.size(), val.c_str()); + // cout << "insert " << key << endl; + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert( + !transactional_splinterdb_insert(spl, txn_handle, key_slice, val_slice)); + // cout << "done insert " << key << endl; + + return DB::kOK; +} + +int +TransactionalSplinterDB::Delete(Transaction *txn, + const string &table, + const string &key) +{ + slice key_slice = slice_create(key.size(), key.c_str()); + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_delete(spl, txn_handle, key_slice)); + + return DB::kOK; +} + +void +TransactionalSplinterDB::Begin(Transaction **txn) +{ + *txn = new SplinterDBTransaction(); + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + transactional_splinterdb_begin(spl, txn_handle); +} + +int +TransactionalSplinterDB::Commit(Transaction **txn) +{ + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + int rc = transactional_splinterdb_commit(spl, txn_handle); + delete *txn; + *txn = NULL; + return rc < 0 ? DB::kErrorConflict : DB::kOK; +} + +int +TransactionalSplinterDB::Read(const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) +{ + return Read(NULL, table, key, fields, result); +} +/// +/// Performs a range scan for a set of records in the database. +/// Field/value pairs from the result are stored in a vector. +/// +/// @param table The name of the table. +/// @param key The key of the first record to read. +/// @param record_count The number of records to read. +/// @param fields The list of fields to read, or NULL for all of them. +/// @param result A vector of vector, where each vector contains field/value +/// pairs for one record +/// @return Zero on success, or a non-zero error code on error. +/// +int +TransactionalSplinterDB::Scan(const std::string &table, + const std::string &key, + int record_count, + const std::vector *fields, + std::vector> &result) +{ + return Scan(NULL, table, key, record_count, fields, result); +} + +/// +/// Updates a record in the database. +/// Field/value pairs in the specified vector are written to the record, +/// overwriting any existing values with the same field names. +/// +/// @param table The name of the table. +/// @param key The key of the record to write. +/// @param values A vector of field/value pairs to update in the record. +/// @return Zero on success, a non-zero error code on error. +/// +int +TransactionalSplinterDB::Update(const std::string &table, + const std::string &key, + std::vector &values) +{ + return Update(NULL, table, key, values); +} +/// +/// Inserts a record into the database. +/// Field/value pairs in the specified vector are written into the record. +/// +/// @param table The name of the table. +/// @param key The key of the record to insert. +/// @param values A vector of field/value pairs to insert in the record. +/// @return Zero on success, a non-zero error code on error. +/// +int +TransactionalSplinterDB::Insert(const std::string &table, + const std::string &key, + std::vector &values) +{ + return Insert(NULL, table, key, values); +} +/// +/// Deletes a record from the database. +/// +/// @param table The name of the table. +/// @param key The key of the record to delete. +/// @return Zero on success, a non-zero error code on error. +/// +int +TransactionalSplinterDB::Delete(const std::string &table, + const std::string &key) +{ + return Delete(NULL, table, key); +} + +} // namespace ycsbc diff --git a/db/transactional_splinter_db.h b/db/transactional_splinter_db.h new file mode 100644 index 00000000..58bad2ef --- /dev/null +++ b/db/transactional_splinter_db.h @@ -0,0 +1,112 @@ +// +// splinter_db.h +// YCSB-C +// + +#ifndef YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_ +#define YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_ + +#include +#include +#include + +#include "core/db.h" +#include "core/properties.h" + +extern "C" { +#include "splinterdb/transaction.h" +} + +using std::cout; +using std::endl; + +namespace ycsbc { + +class TransactionalSplinterDB : public DB { +public: + TransactionalSplinterDB(utils::Properties &props, bool preloaded); + ~TransactionalSplinterDB(); + + void + Init(); + void + Close(); + + int + Read(const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result); + + int + Scan(const std::string &table, + const std::string &key, + int len, + const std::vector *fields, + std::vector> &result); + + int + Update(const std::string &table, + const std::string &key, + std::vector &values); + + int + Insert(const std::string &table, + const std::string &key, + std::vector &values); + + int + Delete(const std::string &table, const std::string &key); + + void + Begin(Transaction **txn); + + int + Commit(Transaction **txn); + + int + Read(Transaction *txn, + const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result); + + int + Scan(Transaction *txn, + const std::string &table, + const std::string &key, + int len, + const std::vector *fields, + std::vector> &result); + + int + Update(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values); + + int + Insert(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values); + + int + Delete(Transaction *txn, const std::string &table, const std::string &key); + +private: + splinterdb_config splinterdb_cfg; + data_config data_cfg; + transactional_splinterdb *spl; +}; + +class SplinterDBTransaction : public Transaction { +private: + transaction handle; + + friend TransactionalSplinterDB; +}; + +} // namespace ycsbc + +#endif // YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_ From 648c3afbfbfffab88b1ca820b1855437e97fbd06 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Wed, 17 Aug 2022 11:24:40 -0700 Subject: [PATCH 02/15] Implemented RocksDB's transactions (#1) * Adding RocksDB's (Pessimistic)TransactionDB * Adding RocksDB's OptimisticTransactionDB --- db/db_factory.cc | 5 + db/optimistic_transaction_rocks_db.cc | 177 ++++++++++++++++++++++++++ db/transaction_rocks_db.cc | 173 +++++++++++++++++++++++++ db/transaction_rocks_db.h | 164 ++++++++++++++++++++++++ 4 files changed, 519 insertions(+) create mode 100644 db/optimistic_transaction_rocks_db.cc create mode 100644 db/transaction_rocks_db.cc create mode 100644 db/transaction_rocks_db.h diff --git a/db/db_factory.cc b/db/db_factory.cc index 062741c6..61f27de9 100644 --- a/db/db_factory.cc +++ b/db/db_factory.cc @@ -17,6 +17,7 @@ #include "db/splinter_db.h" #include "db/transactional_splinter_db.h" #include "db/rocks_db.h" +#include "db/transaction_rocks_db.h" using namespace std; using ycsbc::DB; @@ -34,6 +35,10 @@ DB* DBFactory::CreateDB(utils::Properties &props, bool preloaded) { return new RedisDB(props["host"].c_str(), port, slaves); } else if (props["dbname"] == "rocksdb") { return new RocksDB(props, preloaded); + } else if (props["dbname"] == "transaction_rocksdb") { + return new TransactionRocksDB(props, preloaded); + } else if (props["dbname"] == "optimistic_transaction_rocksdb") { + return new OptimisticTransactionRocksDB(props, preloaded); } else if (props["dbname"] == "splinterdb") { return new SplinterDB(props, preloaded); } else if (props["dbname"] == "transactional_splinterdb") { diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc new file mode 100644 index 00000000..7e3e8bbe --- /dev/null +++ b/db/optimistic_transaction_rocks_db.cc @@ -0,0 +1,177 @@ +// +// rocks_db.cc +// YCSB-C +// +// Created by Rob Johnson on 3/20/2022. +// Copyright (c) 2022 VMware. +// + +#include "db/transaction_rocks_db.h" +#include +#include +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +void OptimisticTransactionRocksDB::InitializeOptions(utils::Properties &props) { + const std::map &m = + (const std::map &)props; + rocksdb::ConfigOptions copts; + + if (m.count("rocksdb.config_file")) { + std::vector cf_descs; + assert(LoadOptionsFromFile(copts, m.at("rocksdb.config_file"), &options, + &cf_descs) == rocksdb::Status::OK()); + } + + std::unordered_map options_map; + for (auto tuple : m) { + if (tuple.first.find("rocksdb.options.") == 0) { + auto key = + tuple.first.substr(strlen("rocksdb.options."), std::string::npos); + options_map[key] = tuple.second; + + } else if (tuple.first == "rocksdb.write_options.sync") { + long int sync = props.GetIntProperty("rocksdb.write_options.sync"); + woptions.sync = sync; + } else if (tuple.first == "rocksdb.write_options.disableWAL") { + long int disableWAL = + props.GetIntProperty("rocksdb.write_options.disableWAL"); + woptions.disableWAL = disableWAL; + } else if (tuple.first == "rocksdb.config_file") { + // ignore it here -- loaded above + } else if (tuple.first == "rocksdb.database_filename") { + // ignore it, used in constructor + } else if (tuple.first.find("rocksdb.") == 0) { + std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; + assert(0); + } + } + rocksdb::Options new_options; + assert(GetDBOptionsFromMap(copts, options, options_map, &new_options) == + rocksdb::Status::OK()); + options = new_options; +} + +OptimisticTransactionRocksDB::OptimisticTransactionRocksDB( + utils::Properties &props, bool preloaded) { + InitializeOptions(props); + std::string database_filename = + props.GetProperty("rocksdb.database_filename"); + options.create_if_missing = !preloaded; + options.error_if_exists = !preloaded; + rocksdb::Status status = + rocksdb::OptimisticTransactionDB::Open(options, database_filename, &db); + assert(status.ok()); +} + +OptimisticTransactionRocksDB::~OptimisticTransactionRocksDB() { delete db; } + +void OptimisticTransactionRocksDB::Init() {} + +void OptimisticTransactionRocksDB::Close() {} + +void OptimisticTransactionRocksDB::Begin(Transaction **txn) { + *txn = new RocksDBTransaction(); + + rocksdb::Transaction *rt = db->BeginTransaction(woptions); + ((RocksDBTransaction *)*txn)->handle = rt; + rt->SetSnapshot(); +} + +int OptimisticTransactionRocksDB::Commit(Transaction **txn) { + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; + rocksdb::Status s = txn_handle->Commit(); + delete txn_handle; + delete *txn; + *txn = NULL; + + if (s.ok()) { + return DB::kOK; + } + + if (s.IsAborted() || s.IsTimedOut()) { + return DB::kErrorConflict; + } + + // FIXME: this error type might not be correct + return DB::kErrorNotSupport; +} + +int OptimisticTransactionRocksDB::Read(Transaction *txn, + const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + assert(txn != NULL); + string value; + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::ReadOptions roptions_ = roptions; + roptions_.snapshot = txn_handle->GetSnapshot(); + rocksdb::Status status = + txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); + assert(status.ok() || status.IsNotFound()); // TODO is it expected we're + // querying non-existing keys? + return DB::kOK; +} + +int OptimisticTransactionRocksDB::Scan( + Transaction *txn, const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + // rocksdb::Iterator* it = db->NewIterator(roptions); + // int i = 0; + // for (it->Seek(key); i < len && it->Valid(); it->Next()) { + // i++; + // } + // delete it; + // return DB::kOK; +} + +int OptimisticTransactionRocksDB::Update(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(txn, table, key, values); +} + +int OptimisticTransactionRocksDB::Insert(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) { + assert(txn != NULL); + assert(values.size() == 1); + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = + txn_handle->Put(rocksdb::Slice(key), rocksdb::Slice(values[0].second)); + assert(status.ok()); + return DB::kOK; +} + +int OptimisticTransactionRocksDB::Delete(Transaction *txn, + const std::string &table, + const std::string &key) { + assert(txn != NULL); + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = txn_handle->Delete(rocksdb::Slice(key)); + assert(status.ok()); + return DB::kOK; +} + +} // namespace ycsbc + +// Might want this for later: +// +// +// inline void sync(bool /*fullSync*/) { +// static struct rocksdb::FlushOptions foptions = +// rocksdb::FlushOptions(); rocksdb::Status status = db.Flush(foptions); +// assert(status.ok()); +// } diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc new file mode 100644 index 00000000..1402b14f --- /dev/null +++ b/db/transaction_rocks_db.cc @@ -0,0 +1,173 @@ +// +// rocks_db.cc +// YCSB-C +// +// Created by Rob Johnson on 3/20/2022. +// Copyright (c) 2022 VMware. +// + +#include "db/transaction_rocks_db.h" +#include +#include +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +void TransactionRocksDB::InitializeOptions(utils::Properties &props) { + const std::map &m = + (const std::map &)props; + rocksdb::ConfigOptions copts; + + if (m.count("rocksdb.config_file")) { + std::vector cf_descs; + assert(LoadOptionsFromFile(copts, m.at("rocksdb.config_file"), &options, + &cf_descs) == rocksdb::Status::OK()); + } + + std::unordered_map options_map; + for (auto tuple : m) { + if (tuple.first.find("rocksdb.options.") == 0) { + auto key = + tuple.first.substr(strlen("rocksdb.options."), std::string::npos); + options_map[key] = tuple.second; + + } else if (tuple.first == "rocksdb.write_options.sync") { + long int sync = props.GetIntProperty("rocksdb.write_options.sync"); + woptions.sync = sync; + } else if (tuple.first == "rocksdb.write_options.disableWAL") { + long int disableWAL = + props.GetIntProperty("rocksdb.write_options.disableWAL"); + woptions.disableWAL = disableWAL; + } else if (tuple.first == "rocksdb.config_file") { + // ignore it here -- loaded above + } else if (tuple.first == "rocksdb.database_filename") { + // ignore it, used in constructor + } else if (tuple.first == + "rocksdb.txndb_options.transaction_lock_timeout") { + long transaction_lock_timeout = props.GetIntProperty( + "rocksdb.txndb_options.transaction_lock_timeout"); + txndb_options.transaction_lock_timeout = transaction_lock_timeout; + } else if (tuple.first.find("rocksdb.") == 0) { + std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; + assert(0); + } + } + rocksdb::Options new_options; + assert(GetDBOptionsFromMap(copts, options, options_map, &new_options) == + rocksdb::Status::OK()); + options = new_options; +} + +TransactionRocksDB::TransactionRocksDB(utils::Properties &props, + bool preloaded) { + InitializeOptions(props); + std::string database_filename = + props.GetProperty("rocksdb.database_filename"); + options.create_if_missing = !preloaded; + options.error_if_exists = !preloaded; + rocksdb::Status status = rocksdb::TransactionDB::Open(options, txndb_options, + database_filename, &db); + assert(status.ok()); +} + +TransactionRocksDB::~TransactionRocksDB() { delete db; } + +void TransactionRocksDB::Init() {} + +void TransactionRocksDB::Close() {} + +void TransactionRocksDB::Begin(Transaction **txn) { + *txn = new RocksDBTransaction(); + ((RocksDBTransaction *)*txn)->handle = db->BeginTransaction(woptions); +} + +int TransactionRocksDB::Commit(Transaction **txn) { + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; + rocksdb::Status s = txn_handle->Commit(); + delete txn_handle; + delete *txn; + *txn = NULL; + + if (s.ok()) { + return DB::kOK; + } + + if (s.IsAborted() || s.IsTimedOut()) { + return DB::kErrorConflict; + } + + // FIXME: this error type might not be correct + return DB::kErrorNotSupport; +} + +int TransactionRocksDB::Read(Transaction *txn, const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + assert(txn != NULL); + string value; + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = + txn_handle->GetForUpdate(roptions, rocksdb::Slice(key), &value); + assert(status.ok() || status.IsNotFound()); // TODO is it expected we're + // querying non-existing keys? + return DB::kOK; +} + +int TransactionRocksDB::Scan(Transaction *txn, const std::string &table, + const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + // rocksdb::Iterator* it = db->NewIterator(roptions); + // int i = 0; + // for (it->Seek(key); i < len && it->Valid(); it->Next()) { + // i++; + // } + // delete it; + // return DB::kOK; +} + +int TransactionRocksDB::Update(Transaction *txn, const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(txn, table, key, values); +} + +int TransactionRocksDB::Insert(Transaction *txn, const std::string &table, + const std::string &key, + std::vector &values) { + assert(txn != NULL); + assert(values.size() == 1); + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = + txn_handle->Put(rocksdb::Slice(key), rocksdb::Slice(values[0].second)); + assert(status.ok()); + return DB::kOK; +} + +int TransactionRocksDB::Delete(Transaction *txn, const std::string &table, + const std::string &key) { + assert(txn != NULL); + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = txn_handle->Delete(rocksdb::Slice(key)); + assert(status.ok()); + return DB::kOK; +} + +} // namespace ycsbc + +// Might want this for later: +// +// +// inline void sync(bool /*fullSync*/) { +// static struct rocksdb::FlushOptions foptions = +// rocksdb::FlushOptions(); rocksdb::Status status = db.Flush(foptions); +// assert(status.ok()); +// } diff --git a/db/transaction_rocks_db.h b/db/transaction_rocks_db.h new file mode 100644 index 00000000..3d0371a2 --- /dev/null +++ b/db/transaction_rocks_db.h @@ -0,0 +1,164 @@ +// +// rocks_db.h +// YCSB-C +// + +#ifndef YCSB_C_TRANSACTION_ROCKS_DB_H_ +#define YCSB_C_TRANSACTION_ROCKS_DB_H_ + +#include "core/db.h" + +#include "core/properties.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction_db.h" +#include +#include + +using std::cout; +using std::endl; + +namespace ycsbc { + +class TransactionRocksDB : public DB { +public: + TransactionRocksDB(utils::Properties &props, bool preloaded); + ~TransactionRocksDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result) + { + return DB::kErrorNotSupport; + } + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) + { + return DB::kErrorNotSupport; + } + + int Update(const std::string &table, const std::string &key, + std::vector &values) + { + return DB::kErrorNotSupport; + } + + int Insert(const std::string &table, const std::string &key, + std::vector &values) + { + return DB::kErrorNotSupport; + } + + int Delete(const std::string &table, const std::string &key) + { + return DB::kErrorNotSupport; + } + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string &key); + +private: + void InitializeOptions(utils::Properties &props); + + rocksdb::TransactionDB *db; + rocksdb::Options options; + rocksdb::ReadOptions roptions; + rocksdb::WriteOptions woptions; + rocksdb::TransactionDBOptions txndb_options; +}; + +class OptimisticTransactionRocksDB : public DB { +public: + OptimisticTransactionRocksDB(utils::Properties &props, bool preloaded); + ~OptimisticTransactionRocksDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, + std::vector &result) { + return DB::kErrorNotSupport; + } + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + } + + int Update(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Insert(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Delete(const std::string &table, const std::string &key) { + return DB::kErrorNotSupport; + } + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string &key); + +private: + void InitializeOptions(utils::Properties &props); + + rocksdb::OptimisticTransactionDB *db; + rocksdb::Options options; + rocksdb::ReadOptions roptions; + rocksdb::WriteOptions woptions; +}; + +class RocksDBTransaction : public Transaction { +private: + rocksdb::Transaction *handle; + + friend TransactionRocksDB; + friend OptimisticTransactionRocksDB; +}; + +} // namespace ycsbc + +#endif // YCSB_C_TRANSACTION_ROCKS_DB_H_ From 3022560c7b65524c443ca9354457d9c69a8a4f38 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Fri, 2 Sep 2022 18:15:54 -0700 Subject: [PATCH 03/15] Fixed a retry (#2) The problem was that an aborted transaction gets new keys from a generator. It should do with keys that it originally tried. Each transaction stores operations and replays until it commits. --- core/client.h | 208 ++++++++++++++----- core/db.h | 80 +++----- core/transaction.h | 26 +++ db/optimistic_transaction_rocks_db.cc | 7 +- db/transaction_rocks_db.cc | 11 +- db/transaction_rocks_db.h | 18 +- db/transactional_splinter_db.cc | 274 +++++++++++--------------- db/transactional_splinter_db.h | 120 +++++------ 8 files changed, 394 insertions(+), 350 deletions(-) diff --git a/core/client.h b/core/client.h index 4b249dd9..2fb3c2d2 100644 --- a/core/client.h +++ b/core/client.h @@ -9,33 +9,42 @@ #ifndef YCSB_C_CLIENT_H_ #define YCSB_C_CLIENT_H_ -#include -#include "db.h" #include "core_workload.h" +#include "db.h" +#include "transaction.h" #include "utils.h" +#include namespace ycsbc { class Client { - public: +public: Client(DB &db, CoreWorkload &wl) : db_(db), workload_(wl) { workload_.InitKeyBuffer(key); workload_.InitPairs(pairs); } - + virtual bool DoInsert(); virtual bool DoTransaction(); - - virtual ~Client() { } - - protected: - + + virtual ~Client() {} + +protected: virtual int TransactionRead(Transaction *txn); virtual int TransactionReadModifyWrite(Transaction *txn); virtual int TransactionScan(Transaction *txn); virtual int TransactionUpdate(Transaction *txn); virtual int TransactionInsert(Transaction *txn); - + + virtual int TransactionReadRetry(Transaction *txn, TransactionOperation &top); + virtual int TransactionReadModifyWriteRetry(Transaction *txn, + TransactionOperation &top); + virtual int TransactionScanRetry(Transaction *txn, TransactionOperation &top); + virtual int TransactionUpdateRetry(Transaction *txn, + TransactionOperation &top); + virtual int TransactionInsertRetry(Transaction *txn, + TransactionOperation &top); + DB &db_; CoreWorkload &workload_; std::string key; @@ -57,104 +66,195 @@ inline bool Client::DoInsert() { inline bool Client::DoTransaction() { int status = -1; Transaction *txn = NULL; - do { + + db_.Begin(&txn); + + txn->SetTransactionOperationsSize(workload_.ops_per_transaction()); + for (int i = 0; i < workload_.ops_per_transaction(); ++i) { + switch (workload_.NextOperation()) { + case READ: + status = TransactionRead(txn); + break; + case UPDATE: + status = TransactionUpdate(txn); + break; + case INSERT: + status = TransactionInsert(txn); + break; + case SCAN: + status = TransactionScan(txn); + break; + case READMODIFYWRITE: + status = TransactionReadModifyWrite(txn); + break; + default: + throw utils::Exception("Operation request is not recognized!"); + } + assert(status >= 0); + } + + bool need_retry = db_.Commit(&txn) == DB::kErrorConflict; + + while (need_retry) { db_.Begin(&txn); - for (int i = 0; i < workload_.ops_per_transaction(); ++i) { - switch (workload_.NextOperation()) { + for (unsigned long i = 0; i < txn->GetTransactionOperationsSize(); ++i) { + TransactionOperation &top = txn->GetOperation(i); + switch (top.op) { case READ: - status = TransactionRead(txn); - break; + status = TransactionReadRetry(txn, top); + break; case UPDATE: - status = TransactionUpdate(txn); - break; + status = TransactionUpdateRetry(txn, top); + break; case INSERT: - status = TransactionInsert(txn); - break; + status = TransactionInsertRetry(txn, top); + break; case SCAN: - status = TransactionScan(txn); - break; + status = TransactionScanRetry(txn, top); + break; case READMODIFYWRITE: - status = TransactionReadModifyWrite(txn); - break; + status = TransactionReadModifyWriteRetry(txn, top); + break; default: - throw utils::Exception("Operation request is not recognized!"); + throw utils::Exception("Operation request is not recognized!"); } assert(status >= 0); } - } while (db_.Commit(&txn) == DB::kErrorConflict); + + need_retry = db_.Commit(&txn) == DB::kErrorConflict; + } + return (status == DB::kOK); } inline int Client::TransactionRead(Transaction *txn) { - const std::string &table = workload_.NextTable(); - const std::string &key = workload_.NextTransactionKey(); + TransactionOperation &top = txn->GetNextOperation(); + top.op = READ; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Read(txn, table, key, &fields, result); + return db_.Read(txn, top.table, top.key, &fields, result); } else { - return db_.Read(txn, table, key, NULL, result); + return db_.Read(txn, top.table, top.key, NULL, result); } } inline int Client::TransactionReadModifyWrite(Transaction *txn) { - const std::string &table = workload_.NextTable(); - const std::string &key = workload_.NextTransactionKey(); + TransactionOperation &top = txn->GetNextOperation(); + top.op = READMODIFYWRITE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - db_.Read(txn, table, key, &fields, result); + db_.Read(txn, top.table, top.key, &fields, result); } else { - db_.Read(txn, table, key, NULL, result); + db_.Read(txn, top.table, top.key, NULL, result); } - std::vector values; if (workload_.write_all_fields()) { - workload_.BuildValues(values); + workload_.BuildValues(top.values); } else { - workload_.BuildUpdate(values); + workload_.BuildUpdate(top.values); } - return db_.Update(txn, table, key, values); + return db_.Update(txn, top.table, top.key, top.values); } inline int Client::TransactionScan(Transaction *txn) { - const std::string &table = workload_.NextTable(); - const std::string &key = workload_.NextTransactionKey(); - int len = workload_.NextScanLength(); + TransactionOperation &top = txn->GetNextOperation(); + top.op = SCAN; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + top.len = workload_.NextScanLength(); std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Scan(txn, table, key, len, &fields, result); + return db_.Scan(txn, top.table, top.key, top.len, &fields, result); } else { - return db_.Scan(txn, table, key, len, NULL, result); + return db_.Scan(txn, top.table, top.key, top.len, NULL, result); } } inline int Client::TransactionUpdate(Transaction *txn) { - const std::string &table = workload_.NextTable(); - const std::string &key = workload_.NextTransactionKey(); - std::vector values; + TransactionOperation &top = txn->GetNextOperation(); + top.op = UPDATE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); if (workload_.write_all_fields()) { - workload_.BuildValues(values); + workload_.BuildValues(top.values); } else { - workload_.BuildUpdate(values); + workload_.BuildUpdate(top.values); } - return db_.Update(txn, table, key, values); + return db_.Update(txn, top.table, top.key, top.values); } inline int Client::TransactionInsert(Transaction *txn) { - const std::string &table = workload_.NextTable(); + TransactionOperation &top = txn->GetNextOperation(); + top.op = INSERT; + top.table = workload_.NextTable(); workload_.NextSequenceKey(key); - std::vector values; - workload_.BuildValues(values); - return db_.Insert(txn, table, key, values); -} + top.key = key; + workload_.BuildValues(top.values); + + return db_.Insert(txn, top.table, top.key, top.values); +} + +inline int Client::TransactionReadRetry(Transaction *txn, + TransactionOperation &top) { + std::vector result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Read(txn, top.table, top.key, &fields, result); + } else { + return db_.Read(txn, top.table, top.key, NULL, result); + } +} + +inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, + TransactionOperation &top) { + std::vector result; + + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + db_.Read(txn, top.table, top.key, &fields, result); + } else { + db_.Read(txn, top.table, top.key, NULL, result); + } + + return db_.Update(txn, top.table, top.key, top.values); +} + +inline int Client::TransactionScanRetry(Transaction *txn, + TransactionOperation &top) { + std::vector> result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Scan(txn, top.table, top.key, top.len, &fields, result); + } else { + return db_.Scan(txn, top.table, top.key, top.len, NULL, result); + } +} + +inline int Client::TransactionUpdateRetry(Transaction *txn, + TransactionOperation &top) { + return db_.Update(txn, top.table, top.key, top.values); +} + +inline int Client::TransactionInsertRetry(Transaction *txn, + TransactionOperation &top) { + return db_.Insert(txn, top.table, top.key, top.values); +} -} // ycsbc +} // namespace ycsbc #endif // YCSB_C_CLIENT_H_ diff --git a/core/db.h b/core/db.h index f26182b3..ce10dd6c 100644 --- a/core/db.h +++ b/core/db.h @@ -9,15 +9,15 @@ #ifndef YCSB_C_DB_H_ #define YCSB_C_DB_H_ -#include #include - -#include "transaction.h" +#include namespace ycsbc { +class Transaction; + class DB { - public: +public: typedef std::pair KVPair; static const int kOK = 0; static const int kErrorNoData = 1; @@ -25,14 +25,16 @@ class DB { static const int kErrorNotSupport = 3; /// /// Initializes any state for accessing this DB. - /// Called once per DB client (thread); there is a single DB instance globally. + /// Called once per DB client (thread); there is a single DB instance + /// globally. /// - virtual void Init() { } + virtual void Init() {} /// /// Clears any state for accessing this DB. - /// Called once per DB client (thread); there is a single DB instance globally. + /// Called once per DB client (thread); there is a single DB instance + /// globally. /// - virtual void Close() { } + virtual void Close() {} /// /// Reads a record from the database. /// Field/value pairs from the result are stored in a vector. @@ -92,18 +94,12 @@ class DB { /// @return Zero on success, a non-zero error code on error. /// virtual int Delete(const std::string &table, const std::string &key) = 0; - - virtual ~DB() { } - - virtual void - Begin(Transaction **txn) - {} - virtual int - Commit(Transaction **txn) - { - return 0; - } + virtual ~DB() {} + + virtual void Begin(Transaction **txn) {} + + virtual int Commit(Transaction **txn) { return 0; } /// /// Reads a record from the database. @@ -116,13 +112,10 @@ class DB { /// @param result A vector of field/value pairs for the result. /// @return Zero on success, or a non-zero error code on error/record-miss. /// - virtual int - Read(Transaction *txn, - const std::string &table, - const std::string &key, - const std::vector *fields, - std::vector &result) - { + virtual int Read(Transaction *txn, const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { return Read(table, key, fields, result); } /// @@ -138,14 +131,10 @@ class DB { /// pairs for one record /// @return Zero on success, or a non-zero error code on error. /// - virtual int - Scan(Transaction *txn, - const std::string &table, - const std::string &key, - int record_count, - const std::vector *fields, - std::vector> &result) - { + virtual int Scan(Transaction *txn, const std::string &table, + const std::string &key, int record_count, + const std::vector *fields, + std::vector> &result) { return Scan(table, key, record_count, fields, result); } @@ -160,12 +149,8 @@ class DB { /// @param values A vector of field/value pairs to update in the record. /// @return Zero on success, a non-zero error code on error. /// - virtual int - Update(Transaction *txn, - const std::string &table, - const std::string &key, - std::vector &values) - { + virtual int Update(Transaction *txn, const std::string &table, + const std::string &key, std::vector &values) { return Update(table, key, values); } /// @@ -178,12 +163,8 @@ class DB { /// @param values A vector of field/value pairs to insert in the record. /// @return Zero on success, a non-zero error code on error. /// - virtual int - Insert(Transaction *txn, - const std::string &table, - const std::string &key, - std::vector &values) - { + virtual int Insert(Transaction *txn, const std::string &table, + const std::string &key, std::vector &values) { return Insert(table, key, values); } /// @@ -194,13 +175,12 @@ class DB { /// @param key The key of the record to delete. /// @return Zero on success, a non-zero error code on error. /// - virtual int - Delete(Transaction *txn, const std::string &table, const std::string &key) - { + virtual int Delete(Transaction *txn, const std::string &table, + const std::string &key) { return Delete(table, key); } }; -} // ycsbc +} // namespace ycsbc #endif // YCSB_C_DB_H_ diff --git a/core/transaction.h b/core/transaction.h index f7376d50..9bb68faf 100644 --- a/core/transaction.h +++ b/core/transaction.h @@ -1,10 +1,36 @@ #ifndef YCSB_C_TRANSACTION_H_ #define YCSB_C_TRANSACTION_H_ +#include + +#include "core_workload.h" +#include "db.h" + namespace ycsbc { +struct TransactionOperation { + enum Operation op; + std::string table; + std::string key; + int len; + std::vector values; +}; + class Transaction { public: + Transaction() { next_op = 0; }; + + void SetTransactionOperationsSize(unsigned long size) { ops.resize(size); }; + + unsigned long GetTransactionOperationsSize() { return ops.size(); }; + + TransactionOperation &GetNextOperation() { return ops[next_op++]; } + + TransactionOperation &GetOperation(unsigned long i) { return ops[i]; } + +private: + std::vector ops; + unsigned long next_op; }; } // namespace ycsbc diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc index 7e3e8bbe..3dc4286a 100644 --- a/db/optimistic_transaction_rocks_db.cc +++ b/db/optimistic_transaction_rocks_db.cc @@ -87,14 +87,15 @@ int OptimisticTransactionRocksDB::Commit(Transaction **txn) { rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; rocksdb::Status s = txn_handle->Commit(); delete txn_handle; - delete *txn; - *txn = NULL; if (s.ok()) { + delete *txn; + *txn = NULL; + return DB::kOK; } - if (s.IsAborted() || s.IsTimedOut()) { + if (s.IsBusy()) { return DB::kErrorConflict; } diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc index 1402b14f..d8003825 100644 --- a/db/transaction_rocks_db.cc +++ b/db/transaction_rocks_db.cc @@ -81,7 +81,9 @@ void TransactionRocksDB::Init() {} void TransactionRocksDB::Close() {} void TransactionRocksDB::Begin(Transaction **txn) { - *txn = new RocksDBTransaction(); + if (*txn == NULL) { + *txn = new RocksDBTransaction(); + } ((RocksDBTransaction *)*txn)->handle = db->BeginTransaction(woptions); } @@ -89,14 +91,15 @@ int TransactionRocksDB::Commit(Transaction **txn) { rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; rocksdb::Status s = txn_handle->Commit(); delete txn_handle; - delete *txn; - *txn = NULL; if (s.ok()) { + delete *txn; + *txn = NULL; + return DB::kOK; } - if (s.IsAborted() || s.IsTimedOut()) { + if (s.IsBusy()) { return DB::kErrorConflict; } diff --git a/db/transaction_rocks_db.h b/db/transaction_rocks_db.h index 3d0371a2..e134ef4b 100644 --- a/db/transaction_rocks_db.h +++ b/db/transaction_rocks_db.h @@ -7,8 +7,8 @@ #define YCSB_C_TRANSACTION_ROCKS_DB_H_ #include "core/db.h" - #include "core/properties.h" +#include "core/transaction.h" #include "rocksdb/db.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/transaction_db.h" @@ -29,32 +29,28 @@ class TransactionRocksDB : public DB { void Close(); int Read(const std::string &table, const std::string &key, - const std::vector *fields, std::vector &result) - { + const std::vector *fields, + std::vector &result) { return DB::kErrorNotSupport; } int Scan(const std::string &table, const std::string &key, int len, const std::vector *fields, - std::vector> &result) - { + std::vector> &result) { return DB::kErrorNotSupport; } int Update(const std::string &table, const std::string &key, - std::vector &values) - { + std::vector &values) { return DB::kErrorNotSupport; } int Insert(const std::string &table, const std::string &key, - std::vector &values) - { + std::vector &values) { return DB::kErrorNotSupport; } - int Delete(const std::string &table, const std::string &key) - { + int Delete(const std::string &table, const std::string &key) { return DB::kErrorNotSupport; } diff --git a/db/transactional_splinter_db.cc b/db/transactional_splinter_db.cc index 163832c8..5580e124 100644 --- a/db/transactional_splinter_db.cc +++ b/db/transactional_splinter_db.cc @@ -20,180 +20,155 @@ using std::vector; namespace ycsbc { TransactionalSplinterDB::TransactionalSplinterDB(utils::Properties &props, - bool preloaded) -{ - cout << "This is TransacionalSplinterDB\n"; + bool preloaded) { + cout << "This is TransacionalSplinterDB\n"; - uint64_t max_key_size = props.GetIntProperty("splinterdb.max_key_size"); + uint64_t max_key_size = props.GetIntProperty("splinterdb.max_key_size"); - default_data_config_init(max_key_size, &data_cfg); - splinterdb_cfg.filename = props.GetProperty("splinterdb.filename").c_str(); - splinterdb_cfg.cache_size = + default_data_config_init(max_key_size, &data_cfg); + splinterdb_cfg.filename = props.GetProperty("splinterdb.filename").c_str(); + splinterdb_cfg.cache_size = props.GetIntProperty("splinterdb.cache_size_mb") * 1024 * 1024; - splinterdb_cfg.disk_size = + splinterdb_cfg.disk_size = props.GetIntProperty("splinterdb.disk_size_gb") * 1024 * 1024 * 1024; - splinterdb_cfg.data_cfg = &data_cfg; - splinterdb_cfg.heap_handle = NULL; - splinterdb_cfg.heap_id = NULL; - splinterdb_cfg.page_size = props.GetIntProperty("splinterdb.page_size"); - splinterdb_cfg.extent_size = props.GetIntProperty("splinterdb.extent_size"); - splinterdb_cfg.io_flags = props.GetIntProperty("splinterdb.io_flags"); - splinterdb_cfg.io_perms = props.GetIntProperty("splinterdb.io_perms"); - splinterdb_cfg.io_async_queue_depth = + splinterdb_cfg.data_cfg = &data_cfg; + splinterdb_cfg.heap_handle = NULL; + splinterdb_cfg.heap_id = NULL; + splinterdb_cfg.page_size = props.GetIntProperty("splinterdb.page_size"); + splinterdb_cfg.extent_size = props.GetIntProperty("splinterdb.extent_size"); + splinterdb_cfg.io_flags = props.GetIntProperty("splinterdb.io_flags"); + splinterdb_cfg.io_perms = props.GetIntProperty("splinterdb.io_perms"); + splinterdb_cfg.io_async_queue_depth = props.GetIntProperty("splinterdb.io_async_queue_depth"); - splinterdb_cfg.cache_use_stats = + splinterdb_cfg.cache_use_stats = props.GetIntProperty("splinterdb.cache_use_stats"); - splinterdb_cfg.cache_logfile = + splinterdb_cfg.cache_logfile = props.GetProperty("splinterdb.cache_logfile").c_str(); - splinterdb_cfg.btree_rough_count_height = + splinterdb_cfg.btree_rough_count_height = props.GetIntProperty("splinterdb.btree_rough_count_height"); - splinterdb_cfg.filter_remainder_size = + splinterdb_cfg.filter_remainder_size = props.GetIntProperty("splinterdb.filter_remainder_size"); - splinterdb_cfg.filter_index_size = + splinterdb_cfg.filter_index_size = props.GetIntProperty("splinterdb.filter_index_size"); - splinterdb_cfg.use_log = props.GetIntProperty("splinterdb.use_log"); - splinterdb_cfg.memtable_capacity = + splinterdb_cfg.use_log = props.GetIntProperty("splinterdb.use_log"); + splinterdb_cfg.memtable_capacity = props.GetIntProperty("splinterdb.memtable_capacity"); - splinterdb_cfg.fanout = props.GetIntProperty("splinterdb.fanout"); - splinterdb_cfg.max_branches_per_node = + splinterdb_cfg.fanout = props.GetIntProperty("splinterdb.fanout"); + splinterdb_cfg.max_branches_per_node = props.GetIntProperty("splinterdb.max_branches_per_node"); - splinterdb_cfg.use_stats = props.GetIntProperty("splinterdb.use_stats"); - splinterdb_cfg.reclaim_threshold = + splinterdb_cfg.use_stats = props.GetIntProperty("splinterdb.use_stats"); + splinterdb_cfg.reclaim_threshold = props.GetIntProperty("splinterdb.reclaim_threshold"); - if (preloaded) { - assert(!transactional_splinterdb_open(&splinterdb_cfg, &spl)); - } else { - assert(!transactional_splinterdb_create(&splinterdb_cfg, &spl)); - } + if (preloaded) { + assert(!transactional_splinterdb_open(&splinterdb_cfg, &spl)); + } else { + assert(!transactional_splinterdb_create(&splinterdb_cfg, &spl)); + } } -TransactionalSplinterDB::~TransactionalSplinterDB() -{ - transactional_splinterdb_close(&spl); +TransactionalSplinterDB::~TransactionalSplinterDB() { + transactional_splinterdb_close(&spl); } -void -TransactionalSplinterDB::Init() -{ - transactional_splinterdb_register_thread(spl); +void TransactionalSplinterDB::Init() { + transactional_splinterdb_register_thread(spl); } -void -TransactionalSplinterDB::Close() -{ - transactional_splinterdb_deregister_thread(spl); +void TransactionalSplinterDB::Close() { + transactional_splinterdb_deregister_thread(spl); } -int -TransactionalSplinterDB::Read(Transaction *txn, - const string &table, - const string &key, - const vector *fields, - vector &result) -{ - assert(txn != NULL); +int TransactionalSplinterDB::Read(Transaction *txn, const string &table, + const string &key, + const vector *fields, + vector &result) { + assert(txn != NULL); - splinterdb_lookup_result lookup_result; - transactional_splinterdb_lookup_result_init(spl, &lookup_result, 0, NULL); - slice key_slice = slice_create(key.size(), key.c_str()); - // cout << "lookup " << key << endl; + splinterdb_lookup_result lookup_result; + transactional_splinterdb_lookup_result_init(spl, &lookup_result, 0, NULL); + slice key_slice = slice_create(key.size(), key.c_str()); + // cout << "lookup " << key << endl; - transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; - assert(!transactional_splinterdb_lookup( - spl, txn_handle, key_slice, &lookup_result)); - if (!splinterdb_lookup_found(&lookup_result)) { - cout << "FAILED lookup " << key << endl; - assert(0); - } - // cout << "done lookup " << key << endl; - splinterdb_lookup_result_deinit(&lookup_result); - return DB::kOK; + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_lookup(spl, txn_handle, key_slice, + &lookup_result)); + if (!splinterdb_lookup_found(&lookup_result)) { + cout << "FAILED lookup " << key << endl; + assert(0); + } + // cout << "done lookup " << key << endl; + splinterdb_lookup_result_deinit(&lookup_result); + return DB::kOK; } -int -TransactionalSplinterDB::Scan(Transaction *txn, - const string &table, - const string &key, - int len, - const vector *fields, - vector> &result) -{ - assert(txn != NULL); - assert(fields == NULL); +int TransactionalSplinterDB::Scan(Transaction *txn, const string &table, + const string &key, int len, + const vector *fields, + vector> &result) { + assert(txn != NULL); + assert(fields == NULL); - return DB::kErrorNotSupport; + return DB::kErrorNotSupport; } -int -TransactionalSplinterDB::Update(Transaction *txn, - const string &table, - const string &key, - vector &values) -{ - return Insert(txn, table, key, values); +int TransactionalSplinterDB::Update(Transaction *txn, const string &table, + const string &key, vector &values) { + return Insert(txn, table, key, values); } -int -TransactionalSplinterDB::Insert(Transaction *txn, - const string &table, - const string &key, - vector &values) -{ - assert(txn != NULL); - assert(values.size() == 1); +int TransactionalSplinterDB::Insert(Transaction *txn, const string &table, + const string &key, vector &values) { + assert(txn != NULL); + assert(values.size() == 1); - std::string val = values[0].second; - slice key_slice = slice_create(key.size(), key.c_str()); - slice val_slice = slice_create(val.size(), val.c_str()); - // cout << "insert " << key << endl; + std::string val = values[0].second; + slice key_slice = slice_create(key.size(), key.c_str()); + slice val_slice = slice_create(val.size(), val.c_str()); + // cout << "insert " << key << endl; - transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; - assert( + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert( !transactional_splinterdb_insert(spl, txn_handle, key_slice, val_slice)); - // cout << "done insert " << key << endl; + // cout << "done insert " << key << endl; - return DB::kOK; + return DB::kOK; } -int -TransactionalSplinterDB::Delete(Transaction *txn, - const string &table, - const string &key) -{ - slice key_slice = slice_create(key.size(), key.c_str()); +int TransactionalSplinterDB::Delete(Transaction *txn, const string &table, + const string &key) { + slice key_slice = slice_create(key.size(), key.c_str()); - transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; - assert(!transactional_splinterdb_delete(spl, txn_handle, key_slice)); + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_delete(spl, txn_handle, key_slice)); - return DB::kOK; + return DB::kOK; } -void -TransactionalSplinterDB::Begin(Transaction **txn) -{ - *txn = new SplinterDBTransaction(); - transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; - transactional_splinterdb_begin(spl, txn_handle); +void TransactionalSplinterDB::Begin(Transaction **txn) { + if (*txn == NULL) { + *txn = new SplinterDBTransaction(); + } + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + transactional_splinterdb_begin(spl, txn_handle); } -int -TransactionalSplinterDB::Commit(Transaction **txn) -{ - transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; - int rc = transactional_splinterdb_commit(spl, txn_handle); - delete *txn; - *txn = NULL; - return rc < 0 ? DB::kErrorConflict : DB::kOK; +int TransactionalSplinterDB::Commit(Transaction **txn) { + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + if (transactional_splinterdb_commit(spl, txn_handle) < 0) { + return DB::kErrorConflict; + } + + delete *txn; + *txn = NULL; + return DB::kOK; } -int -TransactionalSplinterDB::Read(const std::string &table, - const std::string &key, - const std::vector *fields, - std::vector &result) -{ - return Read(NULL, table, key, fields, result); +int TransactionalSplinterDB::Read(const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + return Read(NULL, table, key, fields, result); } /// /// Performs a range scan for a set of records in the database. @@ -207,14 +182,11 @@ TransactionalSplinterDB::Read(const std::string &table, /// pairs for one record /// @return Zero on success, or a non-zero error code on error. /// -int -TransactionalSplinterDB::Scan(const std::string &table, - const std::string &key, - int record_count, - const std::vector *fields, - std::vector> &result) -{ - return Scan(NULL, table, key, record_count, fields, result); +int TransactionalSplinterDB::Scan(const std::string &table, + const std::string &key, int record_count, + const std::vector *fields, + std::vector> &result) { + return Scan(NULL, table, key, record_count, fields, result); } /// @@ -227,12 +199,10 @@ TransactionalSplinterDB::Scan(const std::string &table, /// @param values A vector of field/value pairs to update in the record. /// @return Zero on success, a non-zero error code on error. /// -int -TransactionalSplinterDB::Update(const std::string &table, - const std::string &key, - std::vector &values) -{ - return Update(NULL, table, key, values); +int TransactionalSplinterDB::Update(const std::string &table, + const std::string &key, + std::vector &values) { + return Update(NULL, table, key, values); } /// /// Inserts a record into the database. @@ -243,12 +213,10 @@ TransactionalSplinterDB::Update(const std::string &table, /// @param values A vector of field/value pairs to insert in the record. /// @return Zero on success, a non-zero error code on error. /// -int -TransactionalSplinterDB::Insert(const std::string &table, - const std::string &key, - std::vector &values) -{ - return Insert(NULL, table, key, values); +int TransactionalSplinterDB::Insert(const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(NULL, table, key, values); } /// /// Deletes a record from the database. @@ -257,11 +225,9 @@ TransactionalSplinterDB::Insert(const std::string &table, /// @param key The key of the record to delete. /// @return Zero on success, a non-zero error code on error. /// -int -TransactionalSplinterDB::Delete(const std::string &table, - const std::string &key) -{ - return Delete(NULL, table, key); +int TransactionalSplinterDB::Delete(const std::string &table, + const std::string &key) { + return Delete(NULL, table, key); } } // namespace ycsbc diff --git a/db/transactional_splinter_db.h b/db/transactional_splinter_db.h index 58bad2ef..f3ec3a8c 100644 --- a/db/transactional_splinter_db.h +++ b/db/transactional_splinter_db.h @@ -12,6 +12,7 @@ #include "core/db.h" #include "core/properties.h" +#include "core/transaction.h" extern "C" { #include "splinterdb/transaction.h" @@ -24,87 +25,58 @@ namespace ycsbc { class TransactionalSplinterDB : public DB { public: - TransactionalSplinterDB(utils::Properties &props, bool preloaded); - ~TransactionalSplinterDB(); - - void - Init(); - void - Close(); - - int - Read(const std::string &table, - const std::string &key, - const std::vector *fields, - std::vector &result); - - int - Scan(const std::string &table, - const std::string &key, - int len, - const std::vector *fields, - std::vector> &result); - - int - Update(const std::string &table, - const std::string &key, - std::vector &values); - - int - Insert(const std::string &table, - const std::string &key, - std::vector &values); - - int - Delete(const std::string &table, const std::string &key); - - void - Begin(Transaction **txn); - - int - Commit(Transaction **txn); - - int - Read(Transaction *txn, - const std::string &table, - const std::string &key, - const std::vector *fields, - std::vector &result); - - int - Scan(Transaction *txn, - const std::string &table, - const std::string &key, - int len, - const std::vector *fields, - std::vector> &result); - - int - Update(Transaction *txn, - const std::string &table, - const std::string &key, - std::vector &values); - - int - Insert(Transaction *txn, - const std::string &table, - const std::string &key, - std::vector &values); - - int - Delete(Transaction *txn, const std::string &table, const std::string &key); + TransactionalSplinterDB(utils::Properties &props, bool preloaded); + ~TransactionalSplinterDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result); + + int Update(const std::string &table, const std::string &key, + std::vector &values); + + int Insert(const std::string &table, const std::string &key, + std::vector &values); + + int Delete(const std::string &table, const std::string &key); + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string &key); private: - splinterdb_config splinterdb_cfg; - data_config data_cfg; - transactional_splinterdb *spl; + splinterdb_config splinterdb_cfg; + data_config data_cfg; + transactional_splinterdb *spl; }; class SplinterDBTransaction : public Transaction { private: - transaction handle; + transaction handle; - friend TransactionalSplinterDB; + friend TransactionalSplinterDB; }; } // namespace ycsbc From 43ad4aea0405b6b8038f11999c4aa8277e09a3f1 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 6 Sep 2022 15:43:12 -0700 Subject: [PATCH 04/15] Fixed transaction implementation (#3) * Fixed the number of operations for each thread * Added a sample workload spec for transactional databases * Implemented more strict isolation for RocksDB Transaction Based on the examples from the official RocksDB repo, this code implements `Read committed with multiple snapshot`, which is stronger than snapshot isolation/read committed without snapshot. * Minor code updates to handle transaction conflict * Added abort_cnt --- core/client.h | 88 +++++++++++++++++++++++---- core/transaction.h | 17 +++++- db/optimistic_transaction_rocks_db.cc | 5 +- db/transaction_rocks_db.cc | 31 +++++++++- db/transaction_rocks_db.h | 3 + db/transactional_splinter_db.h | 5 ++ workloads/myworkload.spec | 22 +++++++ ycsbc.cc | 10 ++- 8 files changed, 162 insertions(+), 19 deletions(-) create mode 100644 workloads/myworkload.spec diff --git a/core/client.h b/core/client.h index 2fb3c2d2..cd612002 100644 --- a/core/client.h +++ b/core/client.h @@ -14,6 +14,7 @@ #include "transaction.h" #include "utils.h" #include +#include namespace ycsbc { @@ -22,13 +23,19 @@ class Client { Client(DB &db, CoreWorkload &wl) : db_(db), workload_(wl) { workload_.InitKeyBuffer(key); workload_.InitPairs(pairs); + + abort_cnt = 0; } virtual bool DoInsert(); virtual bool DoTransaction(); - virtual ~Client() {} + virtual ~Client() { + Client::total_abort_cnt += abort_cnt; + } + + static std::atomic total_abort_cnt; protected: virtual int TransactionRead(Transaction *txn); virtual int TransactionReadModifyWrite(Transaction *txn); @@ -49,6 +56,8 @@ class Client { CoreWorkload &workload_; std::string key; std::vector pairs; + + unsigned long abort_cnt; }; inline bool Client::DoInsert() { @@ -56,10 +65,9 @@ inline bool Client::DoInsert() { workload_.UpdateValues(pairs); int status = -1; Transaction *txn = NULL; - do { - db_.Begin(&txn); - status = db_.Insert(txn, workload_.NextTable(), key, pairs); - } while (db_.Commit(&txn) == DB::kErrorConflict); + db_.Begin(&txn); + status = db_.Insert(txn, workload_.NextTable(), key, pairs); + db_.Commit(&txn); return (status == DB::kOK); } @@ -69,7 +77,9 @@ inline bool Client::DoTransaction() { db_.Begin(&txn); - txn->SetTransactionOperationsSize(workload_.ops_per_transaction()); + assert(!txn->IsAborted()); + + txn->ReadyToRecordOperations(workload_.ops_per_transaction()); for (int i = 0; i < workload_.ops_per_transaction(); ++i) { switch (workload_.NextOperation()) { case READ: @@ -91,13 +101,21 @@ inline bool Client::DoTransaction() { throw utils::Exception("Operation request is not recognized!"); } assert(status >= 0); + + if (status == DB::kErrorConflict) { + txn->SetAborted(true); + } } bool need_retry = db_.Commit(&txn) == DB::kErrorConflict; while (need_retry) { + ++abort_cnt; + db_.Begin(&txn); + txn->SetAborted(false); + for (unsigned long i = 0; i < txn->GetTransactionOperationsSize(); ++i) { TransactionOperation &top = txn->GetOperation(i); switch (top.op) { @@ -120,12 +138,17 @@ inline bool Client::DoTransaction() { throw utils::Exception("Operation request is not recognized!"); } assert(status >= 0); + + if (status == DB::kErrorConflict) { + txn->SetAborted(true); + break; + } } need_retry = db_.Commit(&txn) == DB::kErrorConflict; } - return (status == DB::kOK); + return true; } inline int Client::TransactionRead(Transaction *txn) { @@ -133,6 +156,11 @@ inline int Client::TransactionRead(Transaction *txn) { top.op = READ; top.table = workload_.NextTable(); top.key = workload_.NextTransactionKey(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; @@ -148,6 +176,17 @@ inline int Client::TransactionReadModifyWrite(Transaction *txn) { top.op = READMODIFYWRITE; top.table = workload_.NextTable(); top.key = workload_.NextTransactionKey(); + + if (workload_.write_all_fields()) { + workload_.BuildValues(top.values); + } else { + workload_.BuildUpdate(top.values); + } + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + std::vector result; if (!workload_.read_all_fields()) { @@ -158,11 +197,6 @@ inline int Client::TransactionReadModifyWrite(Transaction *txn) { db_.Read(txn, top.table, top.key, NULL, result); } - if (workload_.write_all_fields()) { - workload_.BuildValues(top.values); - } else { - workload_.BuildUpdate(top.values); - } return db_.Update(txn, top.table, top.key, top.values); } @@ -172,6 +206,11 @@ inline int Client::TransactionScan(Transaction *txn) { top.table = workload_.NextTable(); top.key = workload_.NextTransactionKey(); top.len = workload_.NextScanLength(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; @@ -192,6 +231,11 @@ inline int Client::TransactionUpdate(Transaction *txn) { } else { workload_.BuildUpdate(top.values); } + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + return db_.Update(txn, top.table, top.key, top.values); } @@ -203,11 +247,19 @@ inline int Client::TransactionInsert(Transaction *txn) { top.key = key; workload_.BuildValues(top.values); + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + return db_.Insert(txn, top.table, top.key, top.values); } inline int Client::TransactionReadRetry(Transaction *txn, TransactionOperation &top) { + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; @@ -220,6 +272,9 @@ inline int Client::TransactionReadRetry(Transaction *txn, inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, TransactionOperation &top) { + if (txn->IsAborted()) { + return DB::kErrorConflict; + } std::vector result; if (!workload_.read_all_fields()) { @@ -235,6 +290,9 @@ inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, inline int Client::TransactionScanRetry(Transaction *txn, TransactionOperation &top) { + if (txn->IsAborted()) { + return DB::kErrorConflict; + } std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; @@ -247,11 +305,17 @@ inline int Client::TransactionScanRetry(Transaction *txn, inline int Client::TransactionUpdateRetry(Transaction *txn, TransactionOperation &top) { + if (txn->IsAborted()) { + return DB::kErrorConflict; + } return db_.Update(txn, top.table, top.key, top.values); } inline int Client::TransactionInsertRetry(Transaction *txn, TransactionOperation &top) { + if (txn->IsAborted()) { + return DB::kErrorConflict; + } return db_.Insert(txn, top.table, top.key, top.values); } diff --git a/core/transaction.h b/core/transaction.h index 9bb68faf..22ce8d57 100644 --- a/core/transaction.h +++ b/core/transaction.h @@ -18,9 +18,14 @@ struct TransactionOperation { class Transaction { public: - Transaction() { next_op = 0; }; + Transaction() : next_op(0), is_aborted(false){}; - void SetTransactionOperationsSize(unsigned long size) { ops.resize(size); }; + virtual ~Transaction(){}; + + void ReadyToRecordOperations(unsigned long size) { + ops.resize(size); + next_op = 0; + }; unsigned long GetTransactionOperationsSize() { return ops.size(); }; @@ -28,9 +33,15 @@ class Transaction { TransactionOperation &GetOperation(unsigned long i) { return ops[i]; } -private: + void SetAborted(bool aborted) { is_aborted = aborted; }; + + bool IsAborted() { return is_aborted; }; + +protected: std::vector ops; unsigned long next_op; + + bool is_aborted; }; } // namespace ycsbc diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc index 3dc4286a..a98b4788 100644 --- a/db/optimistic_transaction_rocks_db.cc +++ b/db/optimistic_transaction_rocks_db.cc @@ -76,7 +76,9 @@ void OptimisticTransactionRocksDB::Init() {} void OptimisticTransactionRocksDB::Close() {} void OptimisticTransactionRocksDB::Begin(Transaction **txn) { - *txn = new RocksDBTransaction(); + if (*txn == NULL) { + *txn = new RocksDBTransaction(); + } rocksdb::Transaction *rt = db->BeginTransaction(woptions); ((RocksDBTransaction *)*txn)->handle = rt; @@ -112,6 +114,7 @@ int OptimisticTransactionRocksDB::Read(Transaction *txn, string value; rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + txn_handle->SetSnapshot(); rocksdb::ReadOptions roptions_ = roptions; roptions_.snapshot = txn_handle->GetSnapshot(); rocksdb::Status status = diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc index d8003825..50bb1a4d 100644 --- a/db/transaction_rocks_db.cc +++ b/db/transaction_rocks_db.cc @@ -85,10 +85,21 @@ void TransactionRocksDB::Begin(Transaction **txn) { *txn = new RocksDBTransaction(); } ((RocksDBTransaction *)*txn)->handle = db->BeginTransaction(woptions); + ((RocksDBTransaction *)*txn)->handle->SetSnapshot(); + ((RocksDBTransaction *)*txn)->handle->SetSavePoint(); } int TransactionRocksDB::Commit(Transaction **txn) { rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; + + if ((*txn)->IsAborted()) { + txn_handle->RollbackToSavePoint(); + rocksdb::Status s = txn_handle->Commit(); + assert(s.ok()); + delete txn_handle; + return DB::kErrorConflict; + } + rocksdb::Status s = txn_handle->Commit(); delete txn_handle; @@ -115,8 +126,16 @@ int TransactionRocksDB::Read(Transaction *txn, const std::string &table, string value; rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + txn_handle->SetSnapshot(); + rocksdb::ReadOptions roptions_ = roptions; + roptions_.snapshot = txn_handle->GetSnapshot(); rocksdb::Status status = - txn_handle->GetForUpdate(roptions, rocksdb::Slice(key), &value); + txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + assert(status.ok() || status.IsNotFound()); // TODO is it expected we're // querying non-existing keys? return DB::kOK; @@ -151,6 +170,11 @@ int TransactionRocksDB::Insert(Transaction *txn, const std::string &table, rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; rocksdb::Status status = txn_handle->Put(rocksdb::Slice(key), rocksdb::Slice(values[0].second)); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + assert(status.ok()); return DB::kOK; } @@ -160,6 +184,11 @@ int TransactionRocksDB::Delete(Transaction *txn, const std::string &table, assert(txn != NULL); rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; rocksdb::Status status = txn_handle->Delete(rocksdb::Slice(key)); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + assert(status.ok()); return DB::kOK; } diff --git a/db/transaction_rocks_db.h b/db/transaction_rocks_db.h index e134ef4b..86fbcc0c 100644 --- a/db/transaction_rocks_db.h +++ b/db/transaction_rocks_db.h @@ -148,6 +148,9 @@ class OptimisticTransactionRocksDB : public DB { }; class RocksDBTransaction : public Transaction { +public: + RocksDBTransaction() : Transaction(){}; + private: rocksdb::Transaction *handle; diff --git a/db/transactional_splinter_db.h b/db/transactional_splinter_db.h index f3ec3a8c..b69d6320 100644 --- a/db/transactional_splinter_db.h +++ b/db/transactional_splinter_db.h @@ -73,6 +73,11 @@ class TransactionalSplinterDB : public DB { }; class SplinterDBTransaction : public Transaction { +public: + SplinterDBTransaction() : Transaction(){}; + + ~SplinterDBTransaction(){}; + private: transaction handle; diff --git a/workloads/myworkload.spec b/workloads/myworkload.spec new file mode 100644 index 00000000..a338f32c --- /dev/null +++ b/workloads/myworkload.spec @@ -0,0 +1,22 @@ +# Yahoo! Cloud System Benchmark +# Workload A: Update heavy workload +# Application example: Session store recording recent actions +# +# Read/update ratio: 50/50 +# Default data size: 1 KB records (10 fields, 100 bytes each, plus key) +# Request distribution: zipfian + +recordcount=1024 +operationcount=8192 +workload=com.yahoo.ycsb.workloads.CoreWorkload +fieldcount=1 +fieldlength=1024 + +opspertransaction=16 + +readallfields=true +requestdistribution=zipfian +readproportion=0.5 +updateproportion=0.5 +scanproportion=0 +insertproportion=0 diff --git a/ycsbc.cc b/ycsbc.cc index 42ae4480..c4fe6e9c 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -128,6 +128,8 @@ int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl, const uint64_t num_op return oks; } +std::atomic ycsbc::Client::total_abort_cnt = 0; + int main(const int argc, const char *argv[]) { utils::Properties props; WorkloadProperties load_workload; @@ -203,6 +205,7 @@ int main(const int argc, const char *argv[]) { } actual_ops.clear(); total_ops = stoi(workload.props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]); + uint64_t ops_per_transactions = stoi(workload.props[ycsbc::CoreWorkload::OPS_PER_TRANSACTION_PROPERTY]); timer.Start(); { cerr << "# Transaction count:\t" << total_ops << endl; @@ -211,14 +214,15 @@ int main(const int argc, const char *argv[]) { for (unsigned int i = 0; i < num_threads; ++i) { uint64_t start_op = (total_ops * i) / num_threads; uint64_t end_op = (total_ops * (i + 1)) / num_threads; + uint64_t num_transactions = (end_op - start_op) / ops_per_transactions; actual_ops.emplace_back(async(launch::async, - DelegateClient, db, &wls[i], end_op - start_op, false, pmode, total_ops, &run_progress, &last_printed)); + DelegateClient, db, &wls[i], num_transactions, false, pmode, total_ops, &run_progress, &last_printed)); } assert(actual_ops.size() == num_threads); sum = 0; for (auto &n : actual_ops) { assert(n.valid()); - sum += n.get(); + sum += n.get() * ops_per_transactions; } if (pmode != no_progress) { cout << "\n"; @@ -229,6 +233,8 @@ int main(const int argc, const char *argv[]) { cerr << "# Transaction throughput (KTPS)" << endl; cerr << props["dbname"] << '\t' << workload.filename << '\t' << num_threads << '\t'; cerr << sum / run_duration / 1000 << endl; + + cerr << "# Abort count:\t" << ycsbc::Client::total_abort_cnt << '\n'; } delete db; From e4ad37673dad9115615b92a9dfe02354a23a6dfd Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 6 Sep 2022 16:46:57 -0700 Subject: [PATCH 05/15] Fixed an error when running non-transactional databases (#4) * Fixed an error when non-transactional databases * Minor fixes for RocksDB to get snapshots --- core/client.h | 208 ++++++++++++++++++-------- db/optimistic_transaction_rocks_db.cc | 2 +- db/transaction_rocks_db.cc | 2 +- 3 files changed, 150 insertions(+), 62 deletions(-) diff --git a/core/client.h b/core/client.h index cd612002..974b8a93 100644 --- a/core/client.h +++ b/core/client.h @@ -13,8 +13,8 @@ #include "db.h" #include "transaction.h" #include "utils.h" -#include #include +#include namespace ycsbc { @@ -30,12 +30,10 @@ class Client { virtual bool DoInsert(); virtual bool DoTransaction(); - virtual ~Client() { - Client::total_abort_cnt += abort_cnt; - } + virtual ~Client() { Client::total_abort_cnt += abort_cnt; } - static std::atomic total_abort_cnt; + protected: virtual int TransactionRead(Transaction *txn); virtual int TransactionReadModifyWrite(Transaction *txn); @@ -77,9 +75,9 @@ inline bool Client::DoTransaction() { db_.Begin(&txn); - assert(!txn->IsAborted()); - - txn->ReadyToRecordOperations(workload_.ops_per_transaction()); + if (txn != NULL) { + txn->ReadyToRecordOperations(workload_.ops_per_transaction()); + } for (int i = 0; i < workload_.ops_per_transaction(); ++i) { switch (workload_.NextOperation()) { case READ: @@ -111,7 +109,7 @@ inline bool Client::DoTransaction() { while (need_retry) { ++abort_cnt; - + db_.Begin(&txn); txn->SetAborted(false); @@ -152,110 +150,188 @@ inline bool Client::DoTransaction() { } inline int Client::TransactionRead(Transaction *txn) { - TransactionOperation &top = txn->GetNextOperation(); - top.op = READ; - top.table = workload_.NextTable(); - top.key = workload_.NextTransactionKey(); + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = READ; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } - if (txn->IsAborted()) { - return DB::kErrorConflict; + std::vector result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Read(txn, top.table, top.key, &fields, result); + } else { + return db_.Read(txn, top.table, top.key, NULL, result); + } } + const std::string &table = workload_.NextTable(); + const std::string &key = workload_.NextTransactionKey(); + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Read(txn, top.table, top.key, &fields, result); + return db_.Read(txn, table, key, &fields, result); } else { - return db_.Read(txn, top.table, top.key, NULL, result); + return db_.Read(txn, table, key, NULL, result); } } inline int Client::TransactionReadModifyWrite(Transaction *txn) { - TransactionOperation &top = txn->GetNextOperation(); - top.op = READMODIFYWRITE; - top.table = workload_.NextTable(); - top.key = workload_.NextTransactionKey(); + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = READMODIFYWRITE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + + if (workload_.write_all_fields()) { + workload_.BuildValues(top.values); + } else { + workload_.BuildUpdate(top.values); + } - if (workload_.write_all_fields()) { - workload_.BuildValues(top.values); - } else { - workload_.BuildUpdate(top.values); - } + if (txn->IsAborted()) { + return DB::kErrorConflict; + } - if (txn->IsAborted()) { - return DB::kErrorConflict; + std::vector result; + + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + db_.Read(txn, top.table, top.key, &fields, result); + } else { + db_.Read(txn, top.table, top.key, NULL, result); + } + + return db_.Update(txn, top.table, top.key, top.values); } + const std::string &table = workload_.NextTable(); + const std::string &key = workload_.NextTransactionKey(); + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - db_.Read(txn, top.table, top.key, &fields, result); + db_.Read(txn, table, key, &fields, result); } else { - db_.Read(txn, top.table, top.key, NULL, result); + db_.Read(txn, table, key, NULL, result); } - return db_.Update(txn, top.table, top.key, top.values); + std::vector values; + if (workload_.write_all_fields()) { + workload_.BuildValues(values); + } else { + workload_.BuildUpdate(values); + } + + return db_.Update(txn, table, key, values); } inline int Client::TransactionScan(Transaction *txn) { - TransactionOperation &top = txn->GetNextOperation(); - top.op = SCAN; - top.table = workload_.NextTable(); - top.key = workload_.NextTransactionKey(); - top.len = workload_.NextScanLength(); + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = SCAN; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + top.len = workload_.NextScanLength(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } - if (txn->IsAborted()) { - return DB::kErrorConflict; + std::vector> result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Scan(txn, top.table, top.key, top.len, &fields, result); + } else { + return db_.Scan(txn, top.table, top.key, top.len, NULL, result); + } } + const std::string &table = workload_.NextTable(); + const std::string &key = workload_.NextTransactionKey(); + uint64_t len = workload_.NextScanLength(); + std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Scan(txn, top.table, top.key, top.len, &fields, result); + return db_.Scan(txn, table, key, len, &fields, result); } else { - return db_.Scan(txn, top.table, top.key, top.len, NULL, result); + return db_.Scan(txn, table, key, len, NULL, result); } } inline int Client::TransactionUpdate(Transaction *txn) { - TransactionOperation &top = txn->GetNextOperation(); - top.op = UPDATE; - top.table = workload_.NextTable(); - top.key = workload_.NextTransactionKey(); - if (workload_.write_all_fields()) { - workload_.BuildValues(top.values); - } else { - workload_.BuildUpdate(top.values); + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = UPDATE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + if (workload_.write_all_fields()) { + workload_.BuildValues(top.values); + } else { + workload_.BuildUpdate(top.values); + } + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Update(txn, top.table, top.key, top.values); } - if (txn->IsAborted()) { - return DB::kErrorConflict; + const std::string &table = workload_.NextTable(); + const std::string &key = workload_.NextTransactionKey(); + + std::vector values; + if (workload_.write_all_fields()) { + workload_.BuildValues(values); + } else { + workload_.BuildUpdate(values); } - return db_.Update(txn, top.table, top.key, top.values); + return db_.Update(txn, table, key, values); } inline int Client::TransactionInsert(Transaction *txn) { - TransactionOperation &top = txn->GetNextOperation(); - top.op = INSERT; - top.table = workload_.NextTable(); - workload_.NextSequenceKey(key); - top.key = key; - workload_.BuildValues(top.values); + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = INSERT; + top.table = workload_.NextTable(); + workload_.NextSequenceKey(key); + top.key = key; + workload_.BuildValues(top.values); - if (txn->IsAborted()) { - return DB::kErrorConflict; + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Insert(txn, top.table, top.key, top.values); } - return db_.Insert(txn, top.table, top.key, top.values); + const std::string &table = workload_.NextTable(); + workload_.NextSequenceKey(key); + std::vector values; + workload_.BuildValues(values); + + return db_.Insert(txn, table, key, values); } inline int Client::TransactionReadRetry(Transaction *txn, TransactionOperation &top) { + assert(txn != NULL); + if (txn->IsAborted()) { return DB::kErrorConflict; } @@ -272,9 +348,12 @@ inline int Client::TransactionReadRetry(Transaction *txn, inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, TransactionOperation &top) { + assert(txn != NULL); + if (txn->IsAborted()) { return DB::kErrorConflict; } + std::vector result; if (!workload_.read_all_fields()) { @@ -290,9 +369,12 @@ inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, inline int Client::TransactionScanRetry(Transaction *txn, TransactionOperation &top) { + assert(txn != NULL); + if (txn->IsAborted()) { return DB::kErrorConflict; } + std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; @@ -305,17 +387,23 @@ inline int Client::TransactionScanRetry(Transaction *txn, inline int Client::TransactionUpdateRetry(Transaction *txn, TransactionOperation &top) { + assert(txn != NULL); + if (txn->IsAborted()) { return DB::kErrorConflict; } + return db_.Update(txn, top.table, top.key, top.values); } inline int Client::TransactionInsertRetry(Transaction *txn, TransactionOperation &top) { + assert(txn != NULL); + if (txn->IsAborted()) { return DB::kErrorConflict; } + return db_.Insert(txn, top.table, top.key, top.values); } diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc index a98b4788..7488e137 100644 --- a/db/optimistic_transaction_rocks_db.cc +++ b/db/optimistic_transaction_rocks_db.cc @@ -116,7 +116,7 @@ int OptimisticTransactionRocksDB::Read(Transaction *txn, rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; txn_handle->SetSnapshot(); rocksdb::ReadOptions roptions_ = roptions; - roptions_.snapshot = txn_handle->GetSnapshot(); + roptions_.snapshot = db->GetSnapshot(); rocksdb::Status status = txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); assert(status.ok() || status.IsNotFound()); // TODO is it expected we're diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc index 50bb1a4d..986d2261 100644 --- a/db/transaction_rocks_db.cc +++ b/db/transaction_rocks_db.cc @@ -128,7 +128,7 @@ int TransactionRocksDB::Read(Transaction *txn, const std::string &table, rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; txn_handle->SetSnapshot(); rocksdb::ReadOptions roptions_ = roptions; - roptions_.snapshot = txn_handle->GetSnapshot(); + roptions_.snapshot = db->GetSnapshot(); rocksdb::Status status = txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); From 18db9e8970da145cd267322f98447117e0f50570 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 11 Oct 2022 13:27:05 -0700 Subject: [PATCH 06/15] Implemented various isolation levels (#5) * Implemented various isolation levels This commit makes transactional splinterdb and rocksdb support various isolation levels. The scripts, `run_rocksdb.sh` and `run_splinterdb.sh`, will run ycsb benchs with varying parameters. * fixed a bug when running without ops_per_transaction --- db/optimistic_transaction_rocks_db.cc | 21 ++- db/transaction_rocks_db.cc | 33 +++- db/transaction_rocks_db.h | 9 ++ db/transactional_splinter_db.cc | 4 + run_rocksdb.sh | 31 ++++ run_splinterdb.sh | 26 +++ ycsbc.cc | 222 +++++++++++++++----------- 7 files changed, 247 insertions(+), 99 deletions(-) create mode 100755 run_rocksdb.sh create mode 100755 run_splinterdb.sh diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc index 7488e137..a5ceb15d 100644 --- a/db/optimistic_transaction_rocks_db.cc +++ b/db/optimistic_transaction_rocks_db.cc @@ -46,6 +46,9 @@ void OptimisticTransactionRocksDB::InitializeOptions(utils::Properties &props) { // ignore it here -- loaded above } else if (tuple.first == "rocksdb.database_filename") { // ignore it, used in constructor + } else if (tuple.first == "rocksdb.isolation_level") { + isol_level = (RocksDBIsolationLevel)props.GetIntProperty( + "rocksdb.isolation_level"); } else if (tuple.first.find("rocksdb.") == 0) { std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; assert(0); @@ -67,6 +70,7 @@ OptimisticTransactionRocksDB::OptimisticTransactionRocksDB( rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(options, database_filename, &db); assert(status.ok()); + assert(isol_level != ROCKSDB_ISOLATION_LEVEL_INVALID); } OptimisticTransactionRocksDB::~OptimisticTransactionRocksDB() { delete db; } @@ -82,7 +86,11 @@ void OptimisticTransactionRocksDB::Begin(Transaction **txn) { rocksdb::Transaction *rt = db->BeginTransaction(woptions); ((RocksDBTransaction *)*txn)->handle = rt; - rt->SetSnapshot(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + rt->SetSnapshot(); + } } int OptimisticTransactionRocksDB::Commit(Transaction **txn) { @@ -114,9 +122,16 @@ int OptimisticTransactionRocksDB::Read(Transaction *txn, string value; rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; - txn_handle->SetSnapshot(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->SetSnapshot(); + } rocksdb::ReadOptions roptions_ = roptions; - roptions_.snapshot = db->GetSnapshot(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + roptions_.snapshot = db->GetSnapshot(); + } rocksdb::Status status = txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); assert(status.ok() || status.IsNotFound()); // TODO is it expected we're diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc index 986d2261..1f8ddafc 100644 --- a/db/transaction_rocks_db.cc +++ b/db/transaction_rocks_db.cc @@ -51,6 +51,9 @@ void TransactionRocksDB::InitializeOptions(utils::Properties &props) { long transaction_lock_timeout = props.GetIntProperty( "rocksdb.txndb_options.transaction_lock_timeout"); txndb_options.transaction_lock_timeout = transaction_lock_timeout; + } else if (tuple.first == "rocksdb.isolation_level") { + isol_level = (RocksDBIsolationLevel)props.GetIntProperty( + "rocksdb.isolation_level"); } else if (tuple.first.find("rocksdb.") == 0) { std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; assert(0); @@ -72,6 +75,7 @@ TransactionRocksDB::TransactionRocksDB(utils::Properties &props, rocksdb::Status status = rocksdb::TransactionDB::Open(options, txndb_options, database_filename, &db); assert(status.ok()); + assert(isol_level != ROCKSDB_ISOLATION_LEVEL_INVALID); } TransactionRocksDB::~TransactionRocksDB() { delete db; } @@ -85,15 +89,22 @@ void TransactionRocksDB::Begin(Transaction **txn) { *txn = new RocksDBTransaction(); } ((RocksDBTransaction *)*txn)->handle = db->BeginTransaction(woptions); - ((RocksDBTransaction *)*txn)->handle->SetSnapshot(); - ((RocksDBTransaction *)*txn)->handle->SetSavePoint(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + ((RocksDBTransaction *)*txn)->handle->SetSnapshot(); + ((RocksDBTransaction *)*txn)->handle->SetSavePoint(); + } } int TransactionRocksDB::Commit(Transaction **txn) { rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; if ((*txn)->IsAborted()) { - txn_handle->RollbackToSavePoint(); + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->RollbackToSavePoint(); + } rocksdb::Status s = txn_handle->Commit(); assert(s.ok()); delete txn_handle; @@ -111,6 +122,11 @@ int TransactionRocksDB::Commit(Transaction **txn) { } if (s.IsBusy()) { + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->Rollback(); + } + return DB::kErrorConflict; } @@ -126,9 +142,16 @@ int TransactionRocksDB::Read(Transaction *txn, const std::string &table, string value; rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; - txn_handle->SetSnapshot(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->SetSnapshot(); + } rocksdb::ReadOptions roptions_ = roptions; - roptions_.snapshot = db->GetSnapshot(); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + roptions_.snapshot = db->GetSnapshot(); + } rocksdb::Status status = txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); diff --git a/db/transaction_rocks_db.h b/db/transaction_rocks_db.h index 86fbcc0c..4cbfbd74 100644 --- a/db/transaction_rocks_db.h +++ b/db/transaction_rocks_db.h @@ -20,6 +20,13 @@ using std::endl; namespace ycsbc { +typedef enum { + ROCKSDB_ISOLATION_LEVEL_INVALID = 0, + ROCKSDB_ISOLATION_LEVEL_READ_COMMITTED, + ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION, + ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW, +} RocksDBIsolationLevel; + class TransactionRocksDB : public DB { public: TransactionRocksDB(utils::Properties &props, bool preloaded); @@ -82,6 +89,7 @@ class TransactionRocksDB : public DB { rocksdb::ReadOptions roptions; rocksdb::WriteOptions woptions; rocksdb::TransactionDBOptions txndb_options; + RocksDBIsolationLevel isol_level; }; class OptimisticTransactionRocksDB : public DB { @@ -145,6 +153,7 @@ class OptimisticTransactionRocksDB : public DB { rocksdb::Options options; rocksdb::ReadOptions roptions; rocksdb::WriteOptions woptions; + RocksDBIsolationLevel isol_level; }; class RocksDBTransaction : public Transaction { diff --git a/db/transactional_splinter_db.cc b/db/transactional_splinter_db.cc index 5580e124..64ae4396 100644 --- a/db/transactional_splinter_db.cc +++ b/db/transactional_splinter_db.cc @@ -65,6 +65,10 @@ TransactionalSplinterDB::TransactionalSplinterDB(utils::Properties &props, } else { assert(!transactional_splinterdb_create(&splinterdb_cfg, &spl)); } + + transactional_splinterdb_set_isolation_level( + spl, (transaction_isolation_level)props.GetIntProperty( + "splinterdb.isolation_level")); } TransactionalSplinterDB::~TransactionalSplinterDB() { diff --git a/run_rocksdb.sh b/run_rocksdb.sh new file mode 100755 index 00000000..e44ed2b1 --- /dev/null +++ b/run_rocksdb.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +NTHREADS=1 +DB=optimistic_transaction_rocksdb +WL=workloads/myworkload.spec + +PARAMS="-W $WL -L $WL" + +if [ `uname` = FreeBSD ]; then + GETOPT=/usr/local/bin/getopt +else + GETOPT=getopt +fi + +eval set -- "$(${GETOPT} -o i:t:ph -- $@)" + +while true ; do + case "$1" in + -p) DB=transaction_rocksdb; shift 1 ;; + -t) NTHREADS=$2; shift 2 ;; + -i) PARAMS+=" -p rocksdb.isolation_level $2"; shift 2 ;; + -h) printf "$0 options:\n\t-t [# threads]\n\t-i [isolation_level: 1=read_committed, 2=snapshot_isolation, 3=monotonic_atomic_views(default)]\n\t-p: pessimistic transaction db\nExample: $0 -t 4 -i 3 -p\n"; exit ;; + --) shift ; break ;; + esac +done + +./ycsbc -db $DB -threads $NTHREADS $PARAMS + +if [[ -d rocksdb.db ]]; then + rm -rf rocksdb.db +fi diff --git a/run_splinterdb.sh b/run_splinterdb.sh new file mode 100755 index 00000000..cbebc815 --- /dev/null +++ b/run_splinterdb.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +NTHREADS=1 +WL=workloads/myworkload.spec + +PARAMS="-W $WL -L $WL" + +if [ `uname` = FreeBSD ]; then + GETOPT=/usr/local/bin/getopt +else + GETOPT=getopt +fi + +eval set -- "$(${GETOPT} -o i:t:h -- $@)" + +while true ; do + case "$1" in + -t) NTHREADS=$2; shift 2 ;; + -i) PARAMS+=" -p splinterdb.isolation_level $2"; shift 2 ;; + -h) printf "$0 options:\n\t-t [# threads]\n\t-i [isolation_level: 1=serializable(default), 2=snapshot_isolation]\nExample: $0 -t 4 -i 1\n"; exit ;; + --) shift ; break ;; + esac +done + +./ycsbc -db transactional_splinterdb -threads $NTHREADS $PARAMS + diff --git a/ycsbc.cc b/ycsbc.cc index c4fe6e9c..7b823afb 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -6,16 +6,16 @@ // Copyright (c) 2014 Jinglei Ren . // -#include -#include -#include -#include -#include -#include "core/utils.h" -#include "core/timer.h" #include "core/client.h" #include "core/core_workload.h" +#include "core/timer.h" +#include "core/utils.h" #include "db/db_factory.h" +#include +#include +#include +#include +#include using namespace std; @@ -26,49 +26,52 @@ typedef struct WorkloadProperties { } WorkloadProperties; std::map default_props = { - {"threadcount", "1"}, - {"dbname", "basic"}, - {"progress", "none"}, - - // - // Basicdb config defaults - // - {"basicdb.verbose", "0"}, - - // - // splinterdb config defaults - // - {"splinterdb.filename", "splinterdb.db"}, - {"splinterdb.cache_size_mb", "4096"}, - {"splinterdb.disk_size_gb", "128"}, - - {"splinterdb.max_key_size", "24"}, - {"splinterdb.use_log", "1"}, - - // All these options use splinterdb's internal defaults - {"splinterdb.page_size", "0"}, - {"splinterdb.extent_size", "0"}, - {"splinterdb.io_flags", "0"}, - {"splinterdb.io_perms", "0"}, - {"splinterdb.io_async_queue_depth", "0"}, - {"splinterdb.cache_use_stats", "0"}, - {"splinterdb.cache_logfile", "0"}, - {"splinterdb.btree_rough_count_height", "0"}, - {"splinterdb.filter_remainder_size", "0"}, - {"splinterdb.filter_index_size", "0"}, - {"splinterdb.memtable_capacity", "0"}, - {"splinterdb.fanout", "0"}, - {"splinterdb.max_branches_per_node", "0"}, - {"splinterdb.use_stats", "0"}, - {"splinterdb.reclaim_threshold", "0"}, - - {"rocksdb.database_filename", "rocksdb.db"}, + {"threadcount", "1"}, + {"dbname", "basic"}, + {"progress", "none"}, + + // + // Basicdb config defaults + // + {"basicdb.verbose", "0"}, + + // + // splinterdb config defaults + // + {"splinterdb.filename", "splinterdb.db"}, + {"splinterdb.cache_size_mb", "4096"}, + {"splinterdb.disk_size_gb", "128"}, + + {"splinterdb.max_key_size", "24"}, + {"splinterdb.use_log", "1"}, + + // All these options use splinterdb's internal defaults + {"splinterdb.page_size", "0"}, + {"splinterdb.extent_size", "0"}, + {"splinterdb.io_flags", "0"}, + {"splinterdb.io_perms", "0"}, + {"splinterdb.io_async_queue_depth", "0"}, + {"splinterdb.cache_use_stats", "0"}, + {"splinterdb.cache_logfile", "0"}, + {"splinterdb.btree_rough_count_height", "0"}, + {"splinterdb.filter_remainder_size", "0"}, + {"splinterdb.filter_index_size", "0"}, + {"splinterdb.memtable_capacity", "0"}, + {"splinterdb.fanout", "0"}, + {"splinterdb.max_branches_per_node", "0"}, + {"splinterdb.use_stats", "0"}, + {"splinterdb.reclaim_threshold", "0"}, + {"splinterdb.isolation_level", "1"}, + + {"rocksdb.database_filename", "rocksdb.db"}, +// {"rocksdb.isolation_level", "3"}, }; - void UsageMessage(const char *command); bool StrStartWith(const char *str, const char *pre); -void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, WorkloadProperties &load_workload, vector &run_workloads); +void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, + WorkloadProperties &load_workload, + vector &run_workloads); typedef enum progress_mode { no_progress, @@ -76,8 +79,10 @@ typedef enum progress_mode { percent_progress, } progress_mode; -static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t stepsize, volatile uint64_t *last_printed) -{ +static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t stepsize, + volatile uint64_t *last_printed) { uint64_t old_counter = __sync_fetch_and_add(global_op_counter, stepsize); uint64_t new_counter = old_counter + stepsize; if (100 * old_counter / total_ops != 100 * new_counter / total_ops) { @@ -85,29 +90,36 @@ static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, volat cout << "#" << flush; } else if (pmode == percent_progress) { uint64_t my_percent = 100 * new_counter / total_ops; - while (*last_printed + 1 != my_percent) {} + while (*last_printed + 1 != my_percent) { + } cout << 100 * new_counter / total_ops << "%\r" << flush; *last_printed = my_percent; } } } -static inline void ProgressUpdate(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t i, volatile uint64_t *last_printed) -{ +static inline void ProgressUpdate(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t i, volatile uint64_t *last_printed) { uint64_t sync_interval = 0 < total_ops / 1000 ? total_ops / 1000 : 1; if ((i % sync_interval) == 0) { - ReportProgress(pmode, total_ops, global_op_counter, sync_interval, last_printed); + ReportProgress(pmode, total_ops, global_op_counter, sync_interval, + last_printed); } } -static inline void ProgressFinish(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t i, volatile uint64_t *last_printed) -{ +static inline void ProgressFinish(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t i, volatile uint64_t *last_printed) { uint64_t sync_interval = 0 < total_ops / 1000 ? total_ops / 1000 : 1; - ReportProgress(pmode, total_ops, global_op_counter, i % sync_interval, last_printed); + ReportProgress(pmode, total_ops, global_op_counter, i % sync_interval, + last_printed); } -int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl, const uint64_t num_ops, bool is_loading, - progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, volatile uint64_t *last_printed) { +int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl, + const uint64_t num_ops, bool is_loading, progress_mode pmode, + uint64_t total_ops, volatile uint64_t *global_op_counter, + volatile uint64_t *last_printed) { db->Init(); ycsbc::Client client(*db, *wl); uint64_t oks = 0; @@ -155,17 +167,20 @@ int main(const int argc, const char *argv[]) { exit(0); } - record_count = stoi(load_workload.props[ycsbc::CoreWorkload::RECORD_COUNT_PROPERTY]); + record_count = + stoi(load_workload.props[ycsbc::CoreWorkload::RECORD_COUNT_PROPERTY]); uint64_t batch_size = sqrt(record_count); if (record_count / batch_size < num_threads) batch_size = record_count / num_threads; if (batch_size < 1) batch_size = 1; - ycsbc::BatchedCounterGenerator key_generator(load_workload.preloaded ? record_count : 0, batch_size); + ycsbc::BatchedCounterGenerator key_generator( + load_workload.preloaded ? record_count : 0, batch_size); ycsbc::CoreWorkload wls[num_threads]; for (unsigned int i = 0; i < num_threads; ++i) { - wls[i].InitLoadWorkload(load_workload.props, num_threads, i, &key_generator); + wls[i].InitLoadWorkload(load_workload.props, num_threads, i, + &key_generator); } // Perform the Load phase @@ -178,7 +193,9 @@ int main(const int argc, const char *argv[]) { for (unsigned int i = 0; i < num_threads; ++i) { uint64_t start_op = (record_count * i) / num_threads; uint64_t end_op = (record_count * (i + 1)) / num_threads; - actual_ops.emplace_back(async(launch::async, DelegateClient, db, &wls[i], end_op - start_op, true, pmode, record_count, &load_progress, &last_printed)); + actual_ops.emplace_back( + async(launch::async, DelegateClient, db, &wls[i], end_op - start_op, + true, pmode, record_count, &load_progress, &last_printed)); } assert(actual_ops.size() == num_threads); sum = 0; @@ -192,11 +209,11 @@ int main(const int argc, const char *argv[]) { } double load_duration = timer.End(); cerr << "# Load throughput (KTPS)" << endl; - cerr << props["dbname"] << '\t' << load_workload.filename << '\t' << num_threads << '\t'; + cerr << props["dbname"] << '\t' << load_workload.filename << '\t' + << num_threads << '\t'; cerr << sum / load_duration / 1000 << endl; } - // Perform any Run phases for (unsigned int i = 0; i < run_workloads.size(); i++) { auto workload = run_workloads[i]; @@ -204,8 +221,11 @@ int main(const int argc, const char *argv[]) { wls[i].InitRunWorkload(workload.props, num_threads, i); } actual_ops.clear(); - total_ops = stoi(workload.props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]); - uint64_t ops_per_transactions = stoi(workload.props[ycsbc::CoreWorkload::OPS_PER_TRANSACTION_PROPERTY]); + total_ops = + stoi(workload.props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]); + uint64_t ops_per_transactions = + stoi(workload.props.GetProperty(ycsbc::CoreWorkload::OPS_PER_TRANSACTION_PROPERTY, + ycsbc::CoreWorkload::OPS_PER_TRANSACTION_DEFAULT)); timer.Start(); { cerr << "# Transaction count:\t" << total_ops << endl; @@ -214,9 +234,10 @@ int main(const int argc, const char *argv[]) { for (unsigned int i = 0; i < num_threads; ++i) { uint64_t start_op = (total_ops * i) / num_threads; uint64_t end_op = (total_ops * (i + 1)) / num_threads; - uint64_t num_transactions = (end_op - start_op) / ops_per_transactions; - actual_ops.emplace_back(async(launch::async, - DelegateClient, db, &wls[i], num_transactions, false, pmode, total_ops, &run_progress, &last_printed)); + uint64_t num_transactions = (end_op - start_op) / ops_per_transactions; + actual_ops.emplace_back(async(launch::async, DelegateClient, db, + &wls[i], num_transactions, false, pmode, + total_ops, &run_progress, &last_printed)); } assert(actual_ops.size() == num_threads); sum = 0; @@ -231,7 +252,8 @@ int main(const int argc, const char *argv[]) { double run_duration = timer.End(); cerr << "# Transaction throughput (KTPS)" << endl; - cerr << props["dbname"] << '\t' << workload.filename << '\t' << num_threads << '\t'; + cerr << props["dbname"] << '\t' << workload.filename << '\t' << num_threads + << '\t'; cerr << sum / run_duration / 1000 << endl; cerr << "# Abort count:\t" << ycsbc::Client::total_abort_cnt << '\n'; @@ -240,12 +262,14 @@ int main(const int argc, const char *argv[]) { delete db; } -void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, WorkloadProperties &load_workload, vector &run_workloads) { +void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, + WorkloadProperties &load_workload, + vector &run_workloads) { bool saw_load_workload = false; WorkloadProperties *last_workload = NULL; int argindex = 1; - for (auto const & [key, val] : default_props) { + for (auto const &[key, val] : default_props) { props.SetProperty(key, val); } @@ -298,9 +322,9 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo } props.SetProperty("slaves", argv[argindex]); argindex++; - } else if (strcmp(argv[argindex], "-W") == 0 - || strcmp(argv[argindex], "-P") == 0 - || strcmp(argv[argindex], "-L") == 0) { + } else if (strcmp(argv[argindex], "-W") == 0 || + strcmp(argv[argindex], "-P") == 0 || + strcmp(argv[argindex], "-L") == 0) { WorkloadProperties workload; workload.preloaded = strcmp(argv[argindex], "-P") == 0; argindex++; @@ -318,9 +342,9 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo } input.close(); argindex++; - if (strcmp(argv[argindex-2], "-W") == 0) { + if (strcmp(argv[argindex - 2], "-W") == 0) { run_workloads.push_back(workload); - last_workload = &run_workloads[run_workloads.size()-1]; + last_workload = &run_workloads[run_workloads.size() - 1]; } else if (saw_load_workload) { UsageMessage(argv[0]); exit(0); @@ -329,8 +353,8 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo load_workload = workload; last_workload = &load_workload; } - } else if (strcmp(argv[argindex], "-p") == 0 - || strcmp(argv[argindex], "-w") == 0) { + } else if (strcmp(argv[argindex], "-p") == 0 || + strcmp(argv[argindex], "-w") == 0) { argindex++; if (argindex >= argc) { UsageMessage(argv[0]); @@ -343,7 +367,7 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo exit(0); } std::string propval = argv[argindex]; - if (strcmp(argv[argindex-2], "-w") == 0) { + if (strcmp(argv[argindex - 2], "-w") == 0) { if (last_workload) { last_workload->props.SetProperty(propkey, propval); } else { @@ -367,23 +391,39 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo } void UsageMessage(const char *command) { - cout << "Usage: " << command << " [options]" << "-L [-W run-workload.spec] ..." << endl; - cout << " Perform the given Load workload, then each Run workload" << endl; - cout << "Usage: " << command << " [options]" << "-P [-W run-workload.spec] ... " << endl; - cout << " Perform each given Run workload on a database that has been preloaded with the given Load workload" << endl; + cout << "Usage: " << command << " [options]" + << "-L [-W run-workload.spec] ..." << endl; + cout << " Perform the given Load workload, then each Run workload" + << endl; + cout << "Usage: " << command << " [options]" + << "-P [-W run-workload.spec] ... " << endl; + cout << " Perform each given Run workload on a database that has been " + "preloaded with the given Load workload" + << endl; cout << "Options:" << endl; - cout << " -threads : execute using threads (default: " << default_props["threadcount"] << ")" << endl; - cout << " -db : specify the name of the DB to use (default: " << default_props["dbname"] << ")" << endl; - cout << " -L : Initialize the database with the specified Load workload" << endl; - cout << " -P : Indicates that the database has been preloaded with the specified Load workload" << endl; + cout << " -threads : execute using threads (default: " + << default_props["threadcount"] << ")" << endl; + cout << " -db : specify the name of the DB to use (default: " + << default_props["dbname"] << ")" << endl; + cout + << " -L : Initialize the database with the specified Load workload" + << endl; + cout << " -P : Indicates that the database has been preloaded with " + "the specified Load workload" + << endl; cout << " -W : Perform the Run workload specified in " << endl; cout << " -p : set property to value " << endl; - cout << " -w : set a property in the previously specified workload" << endl; - cout << "Exactly one Load workload is allowed, but multiple Run workloads may be given.." << endl; - cout << "Run workloads will be executed in the order given on the command line." << endl; + cout << " -w : set a property in the previously specified " + "workload" + << endl; + cout << "Exactly one Load workload is allowed, but multiple Run workloads " + "may be given.." + << endl; + cout << "Run workloads will be executed in the order given on the command " + "line." + << endl; } inline bool StrStartWith(const char *str, const char *pre) { return strncmp(str, pre, strlen(pre)) == 0; } - From 101ab6939a92fc8ab32e57e35a08a5e6ebf6cceb Mon Sep 17 00:00:00 2001 From: Deukyeon Hwang Date: Mon, 12 Sep 2022 22:23:31 -0700 Subject: [PATCH 07/15] Fixed in order to run non-transactional splinterdb This commit makes transactional splinterdb and rocksdb support various isolation levels. The scripts, `run_rocksdb.sh` and `run_splinterdb.sh`, will run ycsb benchs with varying parameters. --- ycsbc.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ycsbc.cc b/ycsbc.cc index 7b823afb..930bb35d 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -64,7 +64,11 @@ std::map default_props = { {"splinterdb.isolation_level", "1"}, {"rocksdb.database_filename", "rocksdb.db"}, +<<<<<<< HEAD // {"rocksdb.isolation_level", "3"}, +======= + {"rocksdb.isolation_level", "3"}, +>>>>>>> 41abab0 (Implemented various isolation levels) }; void UsageMessage(const char *command); From 087595f595d7dea9439664098a8a75a3a7733d48 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Sat, 1 Oct 2022 15:06:28 +0000 Subject: [PATCH 08/15] fixed a bug when running without ops_per_transaction --- ycsbc.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ycsbc.cc b/ycsbc.cc index 930bb35d..eea522a4 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -67,7 +67,7 @@ std::map default_props = { <<<<<<< HEAD // {"rocksdb.isolation_level", "3"}, ======= - {"rocksdb.isolation_level", "3"}, +// {"rocksdb.isolation_level", "3"}, >>>>>>> 41abab0 (Implemented various isolation levels) }; From ac00f937cc129d6fda5ede6b5af843fa57958197 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Sat, 1 Oct 2022 15:57:00 +0000 Subject: [PATCH 09/15] minor change on the run script --- run_splinterdb.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run_splinterdb.sh b/run_splinterdb.sh index cbebc815..730f55ea 100755 --- a/run_splinterdb.sh +++ b/run_splinterdb.sh @@ -23,4 +23,4 @@ while true ; do done ./ycsbc -db transactional_splinterdb -threads $NTHREADS $PARAMS - +rm -f splinterdb.db From 1dd939e579f51ebacc98627ffc1bd72e64037c15 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Mon, 17 Oct 2022 23:55:34 +0000 Subject: [PATCH 10/15] added an experiment script --- run_individual.sh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 run_individual.sh diff --git a/run_individual.sh b/run_individual.sh new file mode 100644 index 00000000..9fe800a2 --- /dev/null +++ b/run_individual.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +DB=transactional_splinterdb +THREADS=$1 +FIELDLENGTH=1024 + +RECORDCOUNT=84000000 +TXNPERTREAD=1000000 +OPSPERTRANSACTION=2 +OPERATIONCOUNT=$(($OPSPERTRANSACTION * $THREADS * $TXNPERTREAD)) + +./ycsbc -db $DB -threads $THREADS \ +-L workloads/load.spec \ +-w fieldlength $FIELDLENGTH \ +-w recordcount $RECORDCOUNT \ +-W workloads/workloada.spec \ +-w requestdistribution zipfian \ +-w operationcount $OPERATIONCOUNT \ +-w opspertransaction $OPSPERTRANSACTION \ No newline at end of file From 00c48eaca185ab35280216a0ce88fc15b38292a8 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 18 Oct 2022 00:05:49 +0000 Subject: [PATCH 11/15] Added and fixed scripts for experiments --- run_individual.sh | 4 ++-- run_splinterdb.sh | 30 +++++------------------------- 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/run_individual.sh b/run_individual.sh index 9fe800a2..0f7e706b 100644 --- a/run_individual.sh +++ b/run_individual.sh @@ -1,7 +1,7 @@ #!/bin/bash -DB=transactional_splinterdb -THREADS=$1 +DB=${1:-"transactional_splinterdb"} +THREADS=${2:-1} FIELDLENGTH=1024 RECORDCOUNT=84000000 diff --git a/run_splinterdb.sh b/run_splinterdb.sh index 730f55ea..982f2bff 100755 --- a/run_splinterdb.sh +++ b/run_splinterdb.sh @@ -1,26 +1,6 @@ -#!/bin/bash +#!/bin/bash -x -NTHREADS=1 -WL=workloads/myworkload.spec - -PARAMS="-W $WL -L $WL" - -if [ `uname` = FreeBSD ]; then - GETOPT=/usr/local/bin/getopt -else - GETOPT=getopt -fi - -eval set -- "$(${GETOPT} -o i:t:h -- $@)" - -while true ; do - case "$1" in - -t) NTHREADS=$2; shift 2 ;; - -i) PARAMS+=" -p splinterdb.isolation_level $2"; shift 2 ;; - -h) printf "$0 options:\n\t-t [# threads]\n\t-i [isolation_level: 1=serializable(default), 2=snapshot_isolation]\nExample: $0 -t 4 -i 1\n"; exit ;; - --) shift ; break ;; - esac -done - -./ycsbc -db transactional_splinterdb -threads $NTHREADS $PARAMS -rm -f splinterdb.db +for t in 1 4 12 +do + bash run_individual.sh transactional_splinterdb $t +done \ No newline at end of file From c3037b7442d26e757dee5bcd87fd039d5aab0132 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 18 Oct 2022 00:51:50 +0000 Subject: [PATCH 12/15] fixed the script --- run_splinterdb.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run_splinterdb.sh b/run_splinterdb.sh index 982f2bff..0717f666 100755 --- a/run_splinterdb.sh +++ b/run_splinterdb.sh @@ -1,6 +1,6 @@ #!/bin/bash -x -for t in 1 4 12 +for t in 1 2 4 8 do bash run_individual.sh transactional_splinterdb $t done \ No newline at end of file From 5f338091d5ce337a98c9d3608e799d9eeb4c1598 Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 18 Oct 2022 01:00:19 +0000 Subject: [PATCH 13/15] removed a merge text --- ycsbc.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ycsbc.cc b/ycsbc.cc index eea522a4..7b823afb 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -64,11 +64,7 @@ std::map default_props = { {"splinterdb.isolation_level", "1"}, {"rocksdb.database_filename", "rocksdb.db"}, -<<<<<<< HEAD // {"rocksdb.isolation_level", "3"}, -======= -// {"rocksdb.isolation_level", "3"}, ->>>>>>> 41abab0 (Implemented various isolation levels) }; void UsageMessage(const char *command); From 5bcabac33309fdf75384d6f4c36d1d12ef6d10ba Mon Sep 17 00:00:00 2001 From: deukyeon Date: Tue, 18 Oct 2022 17:17:37 +0000 Subject: [PATCH 14/15] Updated the run scripts for various parameters --- run_individual.sh | 3 ++- run_splinterdb.sh | 23 +++++++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/run_individual.sh b/run_individual.sh index 0f7e706b..35212207 100644 --- a/run_individual.sh +++ b/run_individual.sh @@ -2,6 +2,7 @@ DB=${1:-"transactional_splinterdb"} THREADS=${2:-1} +DIST=${3:-"uniform"} FIELDLENGTH=1024 RECORDCOUNT=84000000 @@ -14,6 +15,6 @@ OPERATIONCOUNT=$(($OPSPERTRANSACTION * $THREADS * $TXNPERTREAD)) -w fieldlength $FIELDLENGTH \ -w recordcount $RECORDCOUNT \ -W workloads/workloada.spec \ --w requestdistribution zipfian \ +-w requestdistribution $DIST \ -w operationcount $OPERATIONCOUNT \ -w opspertransaction $OPSPERTRANSACTION \ No newline at end of file diff --git a/run_splinterdb.sh b/run_splinterdb.sh index 0717f666..1cc8f794 100755 --- a/run_splinterdb.sh +++ b/run_splinterdb.sh @@ -1,6 +1,25 @@ #!/bin/bash -x -for t in 1 2 4 8 +if [[ "x$1" == "xsplinterdb" || "x$1" == "xtransactional_splinterdb" ]] +then + DB=$1 +else + echo "Usage: $0 [splinterdb|transactional_splinterdb]" + exit 1 +fi + +THREADS=(1 2 4 8) + +echo "Run for the uniform distribution" + +for t in ${THREADS[@]} +do + bash run_individual.sh $DB $t uniform +done + +echo "Run for the zipfian distribution" + +for t in ${THREADS[@]} do - bash run_individual.sh transactional_splinterdb $t + bash run_individual.sh $DB $t zipfian done \ No newline at end of file From bcbd057bae21388f082c4dc61d68da32198ac7ad Mon Sep 17 00:00:00 2001 From: deukyeon Date: Wed, 19 Oct 2022 21:43:05 -0700 Subject: [PATCH 15/15] added a script to parse results and plot --- plot_thoughput.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++ run_splinterdb.sh | 12 ++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 plot_thoughput.py diff --git a/plot_thoughput.py b/plot_thoughput.py new file mode 100644 index 00000000..257e925c --- /dev/null +++ b/plot_thoughput.py @@ -0,0 +1,57 @@ +import sys +import matplotlib.pyplot as plt + +outputfile = sys.argv[1] + +f = open(outputfile, "r") + +lines = f.readlines() + +load_threads = [] +load_tputs = [] +run_threads = [] +run_tputs = [] + +abort_counts = [] + +load_data = False +run_data = False + +for line in lines: + if load_data: + fields = line.split() + load_threads.append(fields[-2]) + load_tputs.append(fields[-1]) + load_data = False + + if line.startswith("# Load throughput (KTPS)"): + load_data = True + + if run_data: + fields = line.split() + run_threads.append(fields[-2]) + run_tputs.append(fields[-1]) + run_data = False + + if line.startswith("# Transaction throughput (KTPS)"): + run_data = True + + if line.startswith("# Abort count"): + fields = line.split() + abort_counts.append(fields[-1]) + +# print csv +print("threads,load,workload,aborts") +for i in range(0, len(load_threads)): + print(load_threads[i], load_tputs[i], + run_tputs[i], abort_counts[i], sep=',') + +plt.plot(load_threads, load_tputs, label='load') +plt.plot(run_threads, run_tputs, label='run') + +plt.ylabel("Throughput(ops/sec)") +plt.xlabel("# of threads") + +plt.legend() + +plt.show() diff --git a/run_splinterdb.sh b/run_splinterdb.sh index 1cc8f794..6fc70976 100755 --- a/run_splinterdb.sh +++ b/run_splinterdb.sh @@ -12,14 +12,22 @@ THREADS=(1 2 4 8) echo "Run for the uniform distribution" +OUT=uniform.out + +rm -f $OUT + for t in ${THREADS[@]} do - bash run_individual.sh $DB $t uniform + bash run_individual.sh $DB $t uniform >> $OUT 2>&1 done +OUT=zipf.out + +rm -f $OUT + echo "Run for the zipfian distribution" for t in ${THREADS[@]} do - bash run_individual.sh $DB $t zipfian + bash run_individual.sh $DB $t zipfian >> $OUT 2>&1 done \ No newline at end of file