Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/core/cms/console/console_configs_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ class TConfigsSubscriber : public TActorBootstrapped<TConfigsSubscriber> {
notChanged = false;
}

if (!notChanged) {
CurrentDynConfig.Clear();
if (rec.HasConfig()) {
CurrentDynConfig.CopyFrom(rec.GetConfig());
}
}

if (rec.VolatileConfigsSize() != VolatileYamlConfigs.size()) {
notChanged = false;
}
Expand Down
43 changes: 43 additions & 0 deletions ydb/core/grpc_services/rpc_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ using TEvExportToYtRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToYtRe
Ydb::Export::ExportToYtResponse>;
using TEvExportToS3Request = TGrpcRequestOperationCall<Ydb::Export::ExportToS3Request,
Ydb::Export::ExportToS3Response>;
using TEvExportToFsRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToFsRequest,
Ydb::Export::ExportToFsResponse>;

template <typename TDerived, typename TEvRequest>
class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TExportConv {
static constexpr bool IsS3Export = std::is_same_v<TEvRequest, TEvExportToS3Request>;
static constexpr bool IsYtExport = std::is_same_v<TEvRequest, TEvExportToYtRequest>;
static constexpr bool IsFsExport = std::is_same_v<TEvRequest, TEvExportToFsRequest>;

struct TExportItemInfo {
TString Destination;
Expand Down Expand Up @@ -102,6 +105,16 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
item->set_destination_prefix(info.Destination);
}
}
if constexpr (IsFsExport) {
auto* exportSettings = createExport.MutableExportToFsSettings();
*exportSettings = request.settings();
exportSettings->clear_items();
for (const auto& [sourcePath, info] : ExportItems) {
auto* item = exportSettings->add_items();
item->set_source_path(sourcePath);
item->set_destination_path(info.Destination);
}
Comment on lines 109 to 116
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как будто можно сделать общую функцию для s3 и fs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Отдельным pr-ом сделаю рефакторинг

}

return ev.Release();
}
Expand Down Expand Up @@ -130,6 +143,9 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
if constexpr (IsYtExport) {
it->second.Destination = item.destination_path();
}
if constexpr (IsFsExport) {
it->second.Destination = item.destination_path();
}
}

if constexpr (IsS3Export) {
Expand Down Expand Up @@ -446,6 +462,24 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
}
}
}
if constexpr (IsFsExport) {
if (!settings.base_path().StartsWith("/")) {
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR,
"base_path must be an absolute path");
}

if (settings.compression()) {
StatusIds::StatusCode status;
TString error;
if (!CheckCompression(settings.compression(), status, error)) {
return this->Reply(status, TIssuesIds::DEFAULT_ERROR, error);
}
}

if (settings.items().empty()) {
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set");
}
}

if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) {
if (settings.compression()) {
Expand Down Expand Up @@ -496,6 +530,11 @@ class TExportToS3RPC: public TExportRPC<TExportToS3RPC, TEvExportToS3Request> {
using TExportRPC::TExportRPC;
};

class TExportToFsRPC: public TExportRPC<TExportToFsRPC, TEvExportToFsRequest> {
public:
using TExportRPC::TExportRPC;
};

void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TExportToYtRPC(p.release()));
}
Expand All @@ -504,5 +543,9 @@ void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvid
f.RegisterActor(new TExportToS3RPC(p.release()));
}

void DoExportToFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TExportToFsRPC(p.release()));
}

} // namespace NGRpcService
} // namespace NKikimr
6 changes: 6 additions & 0 deletions ydb/core/grpc_services/rpc_export_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
case NKikimrExport::TExport::kExportToS3Settings:
NOperationId::AddOptionalValue(operationId, "kind", "s3");
break;
case NKikimrExport::TExport::kExportToFsSettings:
NOperationId::AddOptionalValue(operationId, "kind", "fs");
break;
default:
Y_DEBUG_ABORT("Unknown export kind");
break;
Expand Down Expand Up @@ -60,6 +63,9 @@ struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
case NKikimrExport::TExport::kExportToS3Settings:
Fill<ExportToS3Metadata, ExportToS3Result>(operation, in, ClearEncryptionKey(in.GetExportToS3Settings()));
break;
case NKikimrExport::TExport::kExportToFsSettings:
Fill<ExportToFsMetadata, ExportToFsResult>(operation, in, in.GetExportToFsSettings());
break;
default:
Y_DEBUG_ABORT("Unknown export kind");
break;
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/grpc_services/rpc_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ using namespace Ydb;

