Use CUDA streams in all pylibcudf calls made by cudf-polars #20291
Conversation
I haven't finished flagging the non-trivial changes, but I have to run. I'll finish that up later.
This adds CUDA streams to all pylibcudf calls in cudf-polars. At the moment, we continue to use the default stream for all operations, so we're *explicitly* using the default stream. A future PR will update things to use non-default streams.
This adds CUDA streams to `cudf_polars.dsl.expressions.aggregation`. Streams are still missing from some `cudf_polars.containers.Column` calls in this file, but all the direct pylibcudf calls should be covered. Split off rapidsai#20291.
The remaining non-trivial changes.
- ast_result = to_ast(predicate)
+ stream = get_cuda_stream()
+ ast_result = to_ast(predicate, stream=stream)
+ stream.synchronize()
Wait...Why are we synchronizing here?
See the discussion at #20291 (comment). In these spots, we don't have natural access to a CUDA stream like we do in `IR.do_evaluate`. But we do need to ensure that the output is valid by the time we use the result. So our two options were:
- Have some sort of well-known stream ("a stream singleton" I called it) that can do these things and be synchronized with before using the result
- Ensure the result is valid before returning control flow to the rest of the program (i.e., call `stream.synchronize()`)

All the cases in the thread linked above are quick things that just initialize some small pieces of data, so calling `stream.synchronize()` here should be fine.
I think another option is to do it lazily:

    def to_ast_lazy(predicate):
        def f(stream):
            return to_ast(predicate, stream=stream)
        return f

And then call it when we have the stream: `to_ast_lazy(predicate)(df.stream)`
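For concreteness, a sketch of how the deferred callable might be consumed later, once a stream is available. The `df.stream` attribute and the `do_evaluate` shape are assumptions from this thread, not the actual cudf-polars code:

```python
# Sketch only: `to_ast`, `predicate`, and `df.stream` are assumed from the
# discussion above; this is not the actual cudf-polars implementation.
predicate_fn = to_ast_lazy(predicate)  # cheap, host-only; no device work yet

def do_evaluate(df):
    # The device work now happens on the DataFrame's own stream, so no
    # extra synchronization is needed before the result is consumed there.
    ast_result = predicate_fn(df.stream)
    ...
```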
Yep, that seems like it'd work for our (current) usage of `ConditionalJoin.Predicate.ast`, which is in a `do_evaluate`. I'm less certain whether it'd work for `duration_to_scalar` and the stats things.
Given how this is used (just making a `plc.Scalar`) I think a synchronize is OK.
> I'm less certain whether it'd work for `duration_to_scalar` and the stats things.
FWIW I think it should work.
> Given how this is used (just making a `plc.Scalar`) I think a synchronize is OK.
I think we should try to avoid stream syncs when we can. Maybe add a TODO comment saying we should consider implementing the lazy approach or the refactor @wence- mentioned in #20291 (comment)?
Handling stats is proving more complicated. IIUC, we use those results to make lowering decisions, so it really does need to be done before we're in some `IR.do_evaluate`. So synchronization is required / isn't an (additional) problem.

Could CUDA events help a little here? We record the event and hand it along with the stream as a "future." And then only when we need the host result do we sync. Idk though, I'm less confident about this one, but a TODO comment probably doesn't hurt? It would have to be a TODO, I think, because IIRC you mentioned that CUDA events aren't supported in RMM yet.
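To illustrate the idea, here's a minimal sketch of the event-as-future pattern, written against CuPy's event API since (as noted) RMM doesn't expose events yet. `compute_stats_async` and the reduction are made up for the example:

```python
import cupy as cp

def compute_stats_async(col: cp.ndarray, stream: cp.cuda.Stream):
    """Launch a reduction on `stream` and return (result, event) as a 'future'."""
    with stream:
        result = col.sum()  # enqueued on `stream`; result is a 0-d device array
    event = cp.cuda.Event(block=False, disable_timing=True)
    event.record(stream)  # marks the point at which `result` becomes valid
    return result, event

stream = cp.cuda.Stream(non_blocking=True)
with stream:
    col = cp.arange(1_000)
result, event = compute_stats_async(col, stream)

# ... other work, possibly on other streams ...

event.synchronize()  # block only when the host value is actually needed
host_value = int(result.get())
```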
As you say, lack of an Event API is one thing preventing us from doing this. The `join_streams` implementation uses events to record an event on one stream and (asynchronously) wait for it on another.

But then we run into the fact that at the time we need to join the two streams (the stream the stats collection happened on and the stream we want to use the result on), we don't have access to the original stream anymore. My earlier "stream singleton" thing did this, but ended up being ugly. And I think it ended up not being worthwhile since IIUC we need to synchronize anyway in order to make decisions about how to lower the IR nodes.
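Roughly, the event-based join described here looks like this, sketched with CuPy since RMM lacks an Event API (`producer`/`consumer` are illustrative names):

```python
import cupy as cp

def join_streams(producer: cp.cuda.Stream, consumer: cp.cuda.Stream) -> None:
    """Make work later enqueued on `consumer` wait for work already enqueued
    on `producer`, without blocking the host."""
    event = cp.cuda.Event(block=False, disable_timing=True)
    event.record(producer)      # capture the producer stream's current position
    consumer.wait_event(event)  # on-device wait; the host thread keeps going
```

The catch, as described above, is that we no longer hold the producer stream by the time we'd need to do this join.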
> And I think it ended up not being worthwhile since IIUC we need to synchronize anyway in order to make decisions about how to lower the IR nodes.
Ok thanks, thought I'd just mention it. That makes sense.
The `to_ast` change is causing problems, since we used it to validate whether something was supported and to raise a `NotImplementedError`. By deferring `to_ast` until we're inside `do_evaluate`, we've passed the point where we can do the usual fallback.

For now, I've reverted the `to_ast` change (so we're back to a synchronize). But I'll see if there's an easy way to do it for this PR. We might need to split out the "validate whether you can do this" part from the "actually do stuff that might require a stream" part.
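A hypothetical shape for that split (all names illustrative; `_is_supported` is a stand-in for whatever host-side check translation can do without a stream):

```python
def validate_predicate(predicate) -> None:
    """Host-only check at translation time, so NotImplementedError can still
    trigger the usual CPU fallback. No stream (or GPU work) involved."""
    if not _is_supported(predicate):  # hypothetical host-side predicate check
        raise NotImplementedError(f"Conditional join with predicate {predicate}")

def build_predicate_ast(predicate, stream):
    """Stream-dependent part: allocate device scalars / build the AST, called
    later from do_evaluate where a stream is naturally available."""
    return to_ast(predicate, stream=stream)
```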
I gave rewriting how we use `to_ast` one more shot and failed. More details at #20372.

For now, I think we just live with this temporary stream & synchronization. I'll look into it more this week.
Move `to_ast` call to a `do_evaluate` context.
This updates the callers of `offsets_to_windows` to wait until they're in a context with a stream available.
This reverts commit 4bce714.
This defers the point at which we call `offsets_to_windows` until we have a CUDA stream in `do_evaluate`. We still perform some of the work (calling `duration_to_int`) early on to catch any translation issues (e.g., we want to raise when month is non-zero, since we don't support that).
Thanks for helping me understand streams better, this is getting close
raise NotImplementedError(
    f"Conditional join with predicate {self.predicate}"
The upstream polars tests are failing because we're raising in `do_evaluate` now.
Can we separate concerns here? That is, split up "translating the predicate" from "allocating scalars on a stream"?
This reverts commit 617f450.
/merge
#20291 missed a spot in `Join` where we need to pass the CUDA stream to the pylibcudf join function. This shows up in PDSH query 4.

Authors:
- Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
- Matthew Murray (https://github.com/Matt711)

URL: #20398
Description
This adds CUDA streams to all pylibcudf calls in cudf-polars.
At the moment, we continue to use the default stream for all operations, so we're explicitly using the default stream. A future PR will update things to use non-default streams.
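For example, a minimal sketch of the pattern applied here, assuming your pylibcudf version accepts a `stream` keyword on these APIs (the `unary_operation` call is just an illustrative call site, not one from this PR):

```python
import pylibcudf as plc
from rmm.pylibrmm.stream import DEFAULT_STREAM

def evaluate(column: plc.Column) -> plc.Column:
    # Explicitly thread the (for now, default) stream through the call, so
    # switching to a non-default stream later is a local change per call site.
    stream = DEFAULT_STREAM
    return plc.unary.unary_operation(
        column, plc.unary.UnaryOperator.ABS, stream=stream
    )
```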
As far as I can tell, this should get all the pylibcudf calls in cudf-polars. It's a lot of code to review. Unfortunately, it mixes many trivial changes (adding `stream=stream` in a bunch of spots) with a handful of non-trivial changes. I'll comment inline on all the non-trivial changes. I'm more than happy to break those changes out into their own PR (but it gets complicated: the changes to `Column.nan_count`, for example, force the changes to `broadcast` and `aggregation.py`...).

Closes #20239
Part of #20228