
Conversation

Contributor

@TomAugspurger TomAugspurger commented Oct 16, 2025

Description

This adds CUDA streams to all pylibcudf calls in cudf-polars.

At the moment we continue to use the default stream for all operations; the change is that we now pass it explicitly rather than relying on it implicitly. A future PR will update things to use non-default streams.

As far as I can tell, this covers all the pylibcudf calls in cudf-polars. It's a lot of code to review. Unfortunately, it mixes many trivial changes (adding `stream=stream` in a bunch of spots) with a handful of non-trivial changes, which I'll comment on inline. I'm more than happy to break those out into their own PRs, but it gets complicated: the changes to `Column.nan_count`, for example, force the changes to `broadcast` and `aggregation.py`...
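To make the shape of the trivial changes concrete, here is a minimal, self-contained sketch of the pattern. All names below (`DEFAULT_STREAM`, `some_pylibcudf_op`) are stand-ins for illustration, not actual pylibcudf API:

```python
# Hedged, self-contained sketch of the mechanical change: call sites gain an
# explicit `stream=` keyword instead of relying on an implicit default stream.
# All names here are illustrative only.

DEFAULT_STREAM = "default-stream"  # stand-in for the library's default-stream handle

def some_pylibcudf_op(data, *, stream):
    # A real pylibcudf call would launch GPU work on `stream`; here we only
    # show the keyword being threaded through.
    return data

def do_evaluate(data, *, stream=DEFAULT_STREAM):
    # Before this PR: some_pylibcudf_op(data)   (implicit default stream)
    # After this PR:  the stream is passed explicitly at every call site.
    return some_pylibcudf_op(data, stream=stream)

print(do_evaluate([1, 2, 3]))
```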

Closes #20239

Part of #20228

@copy-pr-bot

copy-pr-bot bot commented Oct 16, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@github-actions github-actions bot added the Python (Affects Python cuDF API) and cudf-polars (Issues specific to cudf-polars) labels Oct 16, 2025
@GPUtester GPUtester moved this to In Progress in cuDF Python Oct 16, 2025
Contributor Author

@TomAugspurger TomAugspurger left a comment


I haven't finished flagging the non-trivial changes, but I have to run. I'll finish that up later.

@TomAugspurger TomAugspurger force-pushed the tom/polars-cuda-stream-everything branch 3 times, most recently from 27e8178 to 1d32cdc on October 17, 2025 13:25
This adds CUDA streams to all pylibcudf calls in cudf-polars.

At the moment, we continue to use the default stream for all operations,
so we're *explicitly* using the default stream. A future PR will update
things to use non-default streams.
TomAugspurger added a commit to TomAugspurger/pygdf that referenced this pull request Oct 17, 2025
This adds CUDA streams to `cudf_polars.dsl.expressions.aggregation`.
Streams are still missing from some `cudf_polars.containers.Column`
calls in this file, but all the direct pylibcudf calls should be
covered.

Split off rapidsai#20291.
@TomAugspurger TomAugspurger added the improvement (Improvement / enhancement to an existing function) and non-breaking (Non-breaking change) labels Oct 17, 2025
Contributor Author

@TomAugspurger TomAugspurger left a comment


The remaining non-trivial changes.

@TomAugspurger TomAugspurger changed the title from "[WIP]: Add CUDA streams to cudf-polars" to "Add CUDA streams to cudf-polars" Oct 17, 2025
@TomAugspurger TomAugspurger marked this pull request as ready for review October 17, 2025 20:27
@TomAugspurger TomAugspurger requested a review from a team as a code owner October 17, 2025 20:27
- ast_result = to_ast(predicate)
+ stream = get_cuda_stream()
+ ast_result = to_ast(predicate, stream=stream)
+ stream.synchronize()
Contributor


Wait...Why are we synchronizing here?

Contributor Author


See the discussion at #20291 (comment). In these spots we don't have natural access to a CUDA stream like we do in IR.do_evaluate, but we do need to ensure that the output is valid by the time we use the result. So our two options were:

  1. Have some sort of well-known stream ("a stream singleton" I called it) that can do these things and be synchronized with before using the result
  2. Ensure the result is valid before returning control flow to the rest of the program (i.e., call `stream.synchronize()`)

All the cases in the thread linked above are quick things just initializing some small pieces of data, so calling stream.synchronize() here should be fine.

Contributor


I think another option is to do it lazily:

def to_ast_lazy(predicate):
    # Defer the pylibcudf call until a stream is available.
    def f(stream):
        return to_ast(predicate, stream=stream)
    return f

And then call it once we have the stream: `to_ast_lazy(predicate)(df.stream)`.

Contributor Author


Yep, that seems like it'd work for our (current) usage of ConditionalJoin.Predicate.ast, which is in a do_evaluate. I'm less certain whether it'd work for duration_to_scalar and the stats things.

Given how this is used (just making a plc.Scalar) I think a synchronize is OK.

Contributor


> I'm less certain whether it'd work for duration_to_scalar and the stats things.

FWIW I think it should work.

> Given how this is used (just making a plc.Scalar) I think a synchronize is OK.

I think we should try to avoid stream syncs when we can. Maybe add a TODO comment saying we should consider implementing the lazy approach, or the refactor @wence- mentioned in #20291 (comment)?

Contributor


Handling stats is proving more complicated. IIUC, we use those results to make lowering decisions, so it really does need to be done before we're in some IR.do_evaluate. So synchronization is required / isn't an (additional) problem.

Could CUDA events help a little here? We record the event and hand that along with the stream as a "future," and then only when we need the host result do we sync. Idk though, I'm less confident about this one, but a TODO comment probably doesn't hurt? It would have to be a TODO, I think, because IIRC you mentioned that CUDA events aren't supported in RMM yet.

Contributor Author


As you say, lack of an Event API is one thing preventing us from doing this. The join_streams implementation uses events to record an event on one stream and (asynchronously) wait for it on another.

But then we run into the fact that, at the time we need to join the two streams (the stream the stats collection happened on and the stream on which we want to use that result), we no longer have access to the original stream. My earlier "stream singleton" thing did this, but it ended up being ugly. And I think it ended up not being worthwhile since IIUC we need to synchronize anyway in order to make decisions about how to lower the IR nodes.
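For readers unfamiliar with the record-and-wait pattern being described, here is a minimal, self-contained sketch. It uses CuPy purely for illustration; it is not the RMM/pylibcudf API this PR targets, and none of the names below come from the PR.

```python
# Minimal sketch of the "record an event, wait on another stream" idea,
# using CuPy purely for illustration (not the RMM/pylibcudf API).
import cupy as cp

producer = cp.cuda.Stream(non_blocking=True)
consumer = cp.cuda.Stream(non_blocking=True)

with producer:
    x = cp.arange(1_000_000)   # work enqueued on `producer`
    total = x.sum()            # result only valid once `producer` finishes
event = cp.cuda.Event()
event.record(producer)         # mark the point at which the result is ready

consumer.wait_event(event)     # `consumer` waits asynchronously; no host sync
with consumer:
    doubled = total * 2        # safe: ordered after the recorded event

consumer.synchronize()         # host sync only when we need the value on the CPU
print(int(doubled))
```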

Contributor


> And I think it ended up not being worthwhile since IIUC we need to synchronize anyway in order to make decisions about how to lower the IR nodes.

Ok thanks, thought I'd just mention it. That makes sense.

Contributor Author


The to_ast change is causing problems, since we used it to validate whether something was supported and to raise a NotImplementedError. By deferring to_ast until we're inside do_evaluate, we've passed the point where we can do the usual fallback.

For now, I've reverted the to_ast change (so we're back to a synchronize). But I'll see if there's an easy way to do it for this PR. We might need to split out the "validate whether you can do this" part from the "actually do stuff that might require a stream" part.
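As a rough illustration of that split, here is a self-contained sketch: a host-only validation step that can still raise NotImplementedError at translation time, and a deferred callable that does the stream-dependent work inside do_evaluate. Every name below is illustrative; none of this is code from the PR.

```python
# Hedged sketch: separate "can we support this?" (checked while lowering the
# IR, so the usual CPU fallback still fires) from "do the work that needs a
# stream" (deferred until do_evaluate has one). All names are illustrative.

SUPPORTED = {"==", "<", ">"}

def validate_predicate(op: str) -> None:
    # Host-only check; raising here keeps NotImplementedError out of do_evaluate.
    if op not in SUPPORTED:
        raise NotImplementedError(f"Conditional join with predicate {op!r}")

def translate_predicate(op: str, *, stream) -> str:
    # Stand-in for the stream-dependent part (e.g. to_ast(predicate, stream=stream)).
    return f"ast({op}) on {stream}"

def make_evaluator(op: str):
    validate_predicate(op)            # fail early, before any GPU work
    def evaluate(stream):
        return translate_predicate(op, stream=stream)
    return evaluate

evaluator = make_evaluator("<")       # would raise NotImplementedError for "%"
print(evaluator("default-stream"))    # called later, where a stream is available
```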

Contributor Author


I gave rewriting how we use to_ast one more shot and failed. More details at #20372

For now, I think we just live with this temporary stream & synchronization. I'll look into it more this week.

@coderabbitai

coderabbitai bot commented Oct 23, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Move `to_ast` call to a `do_evaluate` context.
This updates the callers of `offsets_to_windows` to wait until they're
in a context with a stream available.
This defers the point at which we call offsets_to_windows until we have a CUDA stream in do_evaluate.

We do still perform some of the work (calling duration_to_int) early on to catch any translation issues (e.g. we want to raise when the month component is non-zero, since we don't support that).
Contributor

@Matt711 Matt711 left a comment


Thanks for helping me understand streams better; this is getting close.


Comment on lines 1940 to 1941
raise NotImplementedError(
    f"Conditional join with predicate {self.predicate}"
Contributor


The upstream polars tests are failing because we're raising in do_evaluate now

Contributor


Can we separate concerns here? That is, split up "translating the predicate" from "allocating scalars on a stream"?

@TomAugspurger
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit fa41b4e into rapidsai:main Oct 27, 2025
132 checks passed
@github-project-automation github-project-automation bot moved this from In Progress to Done in cuDF Python Oct 27, 2025
@TomAugspurger TomAugspurger deleted the tom/polars-cuda-stream-everything branch October 27, 2025 16:02
rapids-bot bot pushed a commit that referenced this pull request Oct 28, 2025
#20291 missed a spot in `Join` where we need to pass the CUDA stream to the pylibcudf join function. This shows up in PDSH query 4.

Authors:
  - Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
  - Matthew Murray (https://github.com/Matt711)

URL: #20398

Labels

  • cudf-polars: Issues specific to cudf-polars
  • improvement: Improvement / enhancement to an existing function
  • non-breaking: Non-breaking change
  • Python: Affects Python cuDF API.

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

[FEA]: Update cudf_polars.containers.Column to use CUDA streams

4 participants