Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <random>

#include "types.h"
#include "utils.h"

namespace py = pybind11;

Expand Down
17 changes: 8 additions & 9 deletions mooncake-store/include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
namespace mooncake {

class PutOperation;
struct PutOp;

/**
* @brief Client for interacting with the mooncake distributed object store
Expand Down Expand Up @@ -247,16 +248,14 @@ class Client {
/**
* @brief Batch put helper methods for structured approach
*/
std::vector<PutOperation> CreatePutOperations(
std::vector<PutOp> makeOps(
const std::vector<ObjectKey>& keys,
const std::vector<std::vector<Slice>>& batched_slices);
void StartBatchPut(std::vector<PutOperation>& ops,
const ReplicateConfig& config);
void SubmitTransfers(std::vector<PutOperation>& ops);
void WaitForTransfers(std::vector<PutOperation>& ops);
void FinalizeBatchPut(std::vector<PutOperation>& ops);
std::vector<tl::expected<void, ErrorCode>> CollectResults(
const std::vector<PutOperation>& ops);
void stageStart(std::vector<PutOp>& ops, const ReplicateConfig& config);
void stageTransfer(std::vector<PutOp>& ops);
void stageEnd(std::vector<PutOp>& ops);
std::vector<tl::expected<void, ErrorCode>> collect(
const std::vector<PutOp>& ops);

// Core components
TransferEngine transfer_engine_;
Expand Down Expand Up @@ -286,4 +285,4 @@ class Client {
UUID client_id_;
};

} // namespace mooncake
} // namespace mooncake
34 changes: 34 additions & 0 deletions mooncake-store/include/client_batch_put.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <glog/logging.h>

#include <span>
#include <vector>
#include <ylt/util/tl/expected.hpp>

#include "transfer_task.h"
#include "types.h"

namespace mooncake {

class Client;

// Calculate total size of all slices in the collection
[[nodiscard]] size_t CalculateSliceSize(const std::vector<Slice>& slices);
[[nodiscard]] size_t CalculateSliceSize(std::span<const Slice> slices);

// Represents a single put operation in a batch
struct PutOp {
explicit PutOp(std::string key, std::vector<Slice> slices)
: key(std::move(key)), slices(std::move(slices)) {}

std::string key; // Object key to store
std::vector<Slice> slices; // Data slices to be stored
std::vector<Replica::Descriptor>
replicas; // Replica locations (filled by stage-1, put start)
std::vector<tl::expected<TransferFuture, ErrorCode>>
replica_futures; // Transfer futures for each replica
tl::expected<void, ErrorCode> result; // Final operation result
};

} // namespace mooncake
22 changes: 4 additions & 18 deletions mooncake-store/include/master_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,17 @@ class MasterClient {
* @param key Object key
* @return tl::expected<void, ErrorCode> indicating success/failure
*/
[[nodiscard]] tl::expected<void, ErrorCode> PutEnd(const std::string& key);
[[nodiscard]] tl::expected<void, ErrorCode> PutEnd(
const std::string& key, const std::vector<PutResult>& put_results);

/**
* @brief Ends a put operation for a batch of objects
* @param keys Vector of object keys
* @return ErrorCode indicating success/failure
*/
[[nodiscard]] std::vector<tl::expected<void, ErrorCode>> BatchPutEnd(
const std::vector<std::string>& keys);

/**
* @brief Revokes a put operation
* @param key Object key
* @return tl::expected<void, ErrorCode> indicating success/failure
*/
[[nodiscard]] tl::expected<void, ErrorCode> PutRevoke(
const std::string& key);

/**
* @brief Revokes a put operation for a batch of objects
* @param keys Vector of object keys
* @return ErrorCode indicating success/failure
*/
[[nodiscard]] std::vector<tl::expected<void, ErrorCode>> BatchPutRevoke(
const std::vector<std::string>& keys);
const std::vector<std::string>& keys,
const std::vector<std::vector<PutResult>>& put_results);

