Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
* [BUGFIX] Scheduler: Fix memory leak by properly cleaning up query fragment registry. #7148
* [BUGFIX] Compactor: Add back deletion of partition group info file even if not complete #7157

## 1.20.1 2025-12-03

Expand Down
22 changes: 12 additions & 10 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,28 +788,30 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
isPartitionGroupInfoDeleted := false
partitionedGroupInfoFile := extraInfo.path
deletedBlocksCount := 0

if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
deletedBlocksCount, err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
deletedBlocksCount, err = partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
if err != nil {
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
// if one block can not be marked for deletion, we should
// skip delete this partitioned group. next iteration
// would try it again.
continue
}
if deletedBlocksCount > 0 {
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
}

if deletedBlocksCount > 0 {
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
} else {
level.Info(userLogger).Log("msg", "deleting partition group because either all associated blocks have been deleted or partition group is invalid", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleting partition group now that all associated blocks have been deleted", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
isPartitionGroupInfoDeleted = true
}
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
isPartitionGroupInfoDeleted = true
}
}
}
Expand Down
Loading