Skip to content

Commit ed7c647

Browse files
Add handlers for ExportToFs / ImportFromFs (#28126)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent f90b3dc commit ed7c647

File tree

22 files changed

+652
-14
lines changed

22 files changed

+652
-14
lines changed

ydb/core/grpc_services/rpc_export.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ using TEvExportToYtRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToYtRe
2727
Ydb::Export::ExportToYtResponse>;
2828
using TEvExportToS3Request = TGrpcRequestOperationCall<Ydb::Export::ExportToS3Request,
2929
Ydb::Export::ExportToS3Response>;
30+
using TEvExportToFsRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToFsRequest,
31+
Ydb::Export::ExportToFsResponse>;
3032

3133
template <typename TDerived, typename TEvRequest>
3234
class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TExportConv {
3335
static constexpr bool IsS3Export = std::is_same_v<TEvRequest, TEvExportToS3Request>;
3436
static constexpr bool IsYtExport = std::is_same_v<TEvRequest, TEvExportToYtRequest>;
37+
static constexpr bool IsFsExport = std::is_same_v<TEvRequest, TEvExportToFsRequest>;
3538

3639
struct TExportItemInfo {
3740
TString Destination;
@@ -102,6 +105,16 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
102105
item->set_destination_prefix(info.Destination);
103106
}
104107
}
108+
if constexpr (IsFsExport) {
109+
auto* exportSettings = createExport.MutableExportToFsSettings();
110+
*exportSettings = request.settings();
111+
exportSettings->clear_items();
112+
for (const auto& [sourcePath, info] : ExportItems) {
113+
auto* item = exportSettings->add_items();
114+
item->set_source_path(sourcePath);
115+
item->set_destination_path(info.Destination);
116+
}
117+
}
105118

106119
return ev.Release();
107120
}
@@ -130,6 +143,9 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
130143
if constexpr (IsYtExport) {
131144
it->second.Destination = item.destination_path();
132145
}
146+
if constexpr (IsFsExport) {
147+
it->second.Destination = item.destination_path();
148+
}
133149
}
134150

135151
if constexpr (IsS3Export) {
@@ -446,6 +462,24 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
446462
}
447463
}
448464
}
465+
if constexpr (IsFsExport) {
466+
if (!settings.base_path().StartsWith("/")) {
467+
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR,
468+
"base_path must be an absolute path");
469+
}
470+
471+
if (settings.compression()) {
472+
StatusIds::StatusCode status;
473+
TString error;
474+
if (!CheckCompression(settings.compression(), status, error)) {
475+
return this->Reply(status, TIssuesIds::DEFAULT_ERROR, error);
476+
}
477+
}
478+
479+
if (settings.items().empty()) {
480+
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set");
481+
}
482+
}
449483

450484
if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) {
451485
if (settings.compression()) {
@@ -496,6 +530,11 @@ class TExportToS3RPC: public TExportRPC<TExportToS3RPC, TEvExportToS3Request> {
496530
using TExportRPC::TExportRPC;
497531
};
498532

533+
class TExportToFsRPC: public TExportRPC<TExportToFsRPC, TEvExportToFsRequest> {
534+
public:
535+
using TExportRPC::TExportRPC;
536+
};
537+
499538
void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
500539
f.RegisterActor(new TExportToYtRPC(p.release()));
501540
}
@@ -504,5 +543,9 @@ void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvid
504543
f.RegisterActor(new TExportToS3RPC(p.release()));
505544
}
506545

546+
void DoExportToFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
547+
f.RegisterActor(new TExportToFsRPC(p.release()));
548+
}
549+
507550
} // namespace NGRpcService
508551
} // namespace NKikimr