/**
* @brief Removes an object and all its replicas
Expand Down
15 changes: 1 addition & 14 deletions mooncake-store/include/master_metric_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class MasterMetricManager {
void inc_put_start_failures(int64_t val = 1);
void inc_put_end_requests(int64_t val = 1);
void inc_put_end_failures(int64_t val = 1);
void inc_put_revoke_requests(int64_t val = 1);
void inc_put_revoke_failures(int64_t val = 1);
void inc_get_replica_list_requests(int64_t val = 1);
void inc_get_replica_list_failures(int64_t val = 1);
void inc_exist_key_requests(int64_t val = 1);
Expand All @@ -75,17 +73,12 @@ class MasterMetricManager {
void inc_batch_put_start_failures(int64_t val = 1);
void inc_batch_put_end_requests(int64_t val = 1);
void inc_batch_put_end_failures(int64_t val = 1);
void inc_batch_put_revoke_requests(int64_t val = 1);
void inc_batch_put_revoke_failures(int64_t val = 1);


// Operation Statistics Getters
int64_t get_put_start_requests();
int64_t get_put_start_failures();
int64_t get_put_end_requests();
int64_t get_put_end_failures();
int64_t get_put_revoke_requests();
int64_t get_put_revoke_failures();
int64_t get_get_replica_list_requests();
int64_t get_get_replica_list_failures();
int64_t get_exist_key_requests();
Expand All @@ -112,12 +105,10 @@ class MasterMetricManager {
int64_t get_batch_put_start_failures();
int64_t get_batch_put_end_requests();
int64_t get_batch_put_end_failures();
int64_t get_batch_put_revoke_requests();
int64_t get_batch_put_revoke_failures();

// Eviction Metrics
void inc_eviction_success(int64_t key_count, int64_t size);
void inc_eviction_fail(); // not a single object is evicted
void inc_eviction_fail(); // not a single object is evicted

// Eviction Metrics Getters
int64_t get_eviction_success();
Expand Down Expand Up @@ -165,8 +156,6 @@ class MasterMetricManager {
ylt::metric::counter_t put_start_failures_;
ylt::metric::counter_t put_end_requests_;
ylt::metric::counter_t put_end_failures_;
ylt::metric::counter_t put_revoke_requests_;
ylt::metric::counter_t put_revoke_failures_;
ylt::metric::counter_t get_replica_list_requests_;
ylt::metric::counter_t get_replica_list_failures_;
ylt::metric::counter_t exist_key_requests_;
Expand All @@ -193,8 +182,6 @@ class MasterMetricManager {
ylt::metric::counter_t batch_put_start_failures_;
ylt::metric::counter_t batch_put_end_requests_;
ylt::metric::counter_t batch_put_end_failures_;
ylt::metric::counter_t batch_put_revoke_requests_;
ylt::metric::counter_t batch_put_revoke_failures_;

// Eviction Metrics
ylt::metric::counter_t eviction_success_;
Expand Down
31 changes: 6 additions & 25 deletions mooncake-store/include/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace mooncake {
// Forward declarations
class AllocationStrategy;
class EvictionStrategy;
enum class PutResult : uint8_t;

// Structure to store garbage collection tasks
struct GCTask {
Expand Down Expand Up @@ -178,34 +179,13 @@ class MasterService {
-> tl::expected<std::vector<Replica::Descriptor>, ErrorCode>;

/**
* @brief Complete a put operation
* @brief Complete a put operation of a replica
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
auto PutEnd(const std::string& key) -> tl::expected<void, ErrorCode>;

/**
* @brief Revoke a put operation
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
auto PutRevoke(const std::string& key) -> tl::expected<void, ErrorCode>;

/**
* @brief Complete a batch of put operations
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
std::vector<tl::expected<void, ErrorCode>> BatchPutEnd(
const std::vector<std::string>& keys);

/**
* @brief Revoke a batch of put operations
* @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not
* found, ErrorCode::INVALID_WRITE if replica status is invalid
*/
std::vector<tl::expected<void, ErrorCode>> BatchPutRevoke(
const std::vector<std::string>& keys);
auto PutEnd(const std::string& key,
const std::vector<PutResult>& put_success)
-> tl::expected<void, ErrorCode>;

/**
* @brief Remove an object and its replicas
Expand Down Expand Up @@ -442,6 +422,7 @@ class MasterService {
void ClientMonitorFunc();
std::thread client_monitor_thread_;
std::atomic<bool> client_monitor_running_{false};
std::atomic_uint64_t replica_id_allocator_{0};
static constexpr uint64_t kClientMonitorSleepMs =
1000; // 1000 ms sleep between client monitor checks
// boost lockfree queue requires trivial assignment operator
Expand Down
Loading
Loading