using TEvImportFromS3Request = TGrpcRequestOperationCall<Ydb::Import::ImportFromS3Request,
Ydb::Import::ImportFromS3Response>;
using TEvImportFromFsRequest = TGrpcRequestOperationCall<Ydb::Import::ImportFromFsRequest,
Ydb::Import::ImportFromFsResponse>;

template <typename TDerived, typename TEvRequest>
class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TImportConv {
static constexpr bool IsS3Import = std::is_same_v<TEvRequest, TEvImportFromS3Request>;
static constexpr bool IsFsImport = std::is_same_v<TEvRequest, TEvImportFromFsRequest>;

TStringBuf GetLogPrefix() const override {
return "[CreateImport]";
Expand All @@ -49,6 +52,9 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
if constexpr (IsS3Import) {
createImport.MutableImportFromS3Settings()->CopyFrom(request.settings());
}
if constexpr (IsFsImport) {
createImport.MutableImportFromFsSettings()->CopyFrom(request.settings());
}

return ev.Release();
}
Expand Down Expand Up @@ -120,6 +126,13 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
}
}

if constexpr (IsFsImport) {
if (!settings.base_path().StartsWith("/")) {
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR,
"base_path must be an absolute path");
}
}

this->AllocateTxId();
this->Become(&TDerived::StateWait);
}
Expand All @@ -139,9 +152,17 @@ class TImportFromS3RPC: public TImportRPC<TImportFromS3RPC, TEvImportFromS3Reque
using TImportRPC::TImportRPC;
};

class TImportFromFsRPC: public TImportRPC<TImportFromFsRPC, TEvImportFromFsRequest> {
public:
using TImportRPC::TImportRPC;
};

void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TImportFromS3RPC(p.release()));
}

void DoImportFromFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TImportFromFsRPC(p.release()));
}
} // namespace NGRpcService
} // namespace NKikimr
6 changes: 6 additions & 0 deletions ydb/core/grpc_services/rpc_import_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
case NKikimrImport::TImport::kImportFromS3Settings:
NOperationId::AddOptionalValue(operationId, "kind", "s3");
break;
case NKikimrImport::TImport::kImportFromFsSettings:
NOperationId::AddOptionalValue(operationId, "kind", "fs");
break;
default:
Y_DEBUG_ABORT("Unknown import kind");
break;
Expand Down Expand Up @@ -54,6 +57,9 @@ struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
case NKikimrImport::TImport::kImportFromS3Settings:
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, in, ClearEncryptionKey(in.GetImportFromS3Settings()));
break;
case NKikimrImport::TImport::kImportFromFsSettings:
Fill<ImportFromFsMetadata, ImportFromFsResult>(operation, in, in.GetImportFromFsSettings());
break;
default:
Y_DEBUG_ABORT("Unknown import kind");
break;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_export.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class IFacilityProvider;

void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoExportToFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

}
}
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_import.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class IRequestOpCtx;
class IFacilityProvider;

void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoImportFromFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoListObjectsInS3ExportRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/export.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message TExport {
oneof Settings {
Ydb.Export.ExportToYtSettings ExportToYtSettings = 5;
Ydb.Export.ExportToS3Settings ExportToS3Settings = 6;
Ydb.Export.ExportToFsSettings ExportToFsSettings = 11;
}
optional string UserSID = 10;
}
Expand All @@ -28,6 +29,7 @@ message TCreateExportRequest {
oneof Settings {
Ydb.Export.ExportToYtSettings ExportToYtSettings = 2;
Ydb.Export.ExportToS3Settings ExportToS3Settings = 3;
Ydb.Export.ExportToFsSettings ExportToFsSettings = 4;
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/import.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message TImport {
repeated Ydb.Import.ImportItemProgress ItemsProgress = 5;
oneof Settings {
Ydb.Import.ImportFromS3Settings ImportFromS3Settings = 6;
Ydb.Import.ImportFromFsSettings ImportFromFsSettings = 10;
}
optional string UserSID = 9;
}
Expand All @@ -26,6 +27,7 @@ message TCreateImportRequest {
optional Ydb.Operations.OperationParams OperationParams = 1;
oneof Settings {
Ydb.Import.ImportFromS3Settings ImportFromS3Settings = 2;
Ydb.Import.ImportFromFsSettings ImportFromFsSettings = 3;
}
}

Expand Down
18 changes: 18 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_audit_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ TParts ExportKindSpecificParts(const Proto& proto) {
return ExportKindSpecificParts(proto.GetExportToYtSettings());
case Proto::kExportToS3Settings:
return ExportKindSpecificParts(proto.GetExportToS3Settings());
case Proto::kExportToFsSettings:
return ExportKindSpecificParts(proto.GetExportToFsSettings());
case Proto::SETTINGS_NOT_SET:
return {};
}
Expand All @@ -305,6 +307,13 @@ template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings
{"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")},
};
}
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToFsSettings& proto) {
return {
{"export_type", "fs"},
{"export_item_count", ToString(proto.items().size())},
{"export_fs_base_path", proto.base_path()},
};
}

