
Conversation

@CodingCat
Contributor

@CodingCat CodingCat commented Dec 16, 2025

Why are the changes needed?

This PR implements proactive deletion of shuffle data once it is no longer needed by any running stage (potential reuse for failure recovery is not taken into account when deciding to delete).

With Spark’s native shuffle mechanism, shuffle data is only eligible for cleanup when the associated ShuffleDependency object becomes unreachable and is garbage-collected on the driver. In long-running applications with long lineages, ShuffleDependency objects can be retained in the driver heap for a long time due to reference chains in the DAG / query plan, even though the corresponding shuffle files are no longer needed for future computation. This delays shuffle cleanup and causes shuffle data to occupy cluster disks much longer than necessary within the same job.

This becomes especially costly in the “independent deployment” scenario where Celeborn runs as a long-lived service and serves applications that may run for tens of hours: without timely cleanup, obsolete shuffle data from earlier parts of the lineage accumulates on the cluster disks during execution, increasing disk pressure and the overall cost of the service.

What changes were proposed in this pull request?

Shuffle – Stage Dependency Tracking

This PR introduces explicit tracking of the dependency relationship between Spark stages and shuffle data, so that shuffle data can be safely expired once it is no longer needed by any running stage.

The mechanism works as follows:

  • When ClientReader sends a GetShuffleId request, we record that stage X depends on shuffle Y via callbacks into LifecycleManager. This relationship is maintained by a new component, StageDependencyManager.

  • We also rely on a SparkListener to track stage-to-shuffle dependencies by analyzing the shuffles written by parent stages. This is a best-effort approach to handle extreme cases where tasks would fail because a shuffle is deleted too early, before any task has sent GetShuffleId for it. Note, however, that it cannot cover all cases, especially when some parent stages are skipped.

  • We use SparkListener to track stage completion events. When a stage finishes, its dependency on the corresponding shuffle is removed. Once no active stages depend on shuffle Y, shuffle Y is considered expired and becomes eligible for deletion.

    • We limit deletion to the completion of stages that actually write output or shuffle bytes. In many cases, users or third-party libraries first launch jobs such as .count for stats collection, and then immediately reuse the same shuffle for the real work that writes the data to S3, etc.; triggering deletion on those lightweight jobs would be premature. (A minimal sketch of the dependency bookkeeping follows this list.)
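
For illustration, a minimal sketch of the bookkeeping described above. StageDependencyManager is the real component name, but everything below (method names, types) is an assumption, not the actual Celeborn code:

import scala.collection.mutable

class StageDependencySketch {
  // stageId -> shuffle ids the stage reads
  private val stageToShuffles = mutable.Map.empty[Int, mutable.Set[Int]]
  // shuffleId -> active stages that still depend on it
  private val shuffleToStages = mutable.Map.empty[Int, mutable.Set[Int]]

  // Record "stage X depends on shuffle Y", e.g. when handling GetShuffleId.
  def recordDependency(stageId: Int, shuffleId: Int): Unit = synchronized {
    stageToShuffles.getOrElseUpdate(stageId, mutable.Set.empty) += shuffleId
    shuffleToStages.getOrElseUpdate(shuffleId, mutable.Set.empty) += stageId
  }

  // On stage completion, drop the stage's edges and return the shuffles that no
  // longer have any dependent stage, i.e. candidates for expiration.
  def onStageCompleted(stageId: Int): Seq[Int] = synchronized {
    val shuffles = stageToShuffles.remove(stageId).getOrElse(mutable.Set.empty[Int])
    shuffles.toSeq.filter { shuffleId =>
      shuffleToStages.get(shuffleId).forall { stages =>
        stages -= stageId
        if (stages.isEmpty) { shuffleToStages -= shuffleId; true } else false
      }
    }
  }
}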

Auto Recover Expired Shuffle

Since shuffle data is now deleted proactively, it is possible that some shuffle files may be removed "too early" under certain conditions, for example:

  • A shuffle is consumed by multiple jobs.

  • A downstream stage fails (e.g., fetch failure) and some upstream stages must be re-executed after its shuffle has already been expired.

To avoid job failures in these scenarios, this PR also introduces an automatic recovery mechanism that allows deleted shuffle data to be reconstructed when it is requested again. The recovery flow works as follows:

  • We detect “too-early deletion” when handling the GetShuffleId request from ClientReader.

  • If a shuffle has already been expired, areAllMapperAttemptsFinished throws an IllegalStageException. When this exception is caught, we instruct ClientReader to throw a FetchFailure after LifecycleManager finishes the recovery preparation, so that the corresponding upstream stages can be retried by Spark. During this recovery preparation phase, we perform one of the following actions:

    • If we have recorded the stage → shuffle dependency for the requesting stage, we invalidate all shuffles that this stage depends on, even if some of them have not yet been reported as deleted. This prevents the stage from being restarted multiple times due to discovering additional missing shuffles one by one, which could otherwise lead to repeated failures and eventually job abortion.
    • If the dependency information is unavailable (e.g., due to listener processing failures), we fall back to invalidating only the reported missing shuffle and regenerate it.
    • We also need to invalidate the shuffle written by the stage that reports the too-early-deleted shuffle. Shuffle expiration is asynchronous: if, for example, the upstream shuffle is deleted after 20% of a determinate stage's tasks have finished, the restart of that stage will run only the remaining 80% of the tasks and write them under a new Celeborn shuffle id (see the comments below on why we cannot reuse the shuffle id). The further downstream stage would then read only 80% of the output, which is a data-loss scenario. (A simplified sketch of this recovery preparation follows this list.)
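
A simplified, self-contained sketch of the recovery preparation described above; all names here are hypothetical, and the real logic lives in LifecycleManager:

import scala.collection.mutable

object RecoveryPreparationSketch {
  // stageId -> shuffles the stage reads (filled by the dependency tracking above)
  val stageReads = mutable.Map.empty[Int, Set[Int]]
  // stageId -> shuffles the stage writes
  val stageWrites = mutable.Map.empty[Int, Set[Int]]

  def invalidateAndRegenerate(shuffleId: Int): Unit =
    println(s"invalidate shuffle $shuffleId and prepare a fresh Celeborn shuffle id")

  def prepareRecovery(reportingStageId: Int, missingShuffleId: Int): Unit = {
    // Prefer invalidating every shuffle the reporting stage reads, so the stage is
    // not restarted repeatedly while it discovers missing shuffles one by one;
    // fall back to the single reported shuffle if dependency info is unavailable.
    val upstream = stageReads.getOrElse(reportingStageId, Set(missingShuffleId))
    upstream.foreach(invalidateAndRegenerate)

    // Expiration is asynchronous: a partially finished stage may rerun only its
    // remaining tasks under a new shuffle id, so its own output is invalidated too,
    // otherwise downstream stages would read partial data.
    stageWrites.getOrElse(reportingStageId, Set.empty).foreach(invalidateAndRegenerate)

    // The reader is then instructed to throw a FetchFailure so Spark retries the
    // affected upstream stages.
  }
}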

Mechanism to Support More Complex Lineage

We provide an additional knob that gives users more flexibility to control when early deletion applies. For instance, a user may launch two jobs, one reusing a certain shuffle and one dumping data. The user can turn on celeborn.client.spark.fetch.shuffleEarlyDeletion.checkProperty and then set the system property CELEBORN_EARLY_SHUFFLE_DELETION to true in the second job to avoid too-early deletion.
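
For illustration, a sketch of how these two knobs might be set, mirroring the description above. The config key and property name are taken from this description; the exact semantics should be confirmed against the final documentation, and everything else is illustrative:

import org.apache.spark.sql.SparkSession

// Enable the per-job check on the application.
val spark = SparkSession.builder()
  .appName("reuse-then-dump")
  .config("celeborn.client.spark.fetch.shuffleEarlyDeletion.checkProperty", "true")
  .getOrCreate()

// ... first job runs here ...

// In the second job, set the driver-side system property as described above
// to avoid too-early deletion of the shuffle it still needs.
System.setProperty("CELEBORN_EARLY_SHUFFLE_DELETION", "true")
// ... second job runs here ...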

Does this PR resolve a correctness bug?

NO

Does this PR introduce any user-facing change?

Several new configurations are added to enable this feature.

User Guidance

  • Rolling out to all jobs in one shot is dangerous and may not deliver the benefit you expect.
  • This feature is most beneficial when rolled out only to jobs with huge shuffles. At PINS, we have X jobs that each shuffle more than Y TBs; rolling it out to just those jobs already saved us a huge bill.
  • It also enabled some jobs we had never been able to run, e.g. k-way join jobs that shuffle Z+ PBs. Without this feature we would need hundreds of servers to exclusively serve such a job; with more timely shuffle deletion we only need tens of machines.
  • Fetch failure is the biggest enemy: sometimes a server simply fails to deliver data, and with this feature you pay more on retry. Ensure your servers are in good shape by tuning whatever is needed, and consider monitoring cluster-wide fetch-failure occurrence so you can take action when necessary.

How was this patch tested?

Integration tests and the production environments at PINS.

PbReportMissingShuffleIdResponse.newBuilder().setSuccess(ret).build()
context.reply(pbReportMissingShuffleIdResponse)
/*
val latestUpstreamShuffleId = shuffleIds.maxBy(_._2._1)
Contributor Author

The original dedup logic may suffer from a race condition:

Stage A depends on shuffle 1. Due to "too-early deletion", a missing report is sent and handled for shuffle 1; at that point a new shuffle id is generated, so latestUpstreamShuffleId._2._1 is no longer UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID. The missing report is then handled again, which messes everything up.
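
For illustration only, one way to make the missing-report handling idempotent; this is a hypothetical sketch of the concept, not the actual fix in this PR:

import scala.collection.mutable

object MissingReportDedupSketch {
  // Remember which app shuffle ids have already triggered regeneration, instead of
  // inferring it from whether a new Celeborn shuffle id has been generated yet.
  private val handled = mutable.Set.empty[Int]

  // Returns true only for the first report of a given app shuffle id.
  def shouldHandle(appShuffleId: Int): Boolean = synchronized {
    handled.add(appShuffleId)
  }
}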

// be cleaned up as it is entirely unusable
if (determinate && !isBarrierStage && !isCelebornSkewShuffleOrChildShuffle(
-  appShuffleId)) {
+  appShuffleId) && !conf.clientShuffleEarlyDeletion) {
Contributor Author

We cannot reuse the shuffle id when this feature is turned on. Consider the following:

Stage B.0 depends on shuffle 1, which was written by stage A.0.

Due to "too-early deletion", shuffle 1 is lost and we need to run A.1. At this point shuffle 1 has been removed from the registered shuffles, so if we reuse 1 as the id and send it to the tasks of A.1, they will hit errors like "shuffle not registered".
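
For illustration, a minimal sketch of the decision; the names are hypothetical, and the actual change is the added !conf.clientShuffleEarlyDeletion condition shown above:

// With early deletion enabled, a stage reattempt always gets a fresh Celeborn
// shuffle id, because the previously assigned id may already be unregistered.
def celebornShuffleIdForAttempt(previousCelebornShuffleId: Int,
                                determinate: Boolean,
                                earlyDeletionEnabled: Boolean,
                                allocateNewId: () => Int): Int =
  if (determinate && !earlyDeletionEnabled) previousCelebornShuffleId // safe to reuse
  else allocateNewId()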

@CodingCat CodingCat changed the title [WIP] shuffle early delete feature for Spark [WIP][CELEBORN-2244] shuffle early delete feature for Spark Dec 28, 2025
@CodingCat CodingCat marked this pull request as ready for review December 29, 2025 03:53
@CodingCat CodingCat changed the title [WIP][CELEBORN-2244] shuffle early delete feature for Spark [CELEBORN-2244] shuffle early delete feature for Spark Dec 29, 2025
@zuston
Member

zuston commented Dec 30, 2025

Thanks for your great work! @CodingCat — this is an important and elegant design for the shuffle system.

Could you help elaborate on the early shuffle deletion issue when a shuffle is consumed by multiple jobs?
Specifically, how does this situation occur, and under what conditions can the shuffle data be deleted prematurely while it is still needed by other jobs?

Or could you provide a concrete example to illustrate this behavior?

@CodingCat
Contributor Author

@zuston that's a good question, you can find a simple example in my test https://github.com/apache/celeborn/pull/3569/files#diff-744d5d0f093bff68eee23d0ed6ab4c4ae3b5fc438342c32865acb4629060ab3dR191-R233

Basically, there is a delay before the ShuffleDependency is fully GCed, so multiple jobs may reuse the output of the same upstream stage.
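
For illustration, a minimal shape of this case (the linked test is the authoritative example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-reuse").getOrCreate()
val grouped = spark.sparkContext
  .parallelize(1 to 1000)
  .map(i => (i % 10, i))
  .groupByKey()                      // one ShuffleDependency backs both jobs below

grouped.count()                      // job 1 reads the shuffle
grouped.mapValues(_.sum).collect()   // job 2 reuses the same shuffle output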

@zuston
Member

zuston commented Dec 30, 2025

Thanks for your quick reply. @CodingCat Does this issue also occur in pure Spark SQL use cases?

@CodingCat
Contributor Author

It depends.

On the happy path in SQL, even though multiple jobs show up in the UI for a single query, they are essentially sequential or parallel stages that AQE "converts" into jobs, and they already capture the correct lineage, so we will not delete shuffle data too early.

However, premature deletion can still happen in some cases. For example, with three stages A -> B -> C:

after B finishes, the shuffles generated by A can be deleted. If C then hits a fetch failure, it has to restart B, which can no longer find the shuffle generated by A because it was deleted "too early" (this case applies to both SQL and non-SQL workloads).

@zuston
Member

zuston commented Dec 30, 2025

Thanks, fully understood.
From the job's perspective, both stage retries and application reruns introduce significant overhead and cost if shuffle data is deleted too early.

@CodingCat
Contributor Author

Yeah, that's true. This feature is kind of aggressive; internally we essentially handpicked the jobs to roll it out to (by analyzing their lineage).

I think I should give some guidance on how to use this feature. Rolling out to all jobs in one shot is dangerous and may not deliver the benefit you expect. It is most beneficial when rolled out only to jobs with huge shuffles: at PINS, we have X jobs that each shuffle more than Y TBs, and rolling it out to just those jobs already saved us a huge bill. It also enabled some jobs we had never been able to run, e.g. k-way join jobs that shuffle Z+ PBs; without this feature we would need hundreds of servers to exclusively serve such a job, whereas with more timely shuffle deletion we only need tens of machines.

Additionally, fetch failure is the biggest enemy. Sometimes a server simply fails to deliver data, and with this feature you pay more on retry, so you need to keep your servers in good shape by tuning whatever is needed, and you may want to monitor cluster-wide fetch-failure occurrence so you can take action when needed.

@mridulm
Contributor

mridulm commented Dec 30, 2025

I dropped a comment on the earlier draft PR, please see here.

@zuston
Member

zuston commented Dec 30, 2025

Really, thanks for sharing this @CodingCat.

We've encountered similar issues: large applications can generate 100+ TB of shuffle data, which may persist long after the stage finishes.

BTW, it would be helpful to list these limitations explicitly in the description.

@CodingCat
Contributor Author

@mridulm thank you for your review. That code was already removed when I merged the changes into the main branch here.

@CodingCat
Contributor Author

@zuston sure, I added them to a new "User Guidance" section in the description.

@CodingCat
Contributor Author

@mridulm as I said in the earlier comments with @zuston, this feature is not supposed to be rolled out to every job, only to jobs with large shuffles and no shuffle reuse, and this implementation also provides the functionality for jobs to recover from a mis-rollout. (The AQE case is not a big problem: in the cases I observed, the parallel downstream stages can always capture the lineage, since deletion is an asynchronous process.)

@mridulm
Contributor

mridulm commented Dec 30, 2025

@CodingCat Assuming my understanding of the proposal is correct (which appears to be so based on the response): for specific jobs, there are application-level updates which can be made to accommodate the requirements, without needing to update the platform. This is not specific to Apache Celeborn btw, but applicable to any Spark application which is generating persisted data and/or shuffle output.

An example approach could be to ensure references are released and to call GC explicitly and periodically (Spark does it already, but users can force it as required), e.g. after an expensive query is done. This might not be optimal for all cases unfortunately, but addressing the general case would require updates to Apache Spark, not working around it through Apache Celeborn.

Even in the specific use cases being internally observed in your ecosystem: if/when query/usage patterns change, it will no longer work well.

Please do let me know if I am missing something here.

@CodingCat
Contributor Author

We have many jobs like rdd1 -> rdd2 -> rdd3 -> S3.

rdd1 generates 500TB of shuffle, and it sits there for 24 hours because rdd3 needs that many hours to be dumped to S3. We have no way to release the reference to the ShuffleDependency object, since rdd1 -> rdd2 -> rdd3 is a reference chain and rdd3 is not released until the job finishes.
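
For illustration, the shape of such a job (paths and operators here are hypothetical):

import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder().appName("long-lineage").getOrCreate().sparkContext
val rdd1 = sc.textFile("s3://bucket/input")
  .map(line => (line.take(8), line))
  .reduceByKey(_ + _)                        // the huge shuffle written early on
val rdd2 = rdd1.map { case (k, v) => (k, v.length) }.reduceByKey(_ + _)
val rdd3 = rdd2.map { case (k, n) => s"$k,$n" }
rdd3.saveAsTextFile("s3://bucket/output")    // runs for many hours; rdd3 -> rdd2 -> rdd1
// keeps rdd1's ShuffleDependency reachable on the driver until the job finishes.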

I don't think Spark users without remote shuffle care about this feature as much, because a huge computing cluster can have tens of thousands of nodes, and tasks spread everywhere simply amortize the disk pressure across a broad range of machines. Celeborn users do care, since we cannot have that many Celeborn machines due to cost constraints.

@CodingCat
Contributor Author

CodingCat commented Dec 30, 2025

For query-pattern changes: yes, if a job suddenly changes to a certain shape, it may run into trouble. The trade-off is: do we really want to use 60 machines to exclusively serve this 500TB shuffle for 24 hours, and give up the optimization out of fear that it might not work in the future? (Plus, we also provide an implementation that prevents job failure.)

@mridulm
Contributor

mridulm commented Dec 30, 2025

@CodingCat Thanks for the details. I understand the motivation and rationale behind the proposal, and there are existing alternatives like checkpointing, temp materialization, etc. This will be more on the application design/implementation side though, and apps will have to make their own trade-offs (given there are costs involved).

As I mentioned earlier, this proposal itself is inherently unsound, and given the additional details provided, I am not in favor of introducing it into Apache Celeborn; my rationale/analysis would be the same if a variant of the proposal were made to Apache Spark for "vanilla" shuffle as well :-)

I am open to being corrected of course if there are other valid use cases and/or requirements I am missing!

@CodingCat
Contributor Author

CodingCat commented Dec 30, 2025

@mridulm Hi, I agree we can reduce shuffle cost with something like Spark checkpointing, or by dumping intermediate data to S3 manually, but these approaches sacrifice performance significantly; e.g. using S3 to store the intermediate results of a k-way join can slow queries down by almost 10X in my experience. Application-side changes also require lots of end-user changes, which are, as you know, hard to land and push.

The proposal here is essentially another alternative: if your job has a "clean" lineage, you can reduce your shuffle cost by enabling this feature, at the price of a higher recovery cost on failure. As I said, this is not a feature for broad rollout, only for certain types of jobs. (Actually, based on my experience, most jobs will survive with this feature, since in reality there are not many jobs dumping the same RDD to multiple locations, and RDD-reuse jobs can always start quickly enough to fill in the lineage information this feature needs. On the other hand, only big-shuffle jobs deliver significant value from this feature, since small-shuffle jobs do not play a big role in your cluster capacity.)

I'd like to better understand what specific invariant you believe this proposal violates when you call it "inherently unsound".

@mridulm
Contributor

mridulm commented Dec 30, 2025

@CodingCat while I am sympathetic to the intent behind the change, this is not the right way to address it. While Apache Spark has a reasonably robust ability to recompute lost data, that is primarily to address fault tolerance, which is being misused here. The rationale used in this PR applies to vanilla shuffle in Spark as well, and the analysis would be the same: it is unsound and not in line with how Spark currently expects shuffle to behave, which is why Spark relies on GC to clean up shuffle. Diverging nontrivially from Spark, in Apache Celeborn, will cause maintenance issues and impact the ability to evolve the projects.

Having said that, I understand the pain point; I am open to proposals to evolve this in Apache Spark (which can then be leveraged in Celeborn).

@CodingCat
Contributor Author

CodingCat commented Dec 30, 2025

@mridulm I would like to better understand why you think Spark's fault tolerance mechanism is "misused" here, specifically how early deletion differs from scenarios where shuffle data is suddenly lost when we need to retry or reuse it, or where a bug in Celeborn or the MapOutputTracker erases the shuffle metadata by mistake.

The reason we converge the handling logic onto FetchFailedException is exactly to maximize compatibility with any future changes in Spark: as long as Spark relies on this exception to recompute lost data, this PR remains compatible. If Spark changes this part, not only this PR but also a big chunk of the fault-tolerance handling of any shuffle system would be affected.

I would love to hear more details about what changes you have in mind that would make this PR incompatible with upstream Spark, rather than a statement that "this is not right" or "this is misused".

@mridulm
Contributor

mridulm commented Dec 30, 2025

Active deletion vs legitimate data loss.

@CodingCat
Contributor Author

Active deletion vs legitimate data loss.

Well, let me ask more specifically: how would Spark be impacted, given that it can handle both in the same way?

Or, even if you want to highlight this difference in terminology, why couldn't we treat actively deleted data as a new type of data loss (one that happens in a more predictable way)?

@mridulm
Contributor

mridulm commented Dec 30, 2025

@CodingCat, I have not thought through what needs to be done to address it in Apache Spark; if there is a concrete proposal, I can help review and evolve it. My suggestion would be to address it at the right layer. This appears to be a recurring issue in Spark, and has come up in the past as well.

Having said that, while I was trying to be constructive in making progress here, I have already given my comments and can't keep revisiting them; as currently formulated, I am not in favor of the (fairly nontrivial) change.

If there are additional details/use cases and/or refinements which help, I am happy to take a look and revisit my position.

@CodingCat
Contributor Author

CodingCat commented Dec 30, 2025

I think that's the key conflict of opinion here: I don't really consider Spark the right layer to address this issue.

One of the major reasons is that it cannot be extended to an advanced version of this PR, partition-level early deletion, given the vanilla Spark shuffle storage format (well, you could still work it out, but that would touch every piece of shuffle-related code).

To build a more cost-efficient shuffle storage solution via early shuffle deletion, no matter which layer you build it on, you always need to trade off happy-path storage cost against bad-path computing cost, and with the storage-layout facilities of remote shuffle systems, you can significantly improve the happy-path gains.

In summary, building it at the Spark layer brings the same, if not higher, cost. We already have a solution in remote shuffle systems and can extend it to an even better one, so why not?

@mridulm
Contributor

mridulm commented Dec 30, 2025

To clarify, I am not necessarily vetoing the change, but I am not in favor of it (I have already detailed why).
I will let other committers review, and if they are convinced of the merits, I won't block it.