ydb/core/grpc_services/rpc_export_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
2525
case NKikimrExport::TExport::kExportToS3Settings:
2626
NOperationId::AddOptionalValue(operationId, "kind", "s3");
2727
break;
28+
case NKikimrExport::TExport::kExportToFsSettings:
29+
NOperationId::AddOptionalValue(operationId, "kind", "fs");
30+
break;
2831
default:
2932
Y_DEBUG_ABORT("Unknown export kind");
3033
break;
@@ -60,6 +63,9 @@ struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
6063
case NKikimrExport::TExport::kExportToS3Settings:
6164
Fill<ExportToS3Metadata, ExportToS3Result>(operation, in, ClearEncryptionKey(in.GetExportToS3Settings()));
6265
break;
66+
case NKikimrExport::TExport::kExportToFsSettings:
67+
Fill<ExportToFsMetadata, ExportToFsResult>(operation, in, in.GetExportToFsSettings());
68+
break;
6369
default:
6470
Y_DEBUG_ABORT("Unknown export kind");
6571
break;

ydb/core/grpc_services/rpc_import.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ using namespace Ydb;
2323

2424
using TEvImportFromS3Request = TGrpcRequestOperationCall<Ydb::Import::ImportFromS3Request,
2525
Ydb::Import::ImportFromS3Response>;
26+
using TEvImportFromFsRequest = TGrpcRequestOperationCall<Ydb::Import::ImportFromFsRequest,
27+
Ydb::Import::ImportFromFsResponse>;
2628

2729
template <typename TDerived, typename TEvRequest>
2830
class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TImportConv {
2931
static constexpr bool IsS3Import = std::is_same_v<TEvRequest, TEvImportFromS3Request>;
32+
static constexpr bool IsFsImport = std::is_same_v<TEvRequest, TEvImportFromFsRequest>;
3033

3134
TStringBuf GetLogPrefix() const override {
3235
return "[CreateImport]";
@@ -49,6 +52,9 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
4952
if constexpr (IsS3Import) {
5053
createImport.MutableImportFromS3Settings()->CopyFrom(request.settings());
5154
}
55+
if constexpr (IsFsImport) {
56+
createImport.MutableImportFromFsSettings()->CopyFrom(request.settings());
57+
}
5258

5359
return ev.Release();
5460
}
@@ -120,6 +126,13 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
120126
}
121127
}
122128

129+
if constexpr (IsFsImport) {
130+
if (!settings.base_path().StartsWith("/")) {
131+
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR,
132+
"base_path must be an absolute path");
133+
}
134+
}
135+
123136
this->AllocateTxId();
124137
this->Become(&TDerived::StateWait);
125138
}
@@ -139,9 +152,17 @@ class TImportFromS3RPC: public TImportRPC<TImportFromS3RPC, TEvImportFromS3Reque
139152
using TImportRPC::TImportRPC;
140153
};
141154

155+
class TImportFromFsRPC: public TImportRPC<TImportFromFsRPC, TEvImportFromFsRequest> {
156+
public:
157+
using TImportRPC::TImportRPC;
158+
};
159+
142160
void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
143161
f.RegisterActor(new TImportFromS3RPC(p.release()));
144162
}
145163

164+
void DoImportFromFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
165+
f.RegisterActor(new TImportFromFsRPC(p.release()));
166+
}
146167
} // namespace NGRpcService
147168
} // namespace NKikimr

ydb/core/grpc_services/rpc_import_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
2222
case NKikimrImport::TImport::kImportFromS3Settings:
2323
NOperationId::AddOptionalValue(operationId, "kind", "s3");
2424
break;
25+
case NKikimrImport::TImport::kImportFromFsSettings:
26+
NOperationId::AddOptionalValue(operationId, "kind", "fs");
27+
break;
2528
default:
2629
Y_DEBUG_ABORT("Unknown import kind");
2730
break;
@@ -54,6 +57,9 @@ struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
5457
case NKikimrImport::TImport::kImportFromS3Settings:
5558
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, in, ClearEncryptionKey(in.GetImportFromS3Settings()));
5659
break;
60+
case NKikimrImport::TImport::kImportFromFsSettings:
61+
Fill<ImportFromFsMetadata, ImportFromFsResult>(operation, in, in.GetImportFromFsSettings());
62+
break;
5763
default:
5864
Y_DEBUG_ABORT("Unknown import kind");
5965
break;

ydb/core/grpc_services/service_export.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class IFacilityProvider;
1010

1111
void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1212
void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
13+
void DoExportToFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1314

