Skip to content
37 changes: 22 additions & 15 deletions google/cloud/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
* a :class:`~google.cloud.bigtable.table.Table` owns a
:class:`~google.cloud.bigtable.row.Row` (and all the cells in the row)
"""
import copy
import os
import warnings
import grpc # type: ignore

from google.api_core.gapic_v1 import client_info as client_info_lib
from google.auth.credentials import AnonymousCredentials # type: ignore

from google.cloud import bigtable_v2
from google.cloud.bigtable import admin
from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport
from google.cloud.bigtable.admin.services.bigtable_instance_admin.transports import (
BigtableInstanceAdminGrpcTransport,
)
Expand All @@ -54,6 +53,11 @@
from google.cloud.bigtable.cluster import _CLUSTER_NAME_RE
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore

from google.cloud.bigtable.data import BigtableDataClient
from google.cloud.bigtable.data._helpers import (
_DEFAULT_BIGTABLE_EMULATOR_CLIENT,
)


INSTANCE_TYPE_PRODUCTION = instance.Instance.Type.PRODUCTION
INSTANCE_TYPE_DEVELOPMENT = instance.Instance.Type.DEVELOPMENT
Expand All @@ -66,7 +70,6 @@
READ_ONLY_SCOPE = "https://www.googleapis.com/auth/bigtable.data.readonly"
"""Scope for reading table data."""

_DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator"
_GRPC_CHANNEL_OPTIONS = (
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
Expand Down Expand Up @@ -290,18 +293,7 @@ def table_data_client(self):
:rtype: :class:`.bigtable_v2.BigtableClient`
:returns: A BigtableClient object.
"""
if self._table_data_client is None:
transport = self._create_gapic_client_channel(
bigtable_v2.BigtableClient,
BigtableGrpcTransport,
)
klass = _create_gapic_client(
bigtable_v2.BigtableClient,
client_options=self._client_options,
transport=transport,
)
self._table_data_client = klass(self)
return self._table_data_client
return self._veneer_data_client._gapic_client

@property
def table_admin_client(self):
Expand Down Expand Up @@ -369,6 +361,21 @@ def instance_admin_client(self):
self._instance_admin_client = klass(self)
return self._instance_admin_client

@property
def _veneer_data_client(self):
"""Getter for the new Data Table API."""
if self._table_data_client is None:
client_info = copy.copy(self._client_info)
client_info.client_library_version = f"{bigtable.__version__}-data-shim"
self._table_data_client = BigtableDataClient(
project=self.project,
credentials=self._credentials,
client_options=self._client_options,
_client_info=client_info,
_disable_background_refresh=True,
)
return self._table_data_client

def instance(self, instance_id, display_name=None, instance_type=None, labels=None):
"""Factory to create a instance associated with this client.

Expand Down
19 changes: 15 additions & 4 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import google.auth.credentials
import google.auth._default
from google.api_core import client_options as client_options_lib
from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
from google.cloud.bigtable.data.row import Row
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
Expand All @@ -73,6 +72,7 @@
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_error_type
Expand Down Expand Up @@ -184,9 +184,15 @@ def __init__(
"""
if "pool_size" in kwargs:
warnings.warn("pool_size no longer supported")
# set up client info headers for veneer library
self.client_info = DEFAULT_CLIENT_INFO
self.client_info.client_library_version = self._client_version()

# set up client info headers for veneer library. _client_info is for internal use only,
# for the legacy client shim.
if kwargs.get("_client_info"):
self.client_info = kwargs["_client_info"]
else:
self.client_info = DEFAULT_CLIENT_INFO
self.client_info.client_library_version = self._client_version()

# parse client options
if type(client_options) is dict:
client_options = client_options_lib.from_dict(client_options)
Expand Down Expand Up @@ -236,6 +242,10 @@ def __init__(
"is the default."
)
self._is_closed = CrossSync.Event()
# Private argument, for internal use only
self._disable_background_refresh = bool(
kwargs.get("_disable_background_refresh", False)
)
self.transport = cast(TransportType, self._gapic_client.transport)
# keep track of active instances to for warmup on channel refresh
self._active_instances: Set[_WarmedInstanceKey] = set()
Expand Down Expand Up @@ -329,6 +339,7 @@ def _start_background_channel_refresh(self) -> None:
not self._channel_refresh_task
and not self._emulator_host
and not self._is_closed.is_set()
and not self._disable_background_refresh
):
# raise error if not in an event loop in async client
CrossSync.verify_async_event_loop()
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
# used by read_rows_sharded to limit how many requests are attempted in parallel
_CONCURRENCY_LIMIT = 10