template <class Proto>
TParts ImportKindSpecificParts(const Proto& proto) {
Expand All @@ -313,6 +322,8 @@ TParts ImportKindSpecificParts(const Proto& proto) {
switch (proto.GetSettingsCase()) {
case Proto::kImportFromS3Settings:
return ImportKindSpecificParts(proto.GetImportFromS3Settings());
case Proto::kImportFromFsSettings:
return ImportKindSpecificParts(proto.GetImportFromFsSettings());
case Proto::SETTINGS_NOT_SET:
return {};
}
Expand All @@ -327,6 +338,13 @@ template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settin
{"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")},
};
}
template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromFsSettings& proto) {
return {
{"import_type", "fs"},
{"import_item_count", ToString(proto.items().size())},
{"import_fs_base_path", proto.base_path()},
};
}

} // anonymous namespace

Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/protos/ydb_export.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ message ExportToFsSettings {

// Base path on FS where to write export
// Path to the mounted directory in the case of NFS
// Must be an absolute path
// Example: /mnt/exports
string base_path = 1 [(required) = true];

Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/protos/ydb_import.proto
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ message ImportFromFsSettings {

// Base path on FS where the export is located
// Path to the mounted directory in the case of NFS
// Must be an absolute path
// Example: /mnt/exports
string base_path = 1 [(required) = true];

Expand Down
35 changes: 35 additions & 0 deletions ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/export/export.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,40 @@ class TExportToS3Response : public TOperation {
TMetadata Metadata_;
};

/// FS
struct TExportToFsSettings : public TOperationRequestSettings<TExportToFsSettings> {
using TSelf = TExportToFsSettings;

struct TItem {
std::string Src;
Comment on lines +133 to +134
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TItem struct in TExportToFsSettings lacks documentation. Add comments explaining what Src and Dst represent (e.g., source path in database and destination path on filesystem).

Suggested change
struct TItem {
std::string Src;
struct TItem {
/// Source path in the database to export from.
std::string Src;
/// Destination path on the filesystem to export to.

Copilot uses AI. Check for mistakes.
std::string Dst;
};

FLUENT_SETTING(std::string, BasePath);
FLUENT_SETTING_VECTOR(TItem, Item);
FLUENT_SETTING_OPTIONAL(std::string, Description);
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(std::string, Compression);
};

class TExportToFsResponse : public TOperation {
public:
struct TMetadata {
TExportToFsSettings Settings;
EExportProgress Progress;
std::vector<TExportItemProgress> ItemsProgress;
};

public:
using TOperation::TOperation;
TExportToFsResponse(TStatus&& status, Ydb::Operations::Operation&& operation);

const TMetadata& Metadata() const;

private:
TMetadata Metadata_;
};

class TExportClient {
class TImpl;

Expand All @@ -134,6 +168,7 @@ class TExportClient {

NThreading::TFuture<TExportToYtResponse> ExportToYt(const TExportToYtSettings& settings);
NThreading::TFuture<TExportToS3Response> ExportToS3(const TExportToS3Settings& settings);
NThreading::TFuture<TExportToFsResponse> ExportToFs(const TExportToFsSettings& settings);

private:
std::shared_ptr<TImpl> Impl_;
Expand Down
Loading
Loading