1415
}
1516
}

ydb/core/grpc_services/service_import.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class IRequestOpCtx;
99
class IFacilityProvider;
1010

1111
void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
12+
void DoImportFromFsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1213
void DoListObjectsInS3ExportRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1314
void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1415

ydb/core/protos/export.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ message TExport {
1919
oneof Settings {
2020
Ydb.Export.ExportToYtSettings ExportToYtSettings = 5;
2121
Ydb.Export.ExportToS3Settings ExportToS3Settings = 6;
22+
Ydb.Export.ExportToFsSettings ExportToFsSettings = 11;
2223
}
2324
optional string UserSID = 10;
2425
}
@@ -28,6 +29,7 @@ message TCreateExportRequest {
2829
oneof Settings {
2930
Ydb.Export.ExportToYtSettings ExportToYtSettings = 2;
3031
Ydb.Export.ExportToS3Settings ExportToS3Settings = 3;
32+
Ydb.Export.ExportToFsSettings ExportToFsSettings = 4;
3133
}
3234
}
3335

ydb/core/protos/import.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ message TImport {
1818
repeated Ydb.Import.ImportItemProgress ItemsProgress = 5;
1919
oneof Settings {
2020
Ydb.Import.ImportFromS3Settings ImportFromS3Settings = 6;
21+
Ydb.Import.ImportFromFsSettings ImportFromFsSettings = 10;
2122
}
2223
optional string UserSID = 9;
2324
}
@@ -26,6 +27,7 @@ message TCreateImportRequest {
2627
optional Ydb.Operations.OperationParams OperationParams = 1;
2728
oneof Settings {
2829
Ydb.Import.ImportFromS3Settings ImportFromS3Settings = 2;
30+
Ydb.Import.ImportFromFsSettings ImportFromFsSettings = 3;
2931
}
3032
}
3133

ydb/core/tx/schemeshard/schemeshard_audit_log.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ TParts ExportKindSpecificParts(const Proto& proto) {
284284
return ExportKindSpecificParts(proto.GetExportToYtSettings());
285285
case Proto::kExportToS3Settings:
286286
return ExportKindSpecificParts(proto.GetExportToS3Settings());
287+
case Proto::kExportToFsSettings:
288+
return ExportKindSpecificParts(proto.GetExportToFsSettings());
287289
case Proto::SETTINGS_NOT_SET:
288290
return {};
289291
}
@@ -305,6 +307,13 @@ template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings
305307
{"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")},
306308
};
307309
}
310+
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToFsSettings& proto) {
311+
return {
312+
{"export_type", "fs"},
313+
{"export_item_count", ToString(proto.items().size())},
314+
{"export_fs_base_path", proto.base_path()},
315+
};
316+
}
308317

309318
template <class Proto>
310319
TParts ImportKindSpecificParts(const Proto& proto) {
@@ -313,6 +322,8 @@ TParts ImportKindSpecificParts(const Proto& proto) {
313322
switch (proto.GetSettingsCase()) {
314323
case Proto::kImportFromS3Settings:
315324
return ImportKindSpecificParts(proto.GetImportFromS3Settings());
325+
case Proto::kImportFromFsSettings:
326+
return ImportKindSpecificParts(proto.GetImportFromFsSettings());
316327
case Proto::SETTINGS_NOT_SET:
317328
return {};
318329
}
@@ -327,6 +338,13 @@ template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settin
327338
{"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")},
328339
};
329340
}
341+
template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromFsSettings& proto) {
342+
return {
343+
{"import_type", "fs"},
344+
{"import_item_count", ToString(proto.items().size())},
345+
{"import_fs_base_path", proto.base_path()},
346+
};
347+
}
330348

331349
} // anonymous namespace
332350

ydb/public/api/protos/ydb_export.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ message ExportToFsSettings {
195195

196196
// Base path on FS where to write export
197197
// Path to the mounted directory in the case of NFS
198+
// Must be an absolute path
198199
// Example: /mnt/exports
199200
string base_path = 1 [(required) = true];
200201

0 commit comments

Comments
 (0)