# used by every data client as a default project name for testing on Bigtable emulator.
_DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator"

# used to identify an active bigtable resource that needs to be warmed through PingAndWarm
# each instance/app_profile_id pair needs to be individually tracked
_WarmedInstanceKey = namedtuple(
Expand Down
13 changes: 10 additions & 3 deletions google/cloud/bigtable/data/_sync_autogen/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@
import google.auth.credentials
import google.auth._default
from google.api_core import client_options as client_options_lib
from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
from google.cloud.bigtable.data.row import Row
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_error_type
Expand Down Expand Up @@ -127,8 +127,11 @@ def __init__(
"""
if "pool_size" in kwargs:
warnings.warn("pool_size no longer supported")
self.client_info = DEFAULT_CLIENT_INFO
self.client_info.client_library_version = self._client_version()
if kwargs.get("_client_info"):
self.client_info = kwargs["_client_info"]
else:
self.client_info = DEFAULT_CLIENT_INFO
self.client_info.client_library_version = self._client_version()
if type(client_options) is dict:
client_options = client_options_lib.from_dict(client_options)
client_options = cast(
Expand Down Expand Up @@ -168,6 +171,9 @@ def __init__(
f"The configured universe domain ({self.universe_domain}) does not match the universe domain found in the credentials ({self._credentials.universe_domain}). If you haven't configured the universe domain explicitly, `googleapis.com` is the default."
)
self._is_closed = CrossSync._Sync_Impl.Event()
self._disable_background_refresh = bool(
kwargs.get("_disable_background_refresh", False)
)
self.transport = cast(TransportType, self._gapic_client.transport)
self._active_instances: Set[_WarmedInstanceKey] = set()
self._instance_owners: dict[_WarmedInstanceKey, Set[int]] = {}
Expand Down Expand Up @@ -238,6 +244,7 @@ def _start_background_channel_refresh(self) -> None:
not self._channel_refresh_task
and (not self._emulator_host)
and (not self._is_closed.is_set())
and (not self._disable_background_refresh)
):
CrossSync._Sync_Impl.verify_async_event_loop()
self._channel_refresh_task = CrossSync._Sync_Impl.create_task(
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def __init__(self, table_id, instance, mutation_timeout=None, app_profile_id=Non
self._app_profile_id = app_profile_id
self.mutation_timeout = mutation_timeout

self._table_impl = self._instance._client._veneer_data_client.get_table(
self._instance.instance_id,
self.table_id,
app_profile_id=self._app_profile_id,
)

@property
def name(self):
"""Table name used in requests.
Expand Down
72 changes: 72 additions & 0 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,13 @@ async def test_ctor(self):
@CrossSync.pytest
async def test_ctor_super_inits(self):
from google.cloud.client import ClientWithProject
from google.cloud.bigtable import __version__ as bigtable_version
from google.api_core import client_options as client_options_lib
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
DEFAULT_CLIENT_INFO,
)

import copy

project = "project-id"
credentials = AnonymousCredentials()
Expand Down Expand Up @@ -147,13 +153,63 @@ async def test_ctor_super_inits(self):
kwargs = bigtable_client_init.call_args[1]
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed

expected_client_info = copy.copy(DEFAULT_CLIENT_INFO)
expected_client_info.client_library_version = (
f"{bigtable_version}-data"
if not CrossSync.is_async
else f"{bigtable_version}-data-async"
)
assert (
kwargs["client_info"].to_user_agent()
== expected_client_info.to_user_agent()
)
assert (
kwargs["client_info"].to_grpc_metadata()
== expected_client_info.to_grpc_metadata()
)

# test mixin superclass init was called
assert client_project_init.call_count == 1
kwargs = client_project_init.call_args[1]
assert kwargs["project"] == project
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed

@CrossSync.pytest
async def test_ctor_legacy_client(self):
from google.api_core import client_options as client_options_lib
from google.api_core.gapic_v1.client_info import ClientInfo

project = "project-id"
credentials = AnonymousCredentials()
client_info = ClientInfo(gapic_version="1.2.3", user_agent="test-client-")
client_options = {"api_endpoint": "foo.bar:1234"}
options_parsed = client_options_lib.from_dict(client_options)
with mock.patch.object(
CrossSync.GapicClient, "__init__"
) as bigtable_client_init:
try:
client = self._make_client(
project=project,
credentials=credentials,
client_options=options_parsed,
use_emulator=False,
_client_info=client_info,
_disable_background_refresh=True,
)

assert client._disable_background_refresh
assert client.client_info is client_info
except TypeError:
pass

# test gapic superclass init was called with the right arguments
assert bigtable_client_init.call_count == 1
kwargs = bigtable_client_init.call_args[1]
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed

@CrossSync.pytest
async def test_ctor_dict_options(self):
from google.api_core.client_options import ClientOptions
Expand Down Expand Up @@ -245,6 +301,22 @@ async def test__start_background_channel_refresh(self):
assert ping_and_warm.call_count == 1
await client.close()

@CrossSync.pytest
async def test__start_background_channel_refresh_disable_background_refresh(self):
client = self._make_client(
project="project-id",
_disable_background_refresh=True,
)
# should create background tasks for each channel
with mock.patch.object(
client, "_ping_and_warm_instances", CrossSync.Mock()
) as ping_and_warm:
client._emulator_host = None
client.transport._grpc_channel = CrossSync.SwappableChannel(mock.Mock)
client._start_background_channel_refresh()
assert client._channel_refresh_task is None
ping_and_warm.assert_not_called()

@CrossSync.drop
@CrossSync.pytest
@pytest.mark.skipif(
Expand Down
64 changes: 64 additions & 0 deletions tests/unit/data/_sync_autogen/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ def test_ctor(self):

def test_ctor_super_inits(self):
from google.cloud.client import ClientWithProject
from google.cloud.bigtable import __version__ as bigtable_version
from google.api_core import client_options as client_options_lib
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
DEFAULT_CLIENT_INFO,
)
import copy

project = "project-id"
credentials = AnonymousCredentials()
Expand All @@ -116,12 +121,56 @@ def test_ctor_super_inits(self):
kwargs = bigtable_client_init.call_args[1]
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed
expected_client_info = copy.copy(DEFAULT_CLIENT_INFO)
expected_client_info.client_library_version = (
f"{bigtable_version}-data"
if not CrossSync._Sync_Impl.is_async
else f"{bigtable_version}-data-async"
)
assert (
kwargs["client_info"].to_user_agent()
== expected_client_info.to_user_agent()
)
assert (
kwargs["client_info"].to_grpc_metadata()
== expected_client_info.to_grpc_metadata()
)
assert client_project_init.call_count == 1
kwargs = client_project_init.call_args[1]
assert kwargs["project"] == project
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed

def test_ctor_legacy_client(self):
from google.api_core import client_options as client_options_lib
from google.api_core.gapic_v1.client_info import ClientInfo

project = "project-id"
credentials = AnonymousCredentials()
client_info = ClientInfo(gapic_version="1.2.3", user_agent="test-client-")
client_options = {"api_endpoint": "foo.bar:1234"}
options_parsed = client_options_lib.from_dict(client_options)
with mock.patch.object(
CrossSync._Sync_Impl.GapicClient, "__init__"
) as bigtable_client_init:
try:
client = self._make_client(
project=project,
credentials=credentials,
client_options=options_parsed,
use_emulator=False,
_client_info=client_info,
_disable_background_refresh=True,
)
assert client._disable_background_refresh
assert client.client_info is client_info
except TypeError:
pass
assert bigtable_client_init.call_count == 1
kwargs = bigtable_client_init.call_args[1]
assert kwargs["credentials"] == credentials
assert kwargs["client_options"] == options_parsed

def test_ctor_dict_options(self):
from google.api_core.client_options import ClientOptions

Expand Down Expand Up @@ -194,6 +243,21 @@ def test__start_background_channel_refresh(self):
assert ping_and_warm.call_count == 1
client.close()

def test__start_background_channel_refresh_disable_background_refresh(self):
client = self._make_client(
project="project-id", _disable_background_refresh=True
)
with mock.patch.object(
client, "_ping_and_warm_instances", CrossSync._Sync_Impl.Mock()
) as ping_and_warm:
client._emulator_host = None
client.transport._grpc_channel = CrossSync._Sync_Impl.SwappableChannel(
mock.Mock
)
client._start_background_channel_refresh()
assert client._channel_refresh_task is None
ping_and_warm.assert_not_called()

def test__ping_and_warm_instances(self):
"""test ping and warm with mocked asyncio.gather"""
client_mock = mock.Mock()
Expand Down
Loading
Loading