[CELEBORN-2244] shuffle early delete feature for Spark #3569
Conversation
…able reuse shuffle id when early deletion feature turned on
PbReportMissingShuffleIdResponse.newBuilder().setSuccess(ret).build()
context.reply(pbReportMissingShuffleIdResponse)
/*
  val latestUpstreamShuffleId = shuffleIds.maxBy(_._2._1)
The original dedup logic may suffer from a race condition, described as follows:
Stage A depends on shuffle 1. Due to "too early deletion", the missing report is sent and handled for shuffle 1. At this point a new shuffle id is generated, so latestUpstreamShuffleId._2._1 is no longer UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID, the missing report is handled again, and everything gets messed up.
  // be cleaned up as it is entirely unusuable
  if (determinate && !isBarrierStage && !isCelebornSkewShuffleOrChildShuffle(
-     appShuffleId)) {
+     appShuffleId) && !conf.clientShuffleEarlyDeletion) {
We cannot reuse the shuffle id when this feature is turned on. Think about the following:
Stage B.0 depends on shuffle 1, which was written by stage A.0. Due to "too early deletion", shuffle 1 is lost and we need to run A.1. At this point shuffle 1 has already been removed from "registered shuffle", so if we reuse 1 as the id and send it to the tasks of A.1, we will fall into errors like "shuffle not registered".
Thanks for your great work, @CodingCat! This is an important and elegant design for the shuffle system. Could you help elaborate on the early shuffle deletion issue when a shuffle is consumed by multiple jobs? Or could you provide a concrete example to illustrate this behavior?
@zuston that's a good question, you can find a simple example in my test https://github.com/apache/celeborn/pull/3569/files#diff-744d5d0f093bff68eee23d0ed6ab4c4ae3b5fc438342c32865acb4629060ab3dR191-R233. Basically, there is a delay before a ShuffleDependency is completely GCed, so multiple jobs may reuse the output from the same upstream stage.
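To make that pattern concrete outside the linked test, here is a minimal, self-contained sketch (illustrative names and sizes only, not the linked test itself) of two Spark actions, i.e. two jobs, reusing the shuffle output of the same upstream stage:

```scala
import org.apache.spark.sql.SparkSession

object MultiJobShuffleReuse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-job-shuffle-reuse").getOrCreate()
    val sc = spark.sparkContext

    // One shuffle (reduceByKey) whose output is consumed by two separate jobs.
    val shuffled = sc.parallelize(0 until 1000000, 100)
      .map(i => (i % 1000, 1L))
      .reduceByKey(_ + _)

    shuffled.count()          // job 1: writes and then reads the shuffle
    shuffled.map(_._2).sum()  // job 2: reuses the same upstream shuffle output

    // The shuffle only becomes eligible for GC-driven cleanup after `shuffled`
    // (and its ShuffleDependency) is unreachable on the driver and a GC runs.
    spark.stop()
  }
}
```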
Thanks for your quick reply, @CodingCat. Does this issue also occur in pure Spark SQL use cases?
It depends. For the happy path in SQL, even when multiple jobs show up in the UI for a single query, they are essentially sequential or parallel stages that are "converted" to jobs by AQE, and they already capture the correct lineage, so we won't delete shuffle too early. However, premature deletion can still happen in some cases. For example, with three stages A -> B -> C: after B finishes, the shuffles generated by A can basically be deleted; if C then hits a fetch failure, it has to restart B, which can no longer find the shuffle generated by A because it was deleted "too early" (this case applies to both SQL and non-SQL, though).
Thanks, fully understood.
Yeah, that's true, this feature is kind of aggressive. We essentially handpicked jobs for the internal rollout (by analyzing their lineage), and I think I may need to give some guidance on how to use this feature. Rolling it out to all jobs in one shot is dangerous and may not deliver the benefit you expect; this feature is likely to give you the most benefit if you only roll it out to those huge-shuffle jobs. For PINS, we have X jobs which shuffle more than Y TBs, and only rolling out to them already saved us a huge bill. We also got the benefit of enabling some jobs which we had never been able to run before, e.g. we have k-way join jobs which shuffle Z+ PBs; without this feature, we needed hundreds of servers to exclusively serve such a job, and now, as we delete shuffle in a more timely manner, we only need tens of machines. Additionally, fetch failure is the biggest enemy: sometimes your server just doesn't deliver data, and with this feature you pay more on retry. So you need to ensure that your servers are in good shape by tuning whatever you need, and you may also want to monitor fetch failure occurrence cluster-wide so you can take action when needed.
I dropped a comment on the earlier draft PR, please see here.
Really, thanks for sharing this, @CodingCat. We've encountered similar issues: large applications can generate over 100 TB of shuffle data, which may persist long after the stage finishes. BTW, it would be helpful to list these limitations explicitly in the description.
@mridulm thank you for your review. That code was already removed when I merged this to the main branch here.
@zuston sure, I added them to a new section in the description, "user guide".
@mridulm as I said in earlier comments with @zuston, this feature is not supposed to be rolled out to every job, but only to those jobs with large shuffle and no shuffle reuse, and this implementation also provides functionality for jobs to recover from a mis-rollout. (For the AQE case it is not a big problem; in the cases I observed, the parallel downstream stages can always capture the lineage, since deletion is an async process.)
@CodingCat Assuming my understanding of the proposal is correct (which appears to be so based on the response): for specific jobs, there are application-level updates which can be made to accommodate the requirements, without needing to update the platform. This is not specific to Apache Celeborn btw, but applicable to any Spark application which generates persisted data and/or shuffle output. An example approach could be to ensure references are released and to call GC explicitly and periodically (Spark does it already, but users can force it as required), e.g. after an expensive query is done. This might not be optimal for all cases unfortunately, but addressing the general case would require updates to Apache Spark, not working around it through Apache Celeborn. Even in the specific use cases being observed internally in your ecosystem, if/when query/usage patterns change, it will no longer work well. Please do let me know if I am missing something here.
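A minimal sketch of the work-around described above, assuming a plain RDD job (the dataset, sizes, and output path are illustrative): release the references by scoping them locally, then nudge the driver GC so Spark's weak-reference-based ContextCleaner can clean up the shuffle.

```scala
import org.apache.spark.sql.SparkSession

object ExplicitGcAfterQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explicit-gc-after-query").getOrCreate()
    val sc = spark.sparkContext

    // Run the expensive, shuffle-heavy part in a local scope so that no
    // driver-side reference to its RDDs (and ShuffleDependencies) survives.
    locally {
      val counts = sc.parallelize(0 until 1000000, 100)
        .map(i => (i % 1000, 1L))
        .reduceByKey(_ + _)
      counts.saveAsTextFile("/tmp/output/counts") // illustrative output path
    }

    // With the references released, an explicit GC lets ContextCleaner schedule
    // removal of the now-unreachable shuffle's data.
    System.gc()

    // ... later, unrelated jobs continue here ...
    spark.stop()
  }
}
```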
We have many jobs like rdd1 -> rdd2 -> rdd3 -> s3. rdd1 generated 500 TB, and it holds there for 24 hours since rdd3 needs that many hours to be dumped to s3, and we have no way to release the reference to the shuffle dependency object since rdd1 -> rdd2 -> rdd3 is a reference chain and rdd3 is not released until the job finishes. I don't think non-remote-shuffle Spark users care about this feature as much, because a huge computing cluster can have tens of thousands of nodes, and tasks spread everywhere will just amortize the disk pressure across a broad range of machines. Celeborn users do care about this, as we cannot have that many Celeborn machines due to cost constraints.
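For illustration, a rough sketch of that shape (names, sizes, and the output path are made up): the final RDD keeps the whole chain, including the upstream shuffle dependency, reachable until the long-running dump finishes.

```scala
import org.apache.spark.sql.SparkSession

object LongLineageChain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("long-lineage-chain").getOrCreate()
    val sc = spark.sparkContext

    // rdd1: the huge shuffle (hundreds of TB in the real workload).
    val rdd1 = sc.parallelize(0 until 1000000, 1000)
      .map(i => (i % 10000, i.toLong))
      .reduceByKey(_ + _)

    // rdd3 -> rdd2 -> rdd1 is a live reference chain on the driver, so rdd1's
    // ShuffleDependency stays reachable for as long as rdd3 is in use.
    val rdd2 = rdd1.mapValues(_ * 2)
    val rdd3 = rdd2.filter(_._2 > 0)

    // The hours-long dump: until this action completes, GC-driven cleanup
    // cannot reclaim rdd1's shuffle data.
    rdd3.saveAsTextFile("s3a://my-bucket/output/rdd3") // illustrative path
    spark.stop()
  }
}
```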
For query pattern changes, well, yes, if a job suddenly changes to a certain shape, it may fall into trouble. The tradeoff is: do we really want to use 60 machines to exclusively serve this 500 TB shuffle for 24 hours, and give up the optimization out of fear that it might not work in the future? (Plus we also provide the implementation to prevent job failure.)
@CodingCat Thanks for the details, I understand the motivation and rationale behind the proposal, and there are existing alternatives like checkpointing, temp materialization, etc. This will be more on the application design/impl side though, and apps will have to make their own tradeoffs (given there are costs involved). As I mentioned earlier, this proposal itself is inherently unsound, and given the additional details provided, I am not in favor of introducing it into Apache Celeborn. My rationale/analysis would be the same if a variant of the proposal was made to Apache Spark for "vanilla" shuffle as well :-) I am open to being corrected of course if there are other valid use cases and/or requirements I am missing!
@mridulm Hi, I agree we can reduce shuffle cost with something like Spark checkpointing, or by dumping intermediate data to s3 manually. These kinds of approaches essentially sacrifice performance significantly; e.g. using s3 to store the intermediate results of a k-way join can slow down queries by almost 10X based on my experience. Application-side changes also need tons of end-user changes, which is, as you know, hard to land and push. The proposal here is essentially another alternative: if your job has a "clean" lineage, you can reduce your shuffle cost by enabling this feature, at a higher recovery cost on failure. As I said, this is not a feature for broad rollout but only for certain types of jobs. (Actually, based on my experience, most jobs will survive with this feature, since in reality there are not many jobs dumping the same RDD to multiple locations, and RDD-reuse jobs can always start quickly enough to fill in the lineage information needed by this feature. On the other side, only big-shuffle jobs deliver significant value with this feature, since small-shuffle jobs do not play a big role in your cluster capacity.) I'd like to better understand what specific invariant you believe this proposal violates when you call it "inherently unsound".
@CodingCat while I am sympathetic to the intent behind the change, this is not the right way to address it. While Apache Spark has a reasonably robust ability to recompute lost data, that is primarily to address fault tolerance, which is getting misused here. Having said that, I understand the pain point, and I am open to proposals to evolve this in Apache Spark (which can then be leveraged in Celeborn).
@mridulm I would like to have a better understanding of why you think Spark's fault tolerance mechanism is "misused" here, specifically how early deletion is different from the scenario where shuffle data is suddenly lost when we need to retry or reuse it, or where a bug in Celeborn or MapOutputTracker erases the shuffle metadata by mistake. The reason we converge the handling logic onto FetchFailedException is exactly to maximize compatibility with any future changes in Spark: as long as Spark relies on this exception to recompute lost data, this PR will be compatible. If Spark changes on this part, it breaks not only this PR but also a big chunk of the fault tolerance handling of any shuffle system. I would love to hear more details about what changes in your mind would make this PR incompatible with upstream Spark, other than statements that "this is not right" and "this is misused".
Active deletion vs legitimate data loss.
Well, let me ask more specifically: how would Spark be impacted, given it can handle both in the same way? Or, even if you want to highlight this difference in terminology, why couldn't we treat actively deleted data as a new type of data loss (one which happens in a more predictable way)?
@CodingCat, I have not thought through what needs to be done to address it in Apache Spark; if there is a concrete proposal, I can help review and evolve it. My suggestion would be to address it at the right layer. Having said that, while I was trying to be constructive in making progress here, I have already given my comments and can't keep revisiting them; as currently formulated, I am not in favor of the (fairly nontrivial) change. If there are additional details/use cases and/or refinements which help, I am happy to take a look and revisit my position.
I think that's the key opinion conflict here: I don't really take Spark as the right layer to address this issue. One of the major reasons is that it cannot be extended to an advanced version of this PR, partition-level early deletion, given vanilla Spark's shuffle storage format (well, you can still work it out, but that will touch every piece of shuffle-related code). To get a more cost-efficient shuffle storage solution via early shuffle deletion, no matter which layer you build it on, you always need to trade off happy-path storage cost against bad-path computing cost, and with the storage layout facilities of remote shuffle systems, you can significantly improve the happy-path gains. In summary, building at the Spark layer brings the same, if not higher, cost; we already have a solution and can extend it to an even better one in remote shuffle systems, so why not?
To clarify, I am not necessarily vetoing the change, but I am not in favor of it (I have already detailed why).
Why are the changes needed?
This PR implements proactive deletion of shuffle data once it is no longer needed for forward progress (retention for potential failure recovery is not considered).
With Spark’s native shuffle mechanism, shuffle data is only eligible for cleanup when the associated ShuffleDependency object becomes unreachable and is garbage-collected on the driver. In long-running applications with long lineages, ShuffleDependency objects can be retained in the driver heap for a long time due to reference chains in the DAG / query plan, even though the corresponding shuffle files are no longer needed for future computation. This delays shuffle cleanup and causes shuffle data to occupy cluster disks much longer than necessary within the same job.
This becomes especially costly in the “independent deployment” scenario where Celeborn runs as a long-lived service and serves applications that may run for tens of hours: without timely cleanup, obsolete shuffle data from earlier parts of the lineage accumulates on the cluster disks during execution, increasing disk pressure and the overall cost of the service.
What changes were proposed in this pull request?
Shuffle – Stage Dependency Tracking
This PR introduces explicit tracking of the dependency relationship between Spark stages and shuffle data, so that shuffle data can be safely expired once it is no longer needed by any running stage.
The mechanism works as follows:
- When `ClientReader` sends a `GetShuffleId` request, we record that stage X depends on shuffle Y via callbacks into LifecycleManager. This relationship is maintained by a new component, `StageDependencyManager`.
- We also rely on `SparkListener` to track the stage and shuffle dependency by analyzing the shuffle written by the parent stages. This is a best-effort approach to handle extreme cases where some tasks fail due to "too early deletion" before they send `GetShuffleId` for a certain shuffle. However, it cannot cover all cases, especially when some parent stages are skipped.
- We use `SparkListener` to track stage completion events. When a stage finishes, its dependency on the corresponding shuffle is removed. Once no active stages depend on shuffle Y, shuffle Y is considered expired and becomes eligible for deletion. (A rough sketch of this bookkeeping follows the list.)
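As a rough illustration of the bookkeeping described above, and not the actual `StageDependencyManager` implementation (the class name, method names, and the `onShuffleExpired` callback below are placeholders), the tracking could look like this:

```scala
import scala.collection.mutable

// Simplified sketch of stage -> shuffle dependency tracking. `onShuffleExpired`
// stands in for whatever actually marks a shuffle as expired / deletable.
class StageShuffleTracker(onShuffleExpired: Int => Unit) {

  // shuffleId -> ids of the stages that still depend on that shuffle
  private val consumers = mutable.Map.empty[Int, mutable.Set[Int]]

  // Recorded when a reader asks for the shuffle (GetShuffleId callback) or when
  // the SparkListener-based analysis infers the dependency from parent stages.
  def addDependency(stageId: Int, shuffleId: Int): Unit = synchronized {
    consumers.getOrElseUpdate(shuffleId, mutable.Set.empty[Int]) += stageId
  }

  // Driven by stage-completion events: once no active stage depends on a
  // shuffle, it is considered expired and eligible for deletion.
  def onStageCompleted(stageId: Int): Unit = synchronized {
    val expired = mutable.ListBuffer.empty[Int]
    consumers.foreach { case (shuffleId, stages) =>
      stages -= stageId
      if (stages.isEmpty) expired += shuffleId
    }
    expired.foreach { shuffleId =>
      consumers.remove(shuffleId)
      onShuffleExpired(shuffleId)
    }
  }
}
```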
Auto Recover Expired Shuffle
Since shuffle data is now deleted proactively, it is possible that some shuffle files may be removed "too early" under certain conditions, for example:
- A shuffle is consumed by multiple jobs.
- A downstream stage fails (e.g., fetch failure) and some upstream stages must be re-executed after their shuffle has already been expired.
To avoid job failures in these scenarios, this PR also introduces an automatic recovery mechanism that allows deleted shuffle data to be reconstructed when it is requested again. The recovery flow works as follows:
- We detect "too-early deletion" when handling the `GetShuffleId` request from `ClientReader`.
- If a shuffle has already expired, `areAllMapperAttemptsFinished` throws an IllegalStageException. When this exception is caught, we instruct `ClientReader` to throw a FetchFailure after LifecycleManager finishes the recovery preparation, so that the corresponding upstream stages can be retried by Spark. During this recovery preparation phase, we perform one of the following actions:
Mechanism to Support More Complex Lineage
We have a special function to offer users more flexibility in controlling when to enable this early deletion feature. For instance, users can launch two jobs that reuse a certain RDD and dump data respectively. Users can turn on `celeborn.client.spark.fetch.shuffleEarlyDeletion.checkProperty` and then set the system property CELEBORN_EARLY_SHUFFLE_DELETION to true in the second job to avoid "too early deletion".
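A hedged sketch of that opt-in pattern for the second job. Only `celeborn.client.spark.fetch.shuffleEarlyDeletion.checkProperty` and the CELEBORN_EARLY_SHUFFLE_DELETION system property come from this description; the feature-enable config name and the `spark.`-prefixed form of the Celeborn settings below are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object SecondJobOptIn {
  def main(args: Array[String]): Unit = {
    // Per the description, the second job sets this system property to true so
    // that the client avoids deleting the shared shuffle too early.
    System.setProperty("CELEBORN_EARLY_SHUFFLE_DELETION", "true")

    val spark = SparkSession.builder()
      .appName("second-job-reusing-shared-rdd")
      // Placeholder name for the switch that turns the feature itself on.
      .config("spark.celeborn.client.spark.shuffleEarlyDeletion.enabled", "true")
      // Make the client consult the system property before acting.
      .config("spark.celeborn.client.spark.fetch.shuffleEarlyDeletion.checkProperty", "true")
      .getOrCreate()

    // ... run this job's queries / actions ...
    spark.stop()
  }
}
```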
Does this PR resolve a correctness bug?
NO
Does this PR introduce any user-facing change?
Yes, several more configs are added to enable this feature.
User Guidance
How was this patch tested?
Integration tests and the production environments at PINS.