Skip to content

Commit b9b3351

Browse files
authored
YQ-4844 supported recursive drop of directory with stream queries (#28059)
1 parent 2da2346 commit b9b3351

File tree

4 files changed

+25
-2
lines changed

4 files changed

+25
-2
lines changed

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,15 @@ class TKikimrRunner {
180180
NYdb::TDriver* GetDriverMut() { return Driver.Get(); }
181181
const TString& GetEndpoint() const { return Endpoint; }
182182
const NYdb::TDriver& GetDriver() const { return *Driver; }
183-
NYdb::NScheme::TSchemeClient GetSchemeClient() const { return NYdb::NScheme::TSchemeClient(*Driver); }
184183
Tests::TClient& GetTestClient() const { return *Client; }
185184
Tests::TServer& GetTestServer() const { return *Server; }
186185

187186
NYdb::TDriverConfig GetDriverConfig() const { return DriverConfig; }
188187

188+
NYdb::NScheme::TSchemeClient GetSchemeClient(NYdb::TCommonClientSettings settings = NYdb::TCommonClientSettings()) const {
189+
return NYdb::NScheme::TSchemeClient(*Driver, settings);
190+
}
191+
189192
NYdb::NTable::TTableClient GetTableClient(
190193
NYdb::NTable::TClientSettings settings = NYdb::NTable::TClientSettings()) const {
191194
return NYdb::NTable::TTableClient(*Driver, settings.UseQueryCache(false));

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12809,6 +12809,16 @@ END DO)",
1280912809
});
1281012810
}
1281112811

12812+
{
12813+
auto schemeClient = kikimr->GetSchemeClient(TCommonClientSettings().AuthToken(BUILTIN_ACL_ROOT));
12814+
const auto result = schemeClient.DescribePath("/Root/MyFolder/MyStreamingQuery").ExtractValueSync();
12815+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToOneLineString());
12816+
const auto& entry = result.GetEntry();
12817+
UNIT_ASSERT_VALUES_EQUAL(entry.Name, "MyStreamingQuery");
12818+
UNIT_ASSERT_VALUES_EQUAL(entry.Owner, BUILTIN_ACL_ROOT);
12819+
UNIT_ASSERT_VALUES_EQUAL(entry.Type, NYdb::NScheme::ESchemeEntryType::StreamingQuery);
12820+
}
12821+
1281212822
{
1281312823
const auto result = db.ExecuteQuery(R"(
1281412824
CREATE OR REPLACE STREAMING QUERY `MyFolder/MyStreamingQuery` WITH (

ydb/public/api/protos/ydb_scheme.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ message Entry {
6868
RESOURCE_POOL = 21;
6969
TRANSFER = 23;
7070
SYS_VIEW = 24;
71-
STREAMING_QUERY = 25;
71+
STREAMING_QUERY = 26;
7272
}
7373

7474
// Name of scheme entry (dir2 of /dir1/dir2)

ydb/public/lib/ydb_cli/common/recursive_remove.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ using namespace NScheme;
1313
using namespace NTable;
1414
using namespace NTopic;
1515

16+
namespace {
17+
1618
TStatus RemoveDirectory(TSchemeClient& client, const TString& path, const TRemoveDirectorySettings& settings) {
1719
return RetryFunction([&]() -> TStatus {
1820
return client.RemoveDirectory(path, settings).ExtractValueSync();
@@ -83,12 +85,18 @@ TStatus RemoveTransfer(NQuery::TQueryClient& client, const TString& path, const
8385
return DropSchemeObject("TRANSFER", client, path, settings);
8486
}
8587

88+
TStatus RemoveStreamingQuery(NQuery::TQueryClient& client, const TString& path, const TRemoveDirectorySettings& settings) {
89+
return DropSchemeObject("STREAMING QUERY", client, path, settings);
90+
}
91+
8692
NYdb::NIssue::TIssues MakeIssues(const TString& error) {
8793
NYdb::NIssue::TIssues issues;
8894
issues.AddIssue(NYdb::NIssue::TIssue(error));
8995
return issues;
9096
}
9197

98+
} // anonymous namespace
99+
92100
bool Prompt(const TString& path, ESchemeEntryType type) {
93101
Cout << "Remove " << to_lower(ToString(type)) << " '" << path << "' (y/n)? ";
94102
return AskYesOrNo();
@@ -173,6 +181,8 @@ TStatus Remove(
173181
return Remove(&RemoveReplication, schemeClient, queryClient, type, path, prompt, settings);
174182
case ESchemeEntryType::Transfer:
175183
return Remove(&RemoveTransfer, schemeClient, queryClient, type, path, prompt, settings);
184+
case ESchemeEntryType::StreamingQuery:
185+
return Remove(&RemoveStreamingQuery, schemeClient, queryClient, type, path, prompt, settings);
176186

177187
default:
178188
return TStatus(EStatus::UNSUPPORTED, MakeIssues(TStringBuilder()

0 commit comments

Comments
 (0)