From 2463c1f39c54dc62ab8e30984a8577dde4caf145 Mon Sep 17 00:00:00 2001 From: David Coe <> Date: Wed, 15 Oct 2025 22:13:27 -0400 Subject: [PATCH 1/5] add metadata calls --- .../Drivers/BigQuery/BigQueryConnection.cs | 12 +- .../Drivers/BigQuery/BigQueryParameters.cs | 26 ++- .../src/Drivers/BigQuery/BigQueryStatement.cs | 169 ++++++++++++++++-- .../BigQuery/BigQueryTestConfiguration.cs | 9 + .../Drivers/BigQuery/BigQueryTestingUtils.cs | 10 ++ csharp/test/Drivers/BigQuery/DriverTests.cs | 58 ++++++ 6 files changed, 262 insertions(+), 22 deletions(-) diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs index 43082d8be8..1a3c70cc27 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs @@ -142,6 +142,8 @@ private bool TryInitTracerProvider(out FileActivityListener? fileActivityListene // if this value is null, the BigQuery API chooses the location (typically the `US` multi-region) internal string? DefaultClientLocation { get; private set; } + internal bool CreateLargeResultsDataset { get; private set; } = true; + public override string AssemblyVersion => BigQueryUtils.BigQueryAssemblyVersion; public override string AssemblyName => BigQueryUtils.BigQueryAssemblyName; @@ -204,6 +206,13 @@ internal BigQueryClient Open(string? projectId = null) activity?.AddBigQueryParameterTag(BigQueryParameters.ClientTimeout, seconds); } + if (this.properties.TryGetValue(BigQueryParameters.CreateLargeResultsDataset, out string? sCreateLargeResultDataset) && + bool.TryParse(sCreateLargeResultDataset, out bool createLargeResultDataset)) + { + CreateLargeResultsDataset = createLargeResultDataset; + activity?.AddBigQueryParameterTag(BigQueryParameters.CreateLargeResultsDataset, createLargeResultDataset); + } + SetCredential(); BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder() @@ -1314,7 +1323,8 @@ private Dictionary ParseOptions() BigQueryParameters.MaxFetchConcurrency, BigQueryParameters.StatementType, BigQueryParameters.StatementIndex, - BigQueryParameters.EvaluationKind + BigQueryParameters.EvaluationKind, + BigQueryParameters.IsMetadataCommand }; foreach (string key in statementOptions) diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs index ec18e55f1a..20c79fd0ca 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs @@ -52,14 +52,33 @@ internal class BigQueryParameters public const string StatementType = "adbc.bigquery.multiple_statement.statement_type"; public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql"; + /// + /// Indicates whether the driver should create the dataset specified in the + /// parameter if it does not already exist. + /// + public const string CreateLargeResultsDataset = "adbc.bigquery.create_large_results_dataset"; + + + /// + /// The indicator of whether the AdbcStatement.ExecuteQuery[Async] should execute a metadata command query. + /// In the case this indicator is set to True, the method will execute a metadata command using the native API where + /// the name of the command is given in the AdbcStatement.SqlQuery property value. + /// + public const string IsMetadataCommand = "adbc.bigquery.statement.is_metadata_command"; + + /// + /// Indicates whether the driver should attempt to detect a location for jobs if no "/> is specified. + /// + public const string DetectJobLocation = "adbc.bigquery.detect_job_location"; + // these values are safe to log any time private static HashSet safeToLog = new HashSet(StringComparer.OrdinalIgnoreCase) { AllowLargeResults, AuthenticationType, BillingProjectId, ClientId, ClientTimeout, DefaultClientLocation, EvaluationKind, GetQueryResultsOptionsTimeout, EvaluationKind, GetQueryResultsOptionsTimeout, IncludeConstraintsWithGetObjects, - IncludePublicProjectId, LargeDecimalsAsString, LargeResultsDataset, LargeResultsDestinationTable, + IncludePublicProjectId, LargeDecimalsAsString, CreateLargeResultsDataset, LargeResultsDataset, LargeResultsDestinationTable, MaxFetchConcurrency, MaximumRetryAttempts, ProjectId, RetryDelayMs, StatementIndex, - StatementType, UseLegacySQL + StatementType, UseLegacySQL, IsMetadataCommand }; public static bool IsSafeToLog(string name) @@ -97,9 +116,6 @@ internal class BigQueryConstants public const string PublicProjectId = "bigquery-public-data"; - // this is what the BigQuery API uses as the default location - public const string DefaultClientLocation = "US"; - // from https://cloud.google.com/bigquery/docs/locations#locations_and_regions as of Sept 28, 2025 public static IReadOnlyList ValidLocations = new List() { diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs index cb6d7435ae..2e1261e886 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Data; using System.Diagnostics; using System.IO; using System.Linq; @@ -45,6 +46,8 @@ class BigQueryStatement : TracingStatement, ITokenProtectedResource, IDisposable readonly BigQueryConnection bigQueryConnection; readonly CancellationRegistry cancellationRegistry; + private const string GetDatasetCommandName = "datasetexists"; + public BigQueryStatement(BigQueryConnection bigQueryConnection) : base(bigQueryConnection) { if (bigQueryConnection == null) { throw new AdbcException($"{nameof(bigQueryConnection)} cannot be null", AdbcStatusCode.InvalidArgument); } @@ -72,6 +75,16 @@ public BigQueryStatement(BigQueryConnection bigQueryConnection) : base(bigQueryC public override string AssemblyName => BigQueryUtils.BigQueryAssemblyName; + private bool IsMetadataCommand() + { + bool result = false; + if (Options?.TryGetValue(BigQueryParameters.IsMetadataCommand, out string? isMetadataString) == true) + { + result = bool.TryParse(isMetadataString, out bool isMetadata) && isMetadata; + } + return result; + } + public override void SetOption(string key, string value) { if (Options == null) @@ -84,6 +97,11 @@ public override void SetOption(string key, string value) public override QueryResult ExecuteQuery() { + if (IsMetadataCommand()) + { + return ExecuteMetadataCommandQuery(); + } + return ExecuteQueryInternalAsync().GetAwaiter().GetResult(); } @@ -438,6 +456,7 @@ private QueryOptions ValidateOptions(Activity? activity) return options; string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId; + bool detectJobLocation = false; foreach (KeyValuePair keyValuePair in Options) { @@ -451,6 +470,9 @@ private QueryOptions ValidateOptions(Activity? activity) largeResultDatasetId = keyValuePair.Value; activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId); break; + case BigQueryParameters.DetectJobLocation: + detectJobLocation = true ? keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false; + break; case BigQueryParameters.LargeResultsDestinationTable: string destinationTable = keyValuePair.Value; @@ -488,6 +510,20 @@ private QueryOptions ValidateOptions(Activity? activity) } } + // if no DefaultLocation was specified for the client, but a dataset is specified, then use its location for the jobs + // to avoid errors like: + // Cannot executeb__1 after 5 tries.Last exception: The service bigquery has thrown an exception. No HttpStatusCode was specified. + // Job gcp-pro-******-****/US/job_2cdc3c18_c36a_4b51_aeac_************ contained 2 error(s).First error message: Not found: Dataset gcp-pro-******-****:view_*************_******* + // was not found in location US + // because the default location for the client is US if not specified + if (string.IsNullOrEmpty(this.Client.DefaultLocation) && + detectJobLocation && + !string.IsNullOrEmpty(largeResultDatasetId) + && DatasetExists(largeResultDatasetId, out BigQueryDataset? dataset)) + { + options.JobLocation = dataset?.Resource.Location; + } + if (options.AllowLargeResults == true && options.DestinationTable == null) { options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity); @@ -505,22 +541,7 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac { BigQueryDataset? dataset = null; - try - { - activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId); - dataset = this.Client.GetDataset(datasetId); - activity?.AddBigQueryTag("large_results.dataset.found", datasetId); - } - catch (GoogleApiException gaEx) - { - if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound) - { - activity?.AddException(gaEx); - throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx); - } - } - - if (dataset == null) + if ((!DatasetExists(datasetId, out dataset) || dataset == null) && bigQueryConnection.CreateLargeResultsDataset) { try { @@ -571,6 +592,34 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs); + public bool DatasetExists(string datasetId, out BigQueryDataset? dataset) + { + BigQueryDataset? tempDataset = null; + bool result = this.TraceActivity(activity => + { + try + { + activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId); + tempDataset = this.Client.GetDataset(datasetId); + activity?.AddBigQueryTag("large_results.dataset.found", datasetId); + return true; + } + catch (GoogleApiException gaEx) + { + if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound) + { + activity?.AddException(gaEx); + throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx); + } + tempDataset = null; + return false; + } + }); + + dataset = tempDataset; + return result; + } + private async Task ExecuteCancellableJobAsync( JobCancellationContext context, Activity? activity, @@ -604,6 +653,94 @@ private async Task ExecuteCancellableJobAsync( } } + /// + /// Allows sending a function name as the SqlQuery to execute a function and return the result. + /// + /// + /// + /// + private QueryResult ExecuteMetadataCommandQuery() + { + return SqlQuery?.ToLowerInvariant() switch + { + GetDatasetCommandName => ExecuteDatasetExistsCommand(), + null or "" => throw new ArgumentNullException(nameof(SqlQuery), $"Metadata command for property 'SqlQuery' must not be empty or null. Supported metadata commands: {GetDatasetCommandName}"), + _ => throw new NotSupportedException($"Metadata command '{SqlQuery}' is not supported. Supported metadata commands: {GetDatasetCommandName}"), + }; + } + + private QueryResult ExecuteDatasetExistsCommand() + { + KeyValuePair? option = Options != null ? Options.Where(x => x.Key == BigQueryParameters.LargeResultsDataset).FirstOrDefault() : null; + + if (option == null || string.IsNullOrEmpty(option?.Value)) + { + throw new ArgumentNullException(nameof(Options), $"The option '{BigQueryParameters.LargeResultsDataset}' must be set to the dataset name."); + } + + bool exists = DatasetExists(option?.Value!, out BigQueryDataset? dataset); + + Schema schema = new Schema(new List + { + new Field("name", StringType.Default, false), + new Field("exists", BooleanType.Default, false), + new Field("location", StringType.Default, false) + }, null); + + StringArray datasetNameArray = new StringArray.Builder() + .Append(option?.Value!) + .Build(); + + BooleanArray datasetExistsArray = new BooleanArray.Builder() + .Append(exists) + .Build(); + + StringArray locationArray = (exists && dataset != null) ? + new StringArray.Builder().Append(dataset.Resource.Location).Build() : + new StringArray.Builder().AppendNull().Build(); + + RecordBatch recordBatch = new RecordBatch( + schema, + new IArrowArray[] { datasetNameArray, datasetExistsArray, locationArray }, + 1 + ); + + // Create a simple array stream using the existing MultiArrowReader pattern + MultiArrowReader stream = new MultiArrowReader(this, schema, new[] { new SingleRecordBatchReader(recordBatch) }, new CancellationContext(cancellationRegistry)); + + return new QueryResult(1, stream); + } + + // Simple reader that yields a single record batch + private class SingleRecordBatchReader : IArrowReader + { + private readonly RecordBatch _batch; + private bool _hasBeenRead = false; + + public SingleRecordBatchReader(RecordBatch batch) + { + _batch = batch; + } + + public Schema Schema => _batch.Schema; + + public ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) + { + if (_hasBeenRead) + { + return new ValueTask(result: null); + } + + _hasBeenRead = true; + return new ValueTask(_batch); + } + + public void Dispose() + { + _batch?.Dispose(); + } + } + private class CancellationContext : IDisposable { private readonly CancellationRegistry cancellationRegistry; diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs index 74f856e102..c59e5b7ffb 100644 --- a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs +++ b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs @@ -143,6 +143,15 @@ public BigQueryTestEnvironment() [JsonPropertyName("queries")] public List ParallelQueries { get; set; } + + [JsonPropertyName("isMetadata")] + public bool IsMetadataCommand { get; set; } = false; + + [JsonPropertyName("detectJobLocation")] + public bool DetectJobLocation { get; set; } = false; + + [JsonPropertyName("expectedMetadataResultJson")] + public string? ExpectedMetadataResultJson { get; set; } } class ParallelQuery diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs index 3945028953..dd0a20acfc 100644 --- a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs +++ b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs @@ -195,6 +195,16 @@ internal static Dictionary GetBigQueryParameters(BigQueryTestEnv parameters.Add(BigQueryParameters.EvaluationKind, testEnvironment.EvaluationKind); } + if (testEnvironment.IsMetadataCommand) + { + parameters.Add(BigQueryParameters.IsMetadataCommand, testEnvironment.IsMetadataCommand.ToString()); + } + + if (testEnvironment.DetectJobLocation) + { + parameters.Add(BigQueryParameters.DetectJobLocation, testEnvironment.DetectJobLocation.ToString()); + } + return parameters; } diff --git a/csharp/test/Drivers/BigQuery/DriverTests.cs b/csharp/test/Drivers/BigQuery/DriverTests.cs index c154581474..dc48535807 100644 --- a/csharp/test/Drivers/BigQuery/DriverTests.cs +++ b/csharp/test/Drivers/BigQuery/DriverTests.cs @@ -18,8 +18,10 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using Apache.Arrow.Adbc.Drivers.BigQuery; +using Apache.Arrow.Adbc.Extensions; using Apache.Arrow.Adbc.Tests.Metadata; using Apache.Arrow.Adbc.Tests.Xunit; using Apache.Arrow.Ipc; @@ -329,6 +331,62 @@ public void CanExecuteParallelQueries() } } + /// + /// Validates if the driver can connect to a live server and + /// parse the results. + /// + [SkippableFact, Order(7)] + public void CanExecuteMetadataQuery() + { + foreach (BigQueryTestEnvironment environment in _environments) + { + if (environment.IsMetadataCommand) + { + AdbcConnection adbcConnection = GetAdbcConnection(environment.Name); + + AdbcStatement statement = adbcConnection.CreateStatement(); + statement.SqlQuery = environment.Query; + + QueryResult queryResult = statement.ExecuteQuery(); + + RecordBatch? recordBatch = queryResult.Stream?.ReadNextRecordBatchAsync().Result; + + Assert.NotNull(recordBatch); + + string json = RecordBatchToJson(recordBatch); + + Assert.True(environment.ExpectedMetadataResultJson?.Equals(json, StringComparison.Ordinal), "Expected results do not match"); + } + } + } + + private static string RecordBatchToJson(RecordBatch recordBatch) + { + List> rows = new List>(); + + for (int rowIndex = 0; rowIndex < recordBatch.Length; rowIndex++) + { + Dictionary row = new Dictionary(); + + for (int colIndex = 0; colIndex < recordBatch.ColumnCount; colIndex++) + { + IArrowArray column = recordBatch.Column(colIndex); + string fieldName = recordBatch.Schema.GetFieldByIndex(colIndex).Name; + + object? value = column.ValueAt(rowIndex); + row[fieldName] = value; + } + + rows.Add(row); + } + + return JsonSerializer.Serialize(rows, new JsonSerializerOptions + { + WriteIndented = false, + PropertyNamingPolicy = null + }); + } + private AdbcConnection GetAdbcConnection(string? environmentName) { if (string.IsNullOrEmpty(environmentName)) From 4f5de6a3d2c56be7edfb4368aed91f7bb43236ca Mon Sep 17 00:00:00 2001 From: David Coe <> Date: Thu, 16 Oct 2025 23:54:18 -0400 Subject: [PATCH 2/5] remove detect job location --- .../src/Drivers/BigQuery/BigQueryParameters.cs | 6 ------ .../src/Drivers/BigQuery/BigQueryStatement.cs | 18 ------------------ .../BigQuery/BigQueryTestConfiguration.cs | 3 --- .../Drivers/BigQuery/BigQueryTestingUtils.cs | 5 ----- 4 files changed, 32 deletions(-) diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs index 20c79fd0ca..c2de7e7b0b 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs @@ -58,7 +58,6 @@ internal class BigQueryParameters /// public const string CreateLargeResultsDataset = "adbc.bigquery.create_large_results_dataset"; - /// /// The indicator of whether the AdbcStatement.ExecuteQuery[Async] should execute a metadata command query. /// In the case this indicator is set to True, the method will execute a metadata command using the native API where @@ -66,11 +65,6 @@ internal class BigQueryParameters /// public const string IsMetadataCommand = "adbc.bigquery.statement.is_metadata_command"; - /// - /// Indicates whether the driver should attempt to detect a location for jobs if no "/> is specified. - /// - public const string DetectJobLocation = "adbc.bigquery.detect_job_location"; - // these values are safe to log any time private static HashSet safeToLog = new HashSet(StringComparer.OrdinalIgnoreCase) { diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs index 2e1261e886..6f77f13d38 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs @@ -456,7 +456,6 @@ private QueryOptions ValidateOptions(Activity? activity) return options; string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId; - bool detectJobLocation = false; foreach (KeyValuePair keyValuePair in Options) { @@ -470,9 +469,6 @@ private QueryOptions ValidateOptions(Activity? activity) largeResultDatasetId = keyValuePair.Value; activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId); break; - case BigQueryParameters.DetectJobLocation: - detectJobLocation = true ? keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false; - break; case BigQueryParameters.LargeResultsDestinationTable: string destinationTable = keyValuePair.Value; @@ -510,20 +506,6 @@ private QueryOptions ValidateOptions(Activity? activity) } } - // if no DefaultLocation was specified for the client, but a dataset is specified, then use its location for the jobs - // to avoid errors like: - // Cannot executeb__1 after 5 tries.Last exception: The service bigquery has thrown an exception. No HttpStatusCode was specified. - // Job gcp-pro-******-****/US/job_2cdc3c18_c36a_4b51_aeac_************ contained 2 error(s).First error message: Not found: Dataset gcp-pro-******-****:view_*************_******* - // was not found in location US - // because the default location for the client is US if not specified - if (string.IsNullOrEmpty(this.Client.DefaultLocation) && - detectJobLocation && - !string.IsNullOrEmpty(largeResultDatasetId) - && DatasetExists(largeResultDatasetId, out BigQueryDataset? dataset)) - { - options.JobLocation = dataset?.Resource.Location; - } - if (options.AllowLargeResults == true && options.DestinationTable == null) { options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity); diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs index c59e5b7ffb..ac084137b0 100644 --- a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs +++ b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs @@ -147,9 +147,6 @@ public BigQueryTestEnvironment() [JsonPropertyName("isMetadata")] public bool IsMetadataCommand { get; set; } = false; - [JsonPropertyName("detectJobLocation")] - public bool DetectJobLocation { get; set; } = false; - [JsonPropertyName("expectedMetadataResultJson")] public string? ExpectedMetadataResultJson { get; set; } } diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs index dd0a20acfc..48e209d617 100644 --- a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs +++ b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs @@ -200,11 +200,6 @@ internal static Dictionary GetBigQueryParameters(BigQueryTestEnv parameters.Add(BigQueryParameters.IsMetadataCommand, testEnvironment.IsMetadataCommand.ToString()); } - if (testEnvironment.DetectJobLocation) - { - parameters.Add(BigQueryParameters.DetectJobLocation, testEnvironment.DetectJobLocation.ToString()); - } - return parameters; } From 9a1703b3d082190e467ad80c29d4405de6aff866 Mon Sep 17 00:00:00 2001 From: David Coe <> Date: Sun, 19 Oct 2025 09:15:42 -0400 Subject: [PATCH 3/5] functional --- .../Drivers/BigQuery/BigQueryParameters.cs | 2 +- .../src/Drivers/BigQuery/BigQueryStatement.cs | 206 +++++++++++------- 2 files changed, 124 insertions(+), 84 deletions(-) diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs index c2de7e7b0b..622ad9b39a 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs @@ -72,7 +72,7 @@ internal class BigQueryParameters EvaluationKind, GetQueryResultsOptionsTimeout, IncludeConstraintsWithGetObjects, IncludePublicProjectId, LargeDecimalsAsString, CreateLargeResultsDataset, LargeResultsDataset, LargeResultsDestinationTable, MaxFetchConcurrency, MaximumRetryAttempts, ProjectId, RetryDelayMs, StatementIndex, - StatementType, UseLegacySQL, IsMetadataCommand + StatementType, UseLegacySQL, IsMetadataCommand, DefaultClientLocation }; public static bool IsSafeToLog(string name) diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs index 6f77f13d38..09b975efdd 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs @@ -80,7 +80,10 @@ private bool IsMetadataCommand() bool result = false; if (Options?.TryGetValue(BigQueryParameters.IsMetadataCommand, out string? isMetadataString) == true) { - result = bool.TryParse(isMetadataString, out bool isMetadata) && isMetadata; + if (bool.TryParse(isMetadataString, out bool isMetadata)) + { + result = isMetadata; + } } return result; } @@ -97,12 +100,18 @@ public override void SetOption(string key, string value) public override QueryResult ExecuteQuery() { - if (IsMetadataCommand()) + return this.TraceActivity(activity => { - return ExecuteMetadataCommandQuery(); - } + bool isMetadataCommand = IsMetadataCommand(); + activity?.AddBigQueryParameterTag(BigQueryParameters.IsMetadataCommand, isMetadataCommand); + + if (isMetadataCommand) + { + return ExecuteMetadataCommandQuery(); + } - return ExecuteQueryInternalAsync().GetAwaiter().GetResult(); + return ExecuteQueryInternalAsync().GetAwaiter().GetResult(); + }); } private async Task ExecuteQueryInternalAsync() @@ -169,34 +178,34 @@ private async Task ExecuteQueryInternalAsync() } Func> getMultiJobResults = async () => + { + // To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements. + // Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind + ListJobsOptions listJobsOptions = new ListJobsOptions(); + listJobsOptions.ParentJobId = results.JobReference.JobId; + var joblist = Client.ListJobs(listJobsOptions) + .Select(job => Client.GetJob(job.Reference)) + .Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase)) + .Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase)) + .OrderBy(job => job.Resource.Statistics.CreationTime) + .ToList(); + + if (joblist.Count > 0) { - // To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements. - // Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind - ListJobsOptions listJobsOptions = new ListJobsOptions(); - listJobsOptions.ParentJobId = results.JobReference.JobId; - var joblist = Client.ListJobs(listJobsOptions) - .Select(job => Client.GetJob(job.Reference)) - .Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase)) - .Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase)) - .OrderBy(job => job.Resource.Statistics.CreationTime) - .ToList(); - - if (joblist.Count > 0) + if (statementIndex < 1 || statementIndex > joblist.Count) { - if (statementIndex < 1 || statementIndex > joblist.Count) - { - throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available."); - } - BigQueryJob indexedJob = joblist[statementIndex - 1]; - cancellationContext.Job = indexedJob; - return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) => - { - return await indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false); - }).ConfigureAwait(false); + throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available."); } + BigQueryJob indexedJob = joblist[statementIndex - 1]; + cancellationContext.Job = indexedJob; + return await ExecuteCancellableJobAsync(cancellationContext, activity, async (context) => + { + return await indexedJob.GetQueryResultsAsync(getQueryResultsOptions, cancellationToken: context.CancellationToken).ConfigureAwait(false); + }).ConfigureAwait(false); + } - throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData); - }; + throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData); + }; results = await ExecuteWithRetriesAsync(getMultiJobResults, activity).ConfigureAwait(false); } @@ -423,34 +432,7 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type) private QueryOptions ValidateOptions(Activity? activity) { QueryOptions options = new QueryOptions(); - - if (Client.ProjectId == BigQueryConstants.DetectProjectId) - { - activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId); - - // An error occurs when calling CreateQueryJob without the ID set, - // so use the first one that is found. This does not prevent from calling - // to other 'project IDs' (catalogs) with a query. - Func?>> func = () => Task.Run(() => - { - return Client?.ListProjects(); - }); - - PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult(); - - if (projects != null) - { - string? firstProjectId = projects.Select(x => x.ProjectId).FirstOrDefault(); - - if (firstProjectId != null) - { - options.ProjectId = firstProjectId; - activity?.AddBigQueryTag("detected_client_project_id", firstProjectId); - // need to reopen the Client with the projectId specified - this.bigQueryConnection.Open(firstProjectId); - } - } - } + options.ProjectId = EnsureProjectIdIsConfigured(activity); if (Options == null || Options.Count == 0) return options; @@ -514,6 +496,41 @@ private QueryOptions ValidateOptions(Activity? activity) return options; } + private string EnsureProjectIdIsConfigured(Activity? activity) + { + string projectId = Client.ProjectId; + + if (Client.ProjectId == BigQueryConstants.DetectProjectId) + { + activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId); + + // An error occurs when calling CreateQueryJob without the ID set, + // so use the first one that is found. This does not prevent from calling + // to other 'project IDs' (catalogs) with a query. + Func?>> func = () => Task.Run(() => + { + return Client?.ListProjects(); + }); + + PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult(); + + if (projects != null) + { + string? firstProjectId = projects.Select(x => x.ProjectId).FirstOrDefault(); + + if (firstProjectId != null) + { + projectId = firstProjectId; + activity?.AddBigQueryTag("detected_client_project_id", firstProjectId); + // need to reopen the Client with the projectId specified + this.bigQueryConnection.Open(firstProjectId); + } + } + } + + return projectId; + } + /// /// Attempts to retrieve or create the specified dataset. /// @@ -581,9 +598,11 @@ public bool DatasetExists(string datasetId, out BigQueryDataset? dataset) { try { + EnsureProjectIdIsConfigured(activity); activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId); tempDataset = this.Client.GetDataset(datasetId); activity?.AddBigQueryTag("large_results.dataset.found", datasetId); + activity?.AddBigQueryTag("large_results.dataset.found_region", tempDataset.Resource.Location); return true; } catch (GoogleApiException gaEx) @@ -653,44 +672,48 @@ private QueryResult ExecuteMetadataCommandQuery() private QueryResult ExecuteDatasetExistsCommand() { - KeyValuePair? option = Options != null ? Options.Where(x => x.Key == BigQueryParameters.LargeResultsDataset).FirstOrDefault() : null; - - if (option == null || string.IsNullOrEmpty(option?.Value)) + return this.TraceActivity(activity => { - throw new ArgumentNullException(nameof(Options), $"The option '{BigQueryParameters.LargeResultsDataset}' must be set to the dataset name."); - } + activity?.AddBigQueryTag("metadata_command.execute", GetDatasetCommandName); + KeyValuePair? option = Options != null ? Options.Where(x => x.Key == BigQueryParameters.LargeResultsDataset).FirstOrDefault() : null; - bool exists = DatasetExists(option?.Value!, out BigQueryDataset? dataset); + if (option == null || string.IsNullOrEmpty(option?.Value)) + { + throw new ArgumentNullException(nameof(Options), $"The option '{BigQueryParameters.LargeResultsDataset}' must be set to the dataset name."); + } - Schema schema = new Schema(new List + bool exists = DatasetExists(option?.Value!, out BigQueryDataset? dataset); + + Schema schema = new Schema(new List { new Field("name", StringType.Default, false), new Field("exists", BooleanType.Default, false), new Field("location", StringType.Default, false) }, null); - StringArray datasetNameArray = new StringArray.Builder() - .Append(option?.Value!) - .Build(); + StringArray datasetNameArray = new StringArray.Builder() + .Append(option?.Value!) + .Build(); - BooleanArray datasetExistsArray = new BooleanArray.Builder() - .Append(exists) - .Build(); + BooleanArray datasetExistsArray = new BooleanArray.Builder() + .Append(exists) + .Build(); - StringArray locationArray = (exists && dataset != null) ? - new StringArray.Builder().Append(dataset.Resource.Location).Build() : - new StringArray.Builder().AppendNull().Build(); + StringArray locationArray = (exists && dataset != null) ? + new StringArray.Builder().Append(dataset.Resource.Location).Build() : + new StringArray.Builder().AppendNull().Build(); - RecordBatch recordBatch = new RecordBatch( - schema, - new IArrowArray[] { datasetNameArray, datasetExistsArray, locationArray }, - 1 - ); + RecordBatch recordBatch = new RecordBatch( + schema, + new IArrowArray[] { datasetNameArray, datasetExistsArray, locationArray }, + 1 + ); - // Create a simple array stream using the existing MultiArrowReader pattern - MultiArrowReader stream = new MultiArrowReader(this, schema, new[] { new SingleRecordBatchReader(recordBatch) }, new CancellationContext(cancellationRegistry)); + // Create a simple array stream using the existing MultiArrowReader pattern + MultiArrowReader stream = new MultiArrowReader(this, schema, new[] { new SingleRecordBatchReader(recordBatch) }, new CancellationContext(cancellationRegistry)); - return new QueryResult(1, stream); + return new QueryResult(1, stream); + }); } // Simple reader that yields a single record batch @@ -825,12 +848,13 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable< { return await this.TraceActivityAsync(async activity => { - using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.cancellationContext.CancellationToken); if (this.readers == null) { return null; } + CancellationToken effectiveToken = GetSafeCancellationToken(cancellationToken); + while (true) { if (this.reader == null) @@ -843,7 +867,7 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable< this.reader = this.readers.Current; } - RecordBatch result = await this.reader.ReadNextRecordBatchAsync(linkedCts.Token).ConfigureAwait(false); + RecordBatch result = await this.reader.ReadNextRecordBatchAsync(effectiveToken).ConfigureAwait(false); if (result != null) { @@ -855,6 +879,22 @@ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable< }); } + private CancellationToken GetSafeCancellationToken(CancellationToken userToken) + { + try + { + CancellationToken contextToken = this.cancellationContext.CancellationToken; + + using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(userToken, contextToken); + return linkedCts.Token; + } + catch (ObjectDisposedException) + { + // Fall back to user token if context is disposed + return userToken; + } + } + protected override void Dispose(bool disposing) { if (disposing) From 8cdab58fbbf0bfc822e5d5c120204c6f5cbda29e Mon Sep 17 00:00:00 2001 From: David Coe <> Date: Sun, 19 Oct 2025 18:36:01 -0400 Subject: [PATCH 4/5] add detect location --- .../Drivers/BigQuery/BigQueryConnection.cs | 127 ++++++++++++- .../Drivers/BigQuery/BigQueryParameters.cs | 4 + .../src/Drivers/BigQuery/BigQueryStatement.cs | 36 +--- .../BigQuery/BigQueryStatementTests.cs | 173 ------------------ 4 files changed, 132 insertions(+), 208 deletions(-) delete mode 100644 csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs index 1a3c70cc27..7a4b1eb730 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs @@ -93,7 +93,8 @@ public BigQueryConnection(IReadOnlyDictionary properties) : base if (this.properties.TryGetValue(BigQueryParameters.DefaultClientLocation, out string? location) && !string.IsNullOrEmpty(location) && - BigQueryConstants.ValidLocations.Any(l => l.Equals(location, StringComparison.OrdinalIgnoreCase))) + (location.Equals(BigQueryConstants.AutoDetectLocation, StringComparison.OrdinalIgnoreCase) || + BigQueryConstants.ValidLocations.Any(l => l.Equals(location, StringComparison.OrdinalIgnoreCase)))) { DefaultClientLocation = location; } @@ -232,7 +233,30 @@ internal BigQueryClient Open(string? projectId = null) // User does not have permission to query table bigquery-public-data:blockchain_analytics_ethereum_mainnet_us.accounts, // or perhaps it does not exist.' - bigQueryClientBuilder.DefaultLocation = DefaultClientLocation; + if (DefaultClientLocation!.Equals(BigQueryConstants.AutoDetectLocation, StringComparison.OrdinalIgnoreCase) == true) + { + // get a temporary client to detect the location + Client = bigQueryClientBuilder.Build(); + + string ensuredProjectId = EnsureProjectIdIsConfigured(activity); + string? detectedLocation = DetectLocation(ensuredProjectId); + + if (!string.IsNullOrEmpty(detectedLocation)) + { + DefaultClientLocation = detectedLocation; + bigQueryClientBuilder.DefaultLocation = DefaultClientLocation; + activity?.AddBigQueryTag("client.detected_location", detectedLocation); + } + else + { + activity?.AddBigQueryTag("client.detected_location", null); + } + } + else + { + bigQueryClientBuilder.DefaultLocation = DefaultClientLocation; + } + activity?.AddBigQueryParameterTag(BigQueryParameters.DefaultClientLocation, DefaultClientLocation); } else @@ -252,6 +276,105 @@ internal BigQueryClient Open(string? projectId = null) }); } + private string? DetectLocation(string projectId) + { + return this.TraceActivity(activity => + { + ListDatasetsOptions options = new ListDatasetsOptions() + { + IncludeHidden = true + }; + + Func?>> func = () => Task.Run(() => + { + return Client?.ListDatasets(options); + }); + + PagedEnumerable? datasets = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult(); + + if (datasets == null) return null; + + // the ListDatasets call doesn't include the Location value so we have to + // grab the details of N number to determine the most common location + Dictionary locationCounts = new Dictionary(); + + foreach (BigQueryDataset ds in datasets.Take(20)) + { + try + { + Func> getDatasetFunc = () => Task.Run(() => + { + return Client?.GetDataset(ds.Reference.DatasetId); + }); + + BigQueryDataset? fullDataset = ExecuteWithRetriesAsync(getDatasetFunc, activity).GetAwaiter().GetResult();// Client?.GetDataset(ds.Reference.DatasetId); + string? location = fullDataset?.Resource.Location; + if (!string.IsNullOrEmpty(location)) + { + if (locationCounts.TryGetValue(location!, out int currentCount)) + { + locationCounts[location!] = currentCount + 1; + } + else + { + locationCounts[location!] = 1; + } + } + } + catch (Exception ex) + { + activity?.AddException(ex); + } + } + + // Return the most common location + return locationCounts + .OrderByDescending(kvp => kvp.Value) + .Select(kvp => kvp.Key) + .FirstOrDefault(); + }); + } + + internal string EnsureProjectIdIsConfigured(Activity? activity) + { + string? projectId = Client?.ProjectId; + + if (Client?.ProjectId == BigQueryConstants.DetectProjectId) + { + activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId); + + // An error occurs when calling CreateQueryJob without the ID set, + // so use the first one that is found. This does not prevent from calling + // to other 'project IDs' (catalogs) with a query. + Func?>> func = () => Task.Run(() => + { + return Client?.ListProjects(); + }); + + PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult(); + + if (projects != null) + { + string? firstProjectId = projects.Select(x => x.ProjectId).FirstOrDefault(); + + if (firstProjectId != null) + { + projectId = firstProjectId; + activity?.AddBigQueryTag("detected_client_project_id", firstProjectId); + // need to reopen the Client with the projectId specified + this.Open(firstProjectId); + } + } + } + + if (string.IsNullOrEmpty(projectId)) + { + throw new ArgumentException("A valid project ID could not be determined. Please specify a project ID using the adbc.bigquery.project_id parameter."); + } + + return projectId!; + } + internal void SetCredential() { this.TraceActivity(activity => diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs index 622ad9b39a..14378eff93 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs @@ -105,6 +105,10 @@ internal class BigQueryConstants // default value per https://pkg.go.dev/cloud.google.com/go/bigquery#section-readme public const string DetectProjectId = "*detect-project-id*"; + // if a location is not specified, use this value to auto-detect based on the most commonly + // found location for the datasets in the project + public const string AutoDetectLocation = "*detect-location*"; + // matches the pattern for odbc, but for adbc public const string DefaultLargeDatasetId = "_bqadbc_temp_tables"; diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs index 09b975efdd..c513fc8fb8 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs @@ -496,39 +496,9 @@ private QueryOptions ValidateOptions(Activity? activity) return options; } - private string EnsureProjectIdIsConfigured(Activity? activity) + internal string EnsureProjectIdIsConfigured(Activity? activity) { - string projectId = Client.ProjectId; - - if (Client.ProjectId == BigQueryConstants.DetectProjectId) - { - activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId); - - // An error occurs when calling CreateQueryJob without the ID set, - // so use the first one that is found. This does not prevent from calling - // to other 'project IDs' (catalogs) with a query. - Func?>> func = () => Task.Run(() => - { - return Client?.ListProjects(); - }); - - PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult(); - - if (projects != null) - { - string? firstProjectId = projects.Select(x => x.ProjectId).FirstOrDefault(); - - if (firstProjectId != null) - { - projectId = firstProjectId; - activity?.AddBigQueryTag("detected_client_project_id", firstProjectId); - // need to reopen the Client with the projectId specified - this.bigQueryConnection.Open(firstProjectId); - } - } - } - - return projectId; + return this.bigQueryConnection.EnsureProjectIdIsConfigured(activity); } /// @@ -591,7 +561,7 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs); - public bool DatasetExists(string datasetId, out BigQueryDataset? dataset) + private bool DatasetExists(string datasetId, out BigQueryDataset? dataset) { BigQueryDataset? tempDataset = null; bool result = this.TraceActivity(activity => diff --git a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs b/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs deleted file mode 100644 index 17fa72b5ae..0000000000 --- a/csharp/test/Drivers/BigQuery/BigQueryStatementTests.cs +++ /dev/null @@ -1,173 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Reflection; -using System.Threading; -using System.Threading.Tasks; -using Apache.Arrow.Adbc.Drivers.BigQuery; -using Apache.Arrow.Ipc; -using Google.Api.Gax.Grpc; -using Google.Apis.Auth.OAuth2; -using Google.Cloud.BigQuery.Storage.V1; -using Grpc.Core; -using Moq; -using Xunit; - -namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery -{ - public class BigQueryStatementTests - { - [Fact] - public void ReadChunkWithRetries_CalledMoreThanOnce() - { - TokenProtectedReadClientManger clientMgr = GetMockTokenProtectedReadClientManger(); - var mockReadRowsStream = GetMockReadRowsStream(clientMgr); - mockReadRowsStream - .Setup(s => s.GetResponseStream()) - .Throws(new InvalidOperationException("GetAsyncEnumerator can only be called once for a gRPC response stream wrapper.")); - - var statement = CreateBigQueryStatementForTest(); - SetupRetryValues(statement); - - // this should remain an issue because it indicates we aren't doing something correctly - // due to the setup, it looks like: - //----System.Reflection.TargetInvocationException : Exception has been thrown by the target of an invocation. - //--------Apache.Arrow.Adbc.AdbcException : Cannot executeb__0 after 5 tries.Last exception: InvalidOperationException: GetAsyncEnumerator can only be called once for a gRPC response stream wrapper. - //------------ System.InvalidOperationException : GetAsyncEnumerator can only be called once for a gRPC response stream wrapper. - - Assert.Throws(() => { statement.ReadChunkWithRetriesForTest(clientMgr, "test-stream", null); }); - } - - [Theory] - [InlineData(true)] //.MoveNextAsync throws the error - [InlineData(false)] //.Current throws the error - public void ReadChunkWithRetries_ThrowsInvalidOperationExceptionOnReadRowsResponse(bool moveNextThrowsError) - { - var clientMgr = GetMockTokenProtectedReadClientManger(); - var mockReadRowsStream = GetMockReadRowsStream(clientMgr); - - var mockAsyncResponseStream = new Mock>(); - - if (moveNextThrowsError) - { - mockAsyncResponseStream - .Setup(s => s.MoveNext(CancellationToken.None)) - .Throws(new InvalidOperationException("No current element is available.")); - } - else - { - mockAsyncResponseStream - .Setup(s => s.MoveNext(CancellationToken.None)) - .Returns(Task.FromResult(true)); - - mockAsyncResponseStream - .SetupGet(s => s.Current) - .Throws(new InvalidOperationException("No current element is available.")); - } - - AsyncResponseStream? mockedResponseStream = typeof(AsyncResponseStream) - .GetConstructor( - BindingFlags.Instance | BindingFlags.NonPublic, - null, - new Type[] { typeof(IAsyncStreamReader) }, - null)? - .Invoke(new object[] { mockAsyncResponseStream.Object }) as AsyncResponseStream; - - Assert.True(mockedResponseStream != null); - - mockReadRowsStream - .Setup(c => c.GetResponseStream()) - .Returns(mockedResponseStream); - - var statement = CreateBigQueryStatementForTest(); - SetupRetryValues(statement); - - var result = statement.ReadChunkWithRetriesForTest(clientMgr, "test-stream", null); - Assert.Null(result); - } - - private Mock GetMockReadRowsStream(TokenProtectedReadClientManger clientMgr) - { - var mockReadClient = new Mock(MockBehavior.Strict); - typeof(TokenProtectedReadClientManger) - .GetField("bigQueryReadClient", BindingFlags.NonPublic | BindingFlags.Instance)? - .SetValue(clientMgr, mockReadClient.Object); - - var mockReadRowsStream = new Mock(); - mockReadClient - .Setup(c => c.ReadRows(It.IsAny(), null)) - .Returns(mockReadRowsStream.Object); - - return mockReadRowsStream; - } - - private TokenProtectedReadClientManger GetMockTokenProtectedReadClientManger() - { - var credential = GoogleCredential.FromAccessToken("dummy-token"); - var clientMgr = new TokenProtectedReadClientManger(credential); - return clientMgr; - } - - private void SetupRetryValues(BigQueryStatement statement) - { - var connection = typeof(BigQueryStatement) - .GetField("bigQueryConnection", BindingFlags.NonPublic | BindingFlags.Instance)? - .GetValue(statement) as BigQueryConnection; - - if (connection != null) - { - typeof(BigQueryConnection) - .GetField("maxRetryAttempts", BindingFlags.NonPublic | BindingFlags.Instance)? - .SetValue(connection, 2); - - typeof(BigQueryConnection) - .GetField("retryDelayMs", BindingFlags.NonPublic | BindingFlags.Instance)? - .SetValue(connection, 50); - } - } - - private BigQueryStatement CreateBigQueryStatementForTest() - { - var properties = new Dictionary - { - ["projectid"] = "test-project" - }; - - var connection = new BigQueryConnection(properties); - return new BigQueryStatement(connection); - } - } - - public static class BigQueryStatementExtensions - { - internal static IArrowReader? ReadChunkWithRetriesForTest( - this BigQueryStatement statement, - TokenProtectedReadClientManger clientMgr, - string streamName, - Activity? activity) - { - var method = typeof(BigQueryStatement).GetMethod( - "ReadChunkWithRetries", - BindingFlags.NonPublic | BindingFlags.Instance); - - return (IArrowReader?)method?.Invoke(statement, new object[] { clientMgr, streamName, activity! }); - } - } -} From cc0214c64e124a12e560cca18407097122e42a12 Mon Sep 17 00:00:00 2001 From: David Coe <> Date: Sun, 19 Oct 2025 21:17:02 -0400 Subject: [PATCH 5/5] update comment --- csharp/src/Drivers/BigQuery/BigQueryConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs index 7a4b1eb730..3c4f276fd8 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs @@ -295,7 +295,7 @@ internal BigQueryClient Open(string? projectId = null) if (datasets == null) return null; // the ListDatasets call doesn't include the Location value so we have to - // grab the details of N number to determine the most common location + // grab the details of N number of datasets to determine the most common location Dictionary locationCounts = new Dictionary(); foreach (BigQueryDataset ds in datasets.Take(20))