100 changes: 81 additions & 19 deletions google/cloud/aiplatform/utils/gcs_utils.py
@@ -17,14 +17,19 @@

import datetime
import glob

# Version detection and compatibility layer for google-cloud-storage v2/v3
from importlib.metadata import version as get_version
import logging
import os
import pathlib
import tempfile
from typing import Optional, TYPE_CHECKING
import warnings

from google.auth import credentials as auth_credentials
from google.cloud import storage
from packaging.version import Version

from google.cloud.aiplatform import initializer
from google.cloud.aiplatform.utils import resource_manager_utils
@@ -35,6 +40,66 @@
_logger = logging.getLogger(__name__)


# Detect google-cloud-storage version once at module load
try:
_GCS_VERSION = Version(get_version("google-cloud-storage"))
except Exception:
# Fallback if version detection fails (should not happen in normal use)
_GCS_VERSION = Version("2.0.0")

_USE_FROM_URI = _GCS_VERSION >= Version("3.0.0")

# Warn users on v2 about upcoming deprecation
if _GCS_VERSION < Version("3.0.0"):
warnings.warn(
"Support for google-cloud-storage < 3.0.0 will be removed in a future"
" version of google-cloud-aiplatform. Please upgrade to"
" google-cloud-storage >= 3.0.0.",
FutureWarning,
stacklevel=2,
)


def blob_from_uri(uri: str, client: storage.Client) -> storage.Blob:
"""Create a Blob from a GCS URI, compatible with v2 and v3.

This function provides compatibility across google-cloud-storage versions:
- v3.x: Uses Blob.from_uri()
- v2.x: Uses Blob.from_string() (deprecated in v3)

Args:
uri: GCS URI (e.g., 'gs://bucket/path/to/blob')
client: Storage client instance

Returns:
storage.Blob: Blob instance
"""
if _USE_FROM_URI:
return storage.Blob.from_uri(uri, client=client)
else:
return storage.Blob.from_string(uri, client=client)


def bucket_from_uri(uri: str, client: storage.Client) -> storage.Bucket:
"""Create a Bucket from a GCS URI, compatible with v2 and v3.

This function provides compatibility across google-cloud-storage versions:
- v3.x: Uses Bucket.from_uri()
- v2.x: Uses Bucket.from_string() (deprecated in v3)

Args:
uri: GCS bucket URI (e.g., 'gs://bucket-name')
client: Storage client instance

Returns:
storage.Bucket: Bucket instance
"""
if _USE_FROM_URI:
return storage.Bucket.from_uri(uri, client=client)
else:
return storage.Bucket.from_string(uri, client=client)


def upload_to_gcs(
source_path: str,
destination_uri: str,
@@ -79,18 +144,18 @@ def upload_to_gcs(
destination_file_uri = (
destination_uri.rstrip("/") + "/" + source_file_relative_posix_path
)
_logger.debug(f'Uploading "{source_file_path}" to "{destination_file_uri}"')
destination_blob = storage.Blob.from_string(
_logger.debug(
'Uploading "%s" to "%s"', source_file_path, destination_file_uri
)
destination_blob = blob_from_uri(
destination_file_uri, client=storage_client
)
destination_blob.upload_from_filename(filename=source_file_path)
else:
source_file_path = source_path
destination_file_uri = destination_uri
_logger.debug(f'Uploading "{source_file_path}" to "{destination_file_uri}"')
destination_blob = storage.Blob.from_string(
destination_file_uri, client=storage_client
)
_logger.debug('Uploading "%s" to "%s"', source_file_path, destination_file_uri)
destination_blob = blob_from_uri(destination_file_uri, client=storage_client)
destination_blob.upload_from_filename(filename=source_file_path)


@@ -234,7 +299,7 @@ def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
credentials=credentials,
)

pipelines_bucket = storage.Bucket.from_string(
pipelines_bucket = bucket_from_uri(
uri=output_artifacts_gcs_dir,
client=storage_client,
)
@@ -294,9 +359,9 @@ def download_file_from_gcs(
credentials = credentials or initializer.global_config.credentials

storage_client = storage.Client(project=project, credentials=credentials)
source_blob = storage.Blob.from_string(source_file_uri, client=storage_client)
source_blob = blob_from_uri(source_file_uri, client=storage_client)

_logger.debug(f'Downloading "{source_file_uri}" to "{destination_file_path}"')
_logger.debug('Downloading "%s" to "%s"', source_file_uri, destination_file_path)

source_blob.download_to_filename(filename=destination_file_path)

@@ -354,13 +419,10 @@ def _upload_pandas_df_to_gcs(
"""Uploads the provided Pandas DataFrame to a GCS bucket.

Args:
df (pandas.DataFrame):
Required. The Pandas DataFrame to upload.
upload_gcs_path (str):
Required. The GCS path to upload the data file.
file_format (str):
Required. The format to export the DataFrame to. Currently
only JSONL is supported.
df (pandas.DataFrame): Required. The Pandas DataFrame to upload.
upload_gcs_path (str): Required. The GCS path to upload the data file.
file_format (str): Required. The format to export the DataFrame to.
Currently only JSONL is supported.

Raises:
ValueError: When a file format other than JSONL is provided.
@@ -378,9 +440,9 @@
project=initializer.global_config.project,
credentials=initializer.global_config.credentials,
)
storage.Blob.from_string(
uri=upload_gcs_path, client=storage_client
).upload_from_filename(filename=local_dataset_path)
blob_from_uri(uri=upload_gcs_path, client=storage_client).upload_from_filename(
filename=local_dataset_path
)


def validate_gcs_path(gcs_path: str) -> None:
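For reference, a minimal usage sketch of the new helpers, which are drop-in replacements for calling Blob.from_string / Bucket.from_string directly (the bucket and object names below are illustrative only):

    from google.cloud import storage
    from google.cloud.aiplatform.utils import gcs_utils

    client = storage.Client()

    # Dispatches to Blob.from_uri on google-cloud-storage >= 3.0.0 and to the
    # older Blob.from_string on 2.x.
    blob = gcs_utils.blob_from_uri("gs://example-bucket/path/to/file.txt", client=client)
    blob.download_to_filename("/tmp/file.txt")

    # Same pattern for buckets.
    bucket = gcs_utils.bucket_from_uri("gs://example-bucket", client=client)
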
3 changes: 2 additions & 1 deletion google/cloud/aiplatform/utils/yaml_utils.py
@@ -22,6 +22,7 @@
from google.auth import transport
from google.cloud import storage
from google.cloud.aiplatform.constants import pipeline as pipeline_constants
from google.cloud.aiplatform.utils.gcs_utils import blob_from_uri

# Pattern for an Artifact Registry URL.
_VALID_AR_URL = pipeline_constants._VALID_AR_URL
@@ -98,7 +99,7 @@ def _load_yaml_from_gs_uri(
"""
yaml = _maybe_import_yaml()
storage_client = storage.Client(project=project, credentials=credentials)
blob = storage.Blob.from_string(uri, storage_client)
blob = blob_from_uri(uri, storage_client)
return yaml.safe_load(blob.download_as_bytes())


4 changes: 2 additions & 2 deletions setup.py
@@ -307,8 +307,8 @@
"proto-plus >= 1.22.3, <2.0.0",
"protobuf>=3.20.2,<7.0.0,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5",
"packaging >= 14.3",
"google-cloud-storage >= 1.32.0, < 3.0.0; python_version<'3.13'",
"google-cloud-storage >= 2.10.0, < 3.0.0; python_version>='3.13'",
"google-cloud-storage >= 1.32.0, < 4.0.0; python_version<'3.13'",
"google-cloud-storage >= 2.10.0, < 4.0.0; python_version>='3.13'",
"google-cloud-bigquery >= 1.15.0, < 4.0.0, !=3.20.0",
"google-cloud-resource-manager >= 1.3.3, < 3.0.0",
"shapely < 3.0.0",
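Because setup.py still allows google-cloud-storage 2.x, users who stay on 2.x will see the new FutureWarning when gcs_utils is first imported. A minimal sketch of how such users could silence it until they upgrade (the filter matches the warning message added above; placing it before the first aiplatform import is an assumption about the caller's entry point):

    import warnings

    # Silences the deprecation notice emitted for google-cloud-storage < 3.0.0.
    warnings.filterwarnings(
        "ignore",
        message=r"Support for google-cloud-storage < 3\.0\.0",
        category=FutureWarning,
    )

    from google.cloud import aiplatform  # noqa: E402
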
2 changes: 1 addition & 1 deletion testing/constraints-3.10.txt
@@ -6,7 +6,7 @@ google-auth==2.35.0 # Tests google-auth with rest async support
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
mlflow==2.16.0 # Pinned to speed up installation
2 changes: 1 addition & 1 deletion testing/constraints-3.11.txt
@@ -6,7 +6,7 @@ google-auth==2.35.0 # Tests google-auth with rest async support
proto-plus
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
pytest-xdist==3.3.1 # Pinned to unbreak unit tests
ray==2.5.0 # Pinned until 2.9.3 is verified for Ray tests
2 changes: 1 addition & 1 deletion testing/constraints-3.12.txt
@@ -5,7 +5,7 @@ google-api-core==2.21.0 # Tests google-api-core with rest async support
google-auth==2.35.0 # Tests google-auth with rest async support
proto-plus
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
pytest-xdist==3.3.1 # Pinned to unbreak unit tests
ray==2.5.0 # Pinned until 2.9.3 is verified for Ray tests
2 changes: 1 addition & 1 deletion testing/constraints-3.13.txt
@@ -5,7 +5,7 @@ google-api-core==2.21.0 # Tests google-api-core with rest async support
google-auth==2.35.0 # Tests google-auth with rest async support
proto-plus
mock==4.0.2
google-cloud-storage==2.10.0 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
pytest-xdist==3.3.1 # Pinned to unbreak unit tests
ray==2.5.0 # Pinned until 2.9.3 is verified for Ray tests
2 changes: 1 addition & 1 deletion testing/constraints-3.8.txt
@@ -7,7 +7,7 @@ google-auth==2.14.1 # Tests google-auth without rest async support
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
pytest-xdist==3.3.1 # Pinned to unbreak unit tests
2 changes: 1 addition & 1 deletion testing/constraints-3.9.txt
@@ -6,7 +6,7 @@ google-auth==2.35.0 # Tests google-auth with rest async support
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
pytest-xdist==3.3.1 # Pinned to unbreak unit tests
2 changes: 1 addition & 1 deletion testing/constraints-ray-2.33.0.txt
@@ -4,7 +4,7 @@ google-api-core
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
mlflow==1.30.1 # Pinned to speed up installation
2 changes: 1 addition & 1 deletion testing/constraints-ray-2.4.0.txt
@@ -4,7 +4,7 @@ google-api-core
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==20.0 # Increased for compatibility with MLFlow
grpcio-testing==1.34.0
mlflow==1.30.1 # Pinned to speed up installation
2 changes: 1 addition & 1 deletion testing/constraints-ray-2.42.0.txt
@@ -4,7 +4,7 @@ google-api-core
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
mlflow==1.30.1 # Pinned to speed up installation
2 changes: 1 addition & 1 deletion testing/constraints-ray-2.47.1.txt
@@ -4,7 +4,7 @@ google-api-core
proto-plus==1.22.3
protobuf
mock==4.0.2
google-cloud-storage==2.2.1 # Increased for kfp 2.0 compatibility
google-cloud-storage==3.0.0 # Updated to v3.x, backward compatible with v2.x via wrapper
packaging==24.1 # Increased to unbreak canonicalize_version error (b/377774673)
grpcio-testing==1.34.0
mlflow==1.30.1 # Pinned to speed up installation
3 changes: 2 additions & 1 deletion tests/system/aiplatform/test_experiments.py
@@ -29,6 +29,7 @@
)
from tests.system.aiplatform import e2e_base
from tests.system.aiplatform import test_model_upload
from google.cloud.aiplatform.utils.gcs_utils import blob_from_uri

import numpy as np
import sklearn
@@ -332,7 +333,7 @@ def test_log_execution_and_artifact(self, shared_state):
shared_state["resources"].append(model)

storage_client = storage.Client(project=e2e_base._PROJECT)
model_blob = storage.Blob.from_string(
model_blob = blob_from_uri(
uri=test_model_upload._XGBOOST_MODEL_URI, client=storage_client
)
model_path = tempfile.mktemp() + ".my_model.xgb"
9 changes: 3 additions & 6 deletions tests/system/aiplatform/test_model_upload.py
@@ -23,6 +23,7 @@
from google.cloud import storage

from tests.system.aiplatform import e2e_base
from google.cloud.aiplatform.utils.gcs_utils import blob_from_uri


_XGBOOST_MODEL_URI = "gs://cloud-samples-data-us-central1/vertex-ai/google-cloud-aiplatform-ci-artifacts/models/iris_xgboost/model.bst"
@@ -42,9 +43,7 @@ def test_upload_and_deploy_xgboost_model(self, shared_state):
)

storage_client = storage.Client(project=e2e_base._PROJECT)
model_blob = storage.Blob.from_string(
uri=_XGBOOST_MODEL_URI, client=storage_client
)
model_blob = blob_from_uri(uri=_XGBOOST_MODEL_URI, client=storage_client)
model_path = tempfile.mktemp() + ".my_model.xgb"
model_blob.download_to_filename(filename=model_path)

@@ -53,9 +52,7 @@
)
shared_state["resources"] = [model]

staging_bucket = storage.Blob.from_string(
uri=model.uri, client=storage_client
).bucket
staging_bucket = blob_from_uri(uri=model.uri, client=storage_client).bucket
# Checking that the bucket is auto-generated
assert "-vertex-staging-" in staging_bucket.name

7 changes: 3 additions & 4 deletions tests/system/aiplatform/test_model_version_management.py
@@ -26,6 +26,7 @@

from tests.system.aiplatform import e2e_base
from tests.system.aiplatform import test_model_upload
from google.cloud.aiplatform.utils.gcs_utils import blob_from_uri


@pytest.mark.usefixtures("tear_down_resources")
@@ -42,7 +43,7 @@ def test_upload_deploy_manage_versioned_model(self, shared_state):
)

storage_client = storage.Client(project=e2e_base._PROJECT)
model_blob = storage.Blob.from_string(
model_blob = blob_from_uri(
uri=test_model_upload._XGBOOST_MODEL_URI, client=storage_client
)
model_path = tempfile.mktemp() + ".my_model.xgb"
@@ -60,9 +61,7 @@
)
shared_state["resources"] = [model]

staging_bucket = storage.Blob.from_string(
uri=model.uri, client=storage_client
).bucket
staging_bucket = blob_from_uri(uri=model.uri, client=storage_client).bucket
# Checking that the bucket is auto-generated
assert "-vertex-staging-" in staging_bucket.name

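A minimal unit-test sketch (not part of this change set) that exercises both branches of blob_from_uri by patching the module-level flag; the test name and mock targets are assumptions:

    from unittest import mock

    from google.cloud import storage
    from google.cloud.aiplatform.utils import gcs_utils


    def test_blob_from_uri_dispatches_on_storage_version():
        client = mock.Mock(spec=storage.Client)
        uri = "gs://example-bucket/path/to/blob"

        # Force the v3 branch; create=True keeps the patch valid even when the
        # installed google-cloud-storage is 2.x and Blob.from_uri does not exist.
        with mock.patch.object(gcs_utils, "_USE_FROM_URI", True):
            with mock.patch.object(storage.Blob, "from_uri", create=True) as from_uri:
                gcs_utils.blob_from_uri(uri, client=client)
                from_uri.assert_called_once_with(uri, client=client)

        # Force the v2 branch; Blob.from_string exists in both major versions.
        with mock.patch.object(gcs_utils, "_USE_FROM_URI", False):
            with mock.patch.object(storage.Blob, "from_string") as from_string:
                gcs_utils.blob_from_uri(uri, client=client)
                from_string.assert_called_once_with(uri, client=client)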