
Conversation

@gaogaotiantian
Contributor

What changes were proposed in this pull request?

Respect spark.sql.session.timeZone in UDF workers.

This was discussed in #52980, but we decided to move it to a separate PR. There are still open questions:

  1. It seems this method can't pick up changes made via spark.conf.set. I believe this is trivial for people familiar with the configs, so I did not investigate too much.
  2. pandas/arrow UDFs are actually reading this config, but it seems it's only passed for those kinds of UDFs. The message has no structure.

Why are the changes needed?

Relying on the timezone of the local machine does not make sense.

Does this PR introduce any user-facing change?

Yes. The UDF behavior regarding timestamps and timezones will change.

How was this patch tested?

Manually

Was this patch authored or co-authored using generative AI tooling?

No

@gaogaotiantian
Contributor Author

@cloud-fan , @ueshin , @zhengruifeng we've discussed this but did not reach a conclusion. I have a draft here and a few questions. We probably need to further discuss the implementation and its implications.

@gaogaotiantian gaogaotiantian changed the title [SPARK-33863] Respect session timezone in udf workers [SPARK-33863][PYTHON] Respect session timezone in udf workers Nov 21, 2025
Contributor

@zhengruifeng zhengruifeng left a comment


This is a behavior change; I think we need a flag for it.
Also, we need new tests in test_udf.py.


# Use the local timezone to convert the timestamp
tz = datetime.datetime.now().astimezone().tzinfo
tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE", None)
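For context, the resolution logic on the worker side would look roughly like the sketch below. The env var name matches the excerpt above; the ZoneInfo lookup and the local-timezone fallback are assumptions for illustration, not necessarily the PR's exact code.

import datetime
import os
from zoneinfo import ZoneInfo  # Python 3.9+

tzname = os.environ.get("SPARK_SESSION_LOCAL_TIMEZONE")
if tzname is not None:
    tz = ZoneInfo(tzname)  # e.g. "UTC" or "America/Los_Angeles"
else:
    # Fall back to the machine's local timezone (the pre-PR behavior)
    tz = datetime.datetime.now().astimezone().tzinfo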
Contributor


To confirm, we will hit this branch for every UDF execution, not just once per Python worker initialization, right?

Contributor Author


That's correct, but it doesn't seem like spark.session.conf.set("spark.sql.session.timeZone") affects the result; it only works when I create the session with the conf. Any ideas? I can investigate whether that's an issue or something we need to understand; I just thought you might know immediately.

Member


@gaogaotiantian We can get the runtime config the same way as the other configs, like hideTraceback or simplifiedTraceback above. Please take a look at PythonRunner and its subclasses.

Contributor Author


Ah, so basically override this in every subclassed worker?

Member


Yes. Also, if we have a flag, the subclasses should decide whether to return the session local timezone or None.

Contributor Author


So should the flag be a conf at the same level as the session local timezone, or just at the Python UDF level? Will it default to the original behavior or the new behavior?

Member


Yes, the flag should be at the same level as the session local timezone: a runtime conf in SQLConf.
It can be enabled by default, but when disabled, the behavior should revert to the original behavior.
WDYT? cc @zhengruifeng @HyukjinKwon

Member


The Arrow-based UDFs already handle the session local timezone, so it may be OK to just update BasePythonUDFRunner.envVars to set the env var there instead of in PythonRunner?

envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
}
if (sessionLocalTimeZone.isDefined) {
envVars.put("SPARK_SESSION_LOCAL_TIMEZONE", sessionLocalTimeZone.get)
Contributor


For arrow-based UDFs, sessionLocalTimeZone is actually already passed to the Python side:

val timeZoneConf = Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)

However, this workerConf is not available in vanilla Python UDFs; we can probably consider supporting it there in the future. Also cc @HeartSaVioR

Contributor


Yeah, it's better to pass the configs via a proper protocol instead of environment variables. But that's already how the vanilla Python runner works, and I think it's fine to follow it.

Contributor


I agree it's ideal, but I guess it'd need a non-trivial code change.

@cloud-fan
Contributor

Can we add a test that sets spark.sql.session.timeZone to a value different from the CI machine's local timezone?
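A rough sketch of what such a test could look like in test_udf.py, assuming the usual ReusedSQLTestCase/sql_conf helpers from PySpark's test utilities; the concrete assertion would depend on the semantics settled in this PR.

import datetime

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.testing.sqlutils import ReusedSQLTestCase


class SessionTimezoneUDFTests(ReusedSQLTestCase):
    def test_udf_respects_session_timezone(self):
        # Pick a timezone unlikely to match the CI machine's local timezone.
        with self.sql_conf({"spark.sql.session.timeZone": "Asia/Tokyo"}):
            df = self.spark.createDataFrame(
                [(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"])
            render = udf(lambda ts: ts.isoformat(), StringType())
            [result] = df.select(render("ts")).first()
            # The real test would assert a specific rendered value once the
            # semantics are decided; the key property is that the result must
            # not change with the machine's local timezone.
            self.assertIsInstance(result, str)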

@gaogaotiantian
Contributor Author

Okay, I've rethought this, and I think our TimestampType is completely wrong. I don't believe we can patch it without breaking backward compatibility.

The key point of TimestampType is that it's timezone aware. It's impossible to make it correct if we support naive timestamps (unless we are able to read the session config inside the conversion function).

It's not possible to determine which timezone to use when we get a naive datetime and try to convert it to a timezone-aware data type.

The only way I think we can go is to require a timezone on datetime.datetime objects if users want to convert them to TimestampType, and raise an error if there is none.

If we can get the current session timezone when we try to convert to TimestampType, that would work too, but I don't think that's feasible. It also breaks OOP principles.

I don't want to fix this so that it's correct in some cases but becomes wrong in others. We should decide whether we want to fix it (with the risk of breaking user code). If not, that's okay; we can live with a timestamp system that sometimes works.

@cloud-fan
Contributor

cloud-fan commented Nov 26, 2025

unless we are able to read the session config inside the conversion function

Where do we do the conversion? At least for the UDF case, the conversion should all happen within an active query, which belongs to a session?

@gaogaotiantian
Contributor Author

Where do we do the conversion? At least for the UDF case, the conversion should all happen within an active query, which belongs to a session?

We have to do it everywhere.

df = spark.createDataFrame([(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"])

Here we are trying to create a TimestampType value from a naive datetime - how could we determine the timezone info? It's not correct to assume it belongs to any particular timezone.

There are two correct ways to do this:

  1. For every single conversion, we know the session local timezone and assume the naive datetime is in that timezone (see the sketch after this list).
  2. We throw an error when users try to convert a naive timestamp to TimestampType and suggest that they use TimestampNTZType instead.
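A small illustration of option 1, assuming a session timezone of America/Los_Angeles for the example:

import datetime
from zoneinfo import ZoneInfo

session_tz = ZoneInfo("America/Los_Angeles")  # stand-in for spark.sql.session.timeZone

naive = datetime.datetime(1990, 8, 10, 0, 0)
aware = naive.replace(tzinfo=session_tz)        # option 1: assume the session timezone
print(aware.astimezone(datetime.timezone.utc))  # 1990-08-10 07:00:00+00:00 (PDT is UTC-7)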

@cloud-fan
Contributor

cloud-fan commented Nov 26, 2025

@gaogaotiantian The key property of Spark TimestampType is that it's an absolute timestamp. The session timezone only matters when we render the timestamp without a timezone (e.g. df.show, casting to string, or functions that extract year/month/.../second fields from a timestamp).

For the case of df = spark.createDataFrame([(datetime.datetime(1990, 8, 10, 0, 0),)], ["ts"]), we use a specific session spark to create the DataFrame, and obviously we should respect its session timezone. We should convert datetime.datetime(1990, 8, 10, 0, 0) to an absolute timestamp by attaching the session timezone to it. Moreover, we can have a mix of Python datetime.datetime objects with different timezones or no timezone, and that's OK because we can still convert them all to absolute timestamps.

A similar example is reading a JDBC table that contains a column of the standard TIMESTAMP WITH TIME ZONE type. Each value can have a different timezone, but it's still OK to read it as Spark TimestampType, because they can all be converted to absolute timestamps.

Under the hood, TimestampType is stored as an int64 in memory, which means the number of microseconds since the UTC epoch (1970-01-01 00:00:00Z).
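For example, datetime.datetime(1990, 8, 10, 0, 0) interpreted as UTC corresponds to a single absolute instant:

import datetime

aware = datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)
print(int(aware.timestamp() * 1_000_000))  # 650246400000000 microseconds since the UTC epoch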

@gaogaotiantian
Contributor Author

gaogaotiantian commented Nov 26, 2025

@cloud-fan , I understand TimestampType under the hood is just a UTC epoch timestamp. We need to convert to a UTC timestamp, so we have to assume a timezone for naive timestamps, and I don't believe we are doing that.

spark.conf.set("spark.sql.session.timeZone", "UTC")
df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc),),
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])
df.show()

The two rows above come out different, because we do not respect the session timezone when converting them. Notice that no UDF is involved at this point.
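A pure-Python illustration of the discrepancy, assuming the naive value ends up being interpreted in the machine's local timezone instead of the session timezone (which is what the output above suggests):

import datetime

aware = datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)
naive = datetime.datetime(1990, 8, 10, 0, 0)

print(aware.timestamp())  # 650246400.0 - an unambiguous instant

# The naive value only becomes an instant once some timezone is assumed;
# using the machine's local timezone gives a machine-dependent result that
# matches the aware value only when the machine happens to be on UTC.
local_tz = datetime.datetime.now().astimezone().tzinfo
print(naive.replace(tzinfo=local_tz).timestamp())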

class TimestampType(DatetimeType, metaclass=DataTypeSingleton):

We don't check the timezone info in toInternal and fromInternal. (I don't know if there are other tricks, like changing the system timezone, but the result is different.)

We can fix that with some hacks if we really want to - that's option 1 I mentioned above. Again, we need to do it everywhere.

However, that is not the full picture. We have an even worse issue with datetime.datetime: yes, internally we can convert it to an epoch timestamp, but the user might want to work with it in Python.

@udf(returnType=BooleanType())
def greater(ts):
    return ts > datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)

The code above will raise an error, because we convert the TimestampType value to a naive datetime - even though we claim that TimestampType is timezone aware. It's illegal to compare a naive timestamp with an aware timestamp in Python (you can do an == check, but it will always return False).
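This is plain Python behavior, independent of Spark:

import datetime

naive = datetime.datetime(1990, 8, 10, 0, 0)
aware = datetime.datetime(1990, 8, 10, 0, 0, tzinfo=datetime.timezone.utc)

print(naive == aware)  # False: naive/aware equality never matches
try:
    naive > aware      # ordering comparisons raise
except TypeError as e:
    print(e)           # can't compare offset-naive and offset-aware datetimes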

Also, I found an issue that is probably DST-related.

@udf(returnType=BooleanType())
def same(ts):
    return ts == datetime.datetime(1990, 8, 10, 0, 0)

df = spark.createDataFrame([
    (datetime.datetime(1990, 8, 10, 0, 0),)
], ["ts"])

df.select(same("ts")).show()

Even this returns False - there's a one-hour difference, probably due to some missing DST handling.

Back to my point: our TimestampType in Python is just broken, and it will disappoint users when they try to do any manipulation on it. We can't mix naive and aware timestamps together because Python does not support it.

This is why I propose my second option - it's a bit aggressive, but we can make it right - to always map TimestampType to aware datetimes and TimestampNTZType to naive datetimes. I believe that's the only way we can make it completely correct.

However, there is a risk that some existing user code could break. If that's a concern, we can just leave this broken; it still works on some occasions.

@cloud-fan
Contributor

Ah, this is tough. I agree with "always map TimestampType with aware datetime", but it can be a breaking change for Python UDFs, as it's not only a data change but also a type change (it's illegal to compare a naive timestamp with an aware timestamp in Python).

How about arrow/pandas? Do they also rely on datetime objects?

@gaogaotiantian
Contributor Author

Yeah, this could be a breaking change, but it is the correct way to go. Mapping TimestampType to naive datetime objects is technically not "safer" - they still can't be compared with aware timestamps. It's not like naive timestamps have better compatibility; you have to choose one or the other.

I don't have the best knowledge of pandas, but it seems like they have similar concerns - https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html

I mean, we can't really make it work properly if we mix them up. I can think of a few ways to make it less painful:

  1. If the user uses a naive datetime and tries to convert it to TimestampType explicitly, we use UTC for the naive timestamp instead of raising an error (configurable).
  2. When we infer types, we infer based on whether the datetime has a timezone, and do not automatically map to TimestampType (see the sketch after this list).
  3. Provide a flag to keep the original behavior - name it something like keep_the_wrong_timestamp_behavior. If users are not ready, they need to explicitly set that flag.
  4. Generate warnings when users try to mix these things up.
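For item 2, the inference rule could be as simple as the hypothetical helper below (infer_timestamp_type is not an existing PySpark API; it only illustrates the rule):

import datetime
from pyspark.sql.types import TimestampType, TimestampNTZType

def infer_timestamp_type(dt: datetime.datetime):
    # Aware datetimes map to TimestampType, naive ones to TimestampNTZType.
    return TimestampType() if dt.tzinfo is not None else TimestampNTZType()

infer_timestamp_type(datetime.datetime(1990, 8, 10, tzinfo=datetime.timezone.utc))  # TimestampType
infer_timestamp_type(datetime.datetime(1990, 8, 10))                                # TimestampNTZType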

I agree this could be disruptive, but otherwise we can't make it right - that's the problem. It's a big mess internally, and we simply can't make it better while keeping backward compatibility.

@gaogaotiantian
Contributor Author

I thought about it and I have an alternative proposal. We can add a conf to enable a "strict mode" for timestamps, where we always pair aware timestamps with TimestampType and naive timestamps with TimestampNTZType. This would be off by default, but strict and correct when enabled.

We have to hook this logic into the type conversion, and I think the least intrusive way is to set it as a class variable on TimestampType and update it on conf changes or something.

In this way, when users ask about the weird timestamp behavior, we can at least say: the default config will never work properly, but you can try strict mode. We also have a chance to gradually switch to strict mode in the future.

@cloud-fan
Contributor

Let's clearly define this "strict mode". For the input side, Spark can be more lenient and allow mixed timezone-aware and non-aware datetime objects by using the session timezone. For the output side, will "strict mode" make Spark always produce timezone-aware datetime objects for TimestampType?

@gaogaotiantian
Contributor Author

For the input side, Spark can be more lenient and allow mixed timezone-aware and non-aware datetime objects by using the session timezone.

I think it's better to enforce the match; otherwise the user could create a naive timestamp, send it to a UDF, and get an aware timestamp back because we "output" an aware timestamp for TimestampType. Heuristics make this too difficult to get correct.

@cloud-fan
Contributor

OK, so Spark should fail if naive and aware datetimes are mixed? That's another breaking change though...

@gaogaotiantian
Contributor Author

Well, Python will fail when naive and aware datetimes are mixed - there's no way for Spark to avoid it. It is a breaking change, I agree; that's why I propose to put it under an optional flag. The reason we should have this is what I've mentioned above: we simply can't make it right when we mix them. Say the user creates a datetime and then changes the session config, versus they set the session config and then create the datetime - should the behavior be the same? We won't even be able to distinguish the two cases. The only way to make it work properly, with easy-to-explain rules, is to strictly not mix them.

The existing code might work in some naive cases, and we can keep it, but when users try to do something complicated with their timestamps, they will have issues. We should at least provide one way for them to always get a correct and consistent result.

@cloud-fan
Contributor

Let’s open a PR so we can discuss the details more closely.

@gaogaotiantian
Contributor Author

Okay, sure. This won't be a trivial change, but I can draft a PR. The tricky part is where to enforce this. I have a few thoughts in mind:

  1. Check the conf directly in fromInternal and toInternal - but this would access Spark's session conf during data type conversion, which is a new pattern and would couple the data type too tightly to the existence of a session.
  2. Pass it as an argument to fromInternal and toInternal - still a new pattern, and it would require changing every container type.
  3. (Might go this way) Add a class-level switch variable for TimestampType and TimestampNTZType and set it when we configure the session (probably via a hook somewhere) - we need to pass the confs to the workers as well to make it work properly (see the sketch below).
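A standalone sketch of option 3; the names and structure are assumptions for illustration, not actual pyspark.sql.types code:

import datetime

class StrictTimestampType:
    # Hypothetical class-level switch; a conf hook would flip this when the
    # session is configured and pass it along to the Python workers.
    strict_mode = False

    def toInternal(self, dt: datetime.datetime) -> int:
        if StrictTimestampType.strict_mode and dt.tzinfo is None:
            raise ValueError(
                "naive datetime cannot be converted to TimestampType; "
                "use TimestampNTZType instead")
        # Aware datetimes map to one absolute instant: microseconds since the
        # UTC epoch, mirroring Spark's internal int64 representation. For
        # naive inputs in non-strict mode, astimezone() falls back to the
        # machine's local timezone - exactly the ambiguity discussed above.
        return int(dt.astimezone(datetime.timezone.utc).timestamp() * 1_000_000)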
