From 8f79da02f1c4303c6704b6947135c1a9566aae6e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 16 Sep 2025 13:31:54 -0400 Subject: [PATCH 1/7] feat: query client and basic query execution flow --- src/index.ts | 1 + src/query/client.ts | 132 +++++++++++++++++++++++++++++ src/query/index.ts | 18 ++++ src/query/iterator.ts | 34 ++++++++ src/query/options.ts | 19 +++++ src/query/query.ts | 136 ++++++++++++++++++++++++++++++ src/query/row.ts | 19 +++++ system-test/fixtures/transport.ts | 36 ++++++++ system-test/query/query.ts | 126 +++++++++++++++++++++++++++ 9 files changed, 521 insertions(+) create mode 100644 src/query/client.ts create mode 100644 src/query/index.ts create mode 100644 src/query/iterator.ts create mode 100644 src/query/options.ts create mode 100644 src/query/query.ts create mode 100644 src/query/row.ts create mode 100644 system-test/fixtures/transport.ts create mode 100644 system-test/query/query.ts diff --git a/src/index.ts b/src/index.ts index f86616dca..f92513e49 100644 --- a/src/index.ts +++ b/src/index.ts @@ -46,3 +46,4 @@ export { }; import * as protos from '../protos/protos'; export {protos}; +export * as query from './query'; diff --git a/src/query/client.ts b/src/query/client.ts new file mode 100644 index 000000000..e29cffcd9 --- /dev/null +++ b/src/query/client.ts @@ -0,0 +1,132 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + BigQueryClient, + BigQueryClientOptions, +} from '../bigquery'; +import {Query} from './query'; +import {CallOptions} from './options'; +import {protos} from '../'; + +/** + * QueryClient is a client for running queries in BigQuery. + */ +export class QueryClient { + private client: BigQueryClient; + private projectId: string; + + /** + * @param {BigQueryClientOptions} options - The configuration object. + */ + constructor( + options?: BigQueryClientOptions, + ) { + this.client = new BigQueryClient(options); + this.projectId = ''; + void this.initialize(); + } + + async getProjectId(): Promise { + if (this.projectId) { + return this.projectId; + } + const {jobClient} = this.getBigQueryClient(); + const projectId = await jobClient.getProjectId(); + this.projectId = projectId; + return projectId; + } + /** + * Initialize the client. + * Performs asynchronous operations (such as authentication) and prepares the client. + * This function will be called automatically when any class method is called for the + * first time, but if you need to initialize it before calling an actual method, + * feel free to call initialize() directly. + * + * You can await on this method if you want to make sure the client is initialized. + * + * @returns {Promise} A promise that resolves when auth is complete. + */ + initialize = async (): Promise => { + if (this.projectId) { + return; + } + const {jobClient} = this.getBigQueryClient(); + await jobClient.initialize(); + const projectId = await this.getProjectId(); + this.projectId = projectId; + }; + + /** + * Runs a query and returns a QueryJob handle. + * + * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request + * The request object that will be sent. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async startQuery( + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ): Promise { + const [response] = await this.client.jobClient.query(request, options); + return new Query(this, response); + } + + /** + * Starts a new asynchronous job. + * + * @param {protos.google.cloud.bigquery.v2.IJob} job + * A job resource to insert + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async startQueryJob( + job: protos.google.cloud.bigquery.v2.IJob, + options?: CallOptions, + ): Promise { + const [response] = await this.client.jobClient.insertJob( + { + projectId: this.projectId, + job, + }, + options, + ); + const { jobReference } = response; + return new Query(this, {jobReference}); + } + + /** + * Create a managed Query from a job reference. + * + * @param {protos.google.cloud.bigquery.v2.IJob} job + * A job resource to insert + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async attachJob( + jobRef: protos.google.cloud.bigquery.v2.IJobReference, + ): Promise { + return new Query(this, { + jobReference: jobRef, + }); + } + + getBigQueryClient(): BigQueryClient { + return this.client; + } +} diff --git a/src/query/index.ts b/src/query/index.ts new file mode 100644 index 000000000..855be3f5a --- /dev/null +++ b/src/query/index.ts @@ -0,0 +1,18 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export {QueryClient} from './client'; +export {Query} from './query'; +export {Row} from './row'; +export {RowIterator} from './iterator'; diff --git a/src/query/iterator.ts b/src/query/iterator.ts new file mode 100644 index 000000000..6b559cb0a --- /dev/null +++ b/src/query/iterator.ts @@ -0,0 +1,34 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Query} from './query'; +import {Row} from './row'; + +/** + * RowIterator iterates over the results of a query. + */ +export class RowIterator { + private query: Query; + + constructor(query: Query) { + this.query = query; + } + + /** + * Asynchronously iterates over the rows in the query result. + */ + async *[Symbol.asyncIterator](): AsyncGenerator { + // TODO: implement iterator + } +} diff --git a/src/query/options.ts b/src/query/options.ts new file mode 100644 index 000000000..7faee828b --- /dev/null +++ b/src/query/options.ts @@ -0,0 +1,19 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {CallOptions as GaxCallOptions} from 'google-gax'; + +export interface CallOptions extends GaxCallOptions { + signal?: AbortSignal; +} \ No newline at end of file diff --git a/src/query/query.ts b/src/query/query.ts new file mode 100644 index 000000000..6c9d3b3f0 --- /dev/null +++ b/src/query/query.ts @@ -0,0 +1,136 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {QueryClient} from './client'; +import {protos} from '../'; +import {RowIterator} from './iterator'; +import { CallOptions } from './options'; + +/** + * Query represents a query job. + */ +export class Query { + private client: QueryClient; + private jobComplete: boolean; + private projectId: string; + private jobId: string; + private location: string; + + constructor( + client: QueryClient, + response: protos.google.cloud.bigquery.v2.IQueryResponse, + ) { + this.client = client; + this.jobComplete = false; + + this.consumeQueryResponse({ + jobComplete: response.jobComplete, + schema: response.schema, + pageToken: response.pageToken, + totalRows: response.totalRows, + rows: response.rows, + }); + + this.jobId = ''; + this.location = response.location ?? ''; + this.projectId = ''; + if (response.jobReference) { + this.projectId = response.jobReference.projectId!; + this.jobId = response.jobReference.jobId!; + this.location = response.jobReference.location?.value || ''; + } + if (response.queryId) { + this.jobId = response.queryId; + } + } + + get jobReference(): protos.google.cloud.bigquery.v2.IJobReference { + return { + jobId: this.jobId, + projectId: this.projectId, + location: {value: this.location}, + }; + } + + get schema(): protos.google.cloud.bigquery.v2.ITableSchema | null { + return null; + } + + get complete(): boolean { + return this.jobComplete + } + + /** + * Waits for the query to complete. + * + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + */ + async wait(options?: CallOptions): Promise { + const signal = options?.signal; + while (!this.complete) { + if (signal?.aborted) { + throw new Error('The operation was aborted.'); + } + await this.checkStatus(options); + if (!this.complete) { + await this.waitFor(signal); + } + } + } + + private async waitFor(signal?: AbortSignal): Promise { + const delay = 1000; // TODO: backoff settings + return new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, delay); + signal?.addEventListener('abort', () => { + clearTimeout(timeout); + reject(new Error('The operation was aborted.')); + }); + }); + } + + /** + * Returns a RowIterator for the query results. + * + * @returns {RowIterator} + */ + async read(): Promise { + const it = new RowIterator(this); + return it; + } + + private consumeQueryResponse( + response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, + ) { + this.jobComplete = response.jobComplete?.value ?? false; + } + + private async checkStatus(options?: CallOptions): Promise { + const {jobClient} = this.client.getBigQueryClient(); + const [response] = await jobClient.getQueryResults( + { + projectId: this.projectId, + jobId: this.jobId, + location: this.location, + maxResults: {value: 0}, + formatOptions: { + useInt64Timestamp: true, + }, + }, + options, + ); + this.consumeQueryResponse(response); + } +} diff --git a/src/query/row.ts b/src/query/row.ts new file mode 100644 index 000000000..3d8e75ca0 --- /dev/null +++ b/src/query/row.ts @@ -0,0 +1,19 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Represents a row in a query result. + */ +export class Row { +} diff --git a/system-test/fixtures/transport.ts b/system-test/fixtures/transport.ts new file mode 100644 index 000000000..bc3f84257 --- /dev/null +++ b/system-test/fixtures/transport.ts @@ -0,0 +1,36 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe} from 'mocha'; +import {query} from '../../src'; +import { QueryClient } from '../../src/query'; + +export const describeWithBothTransports = (title: string, fn: (client: QueryClient) => void) => { + describe(title, () => { + describe("REST", () => { + const client = new query.QueryClient({ fallback: true }); + before(async () => { + await client.initialize(); + }); + fn(client); + }); + describe("GRPC", () => { + const client = new query.QueryClient({ fallback: false }); + before(async () => { + await client.initialize(); + }); + fn(client); + }); + }); +} \ No newline at end of file diff --git a/system-test/query/query.ts b/system-test/query/query.ts new file mode 100644 index 000000000..097cd295a --- /dev/null +++ b/system-test/query/query.ts @@ -0,0 +1,126 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import {it} from 'mocha'; + +import {protos} from '../../src'; +import {describeWithBothTransports} from '../fixtures/transport'; + +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + +describeWithBothTransports('Run Query', client => { + let getQueryResultsSpy: sinon.SinonSpy; + let projectId; + + beforeEach(async () => { + await client.initialize(); + const {jobClient} = client.getBigQueryClient(); + projectId = client.getProjectId(); + + getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); + }); + + afterEach(() => { + getQueryResultsSpy.restore(); + }); + + it('should run a stateless query', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_OPTIONAL, + }, + projectId + }; + + const q = await client.startQuery(req); + await q.wait(); + + // TODO: read rows and assert row count + }); + + it('should stop waiting for query to complete', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + useQueryCache: { value: false }, + jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED, + timeoutMs: { value: 500 }, + }, + projectId + }; + + const q = await client.startQuery(req); + const abortCtrl = new AbortController(); + q.wait({ + signal: abortCtrl.signal, + }).catch(err => { + assert(err, 'aborted'); + }); + await sleep(1000); + abortCtrl.abort(); + + assert(getQueryResultsSpy.callCount >= 1); + assert(getQueryResultsSpy.callCount <= 2); + }).timeout(5000); + + it('should attach a query job without cache', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED, + }, + projectId + }; + + let q = await client.startQuery(req); + await q.wait(); + + const jobRef = q.jobReference; + q = await client.attachJob(jobRef); + + // TODO: read rows and assert row count + }); + + it('should insert a query job', async () => { + const q = await client.startQueryJob({ + configuration: { + query: { + query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + }, + }, + }); + await q.wait(); + + // TODO: read rows and assert row count + }); +}); From bda3123e4dcbf5dadfdab7c821690f1f1f304cf6 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 16 Sep 2025 15:37:50 -0400 Subject: [PATCH 2/7] fix: assert job is complete --- src/query/client.ts | 11 +- src/query/options.ts | 2 +- src/query/query.ts | 8 +- src/query/row.ts | 3 +- system-test/fixtures/transport.ts | 36 ----- system-test/query/query.ts | 225 +++++++++++++++++------------- tsconfig.json | 5 +- 7 files changed, 139 insertions(+), 151 deletions(-) delete mode 100644 system-test/fixtures/transport.ts diff --git a/src/query/client.ts b/src/query/client.ts index e29cffcd9..8e633f9e2 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -12,10 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { - BigQueryClient, - BigQueryClientOptions, -} from '../bigquery'; +import {BigQueryClient, BigQueryClientOptions} from '../bigquery'; import {Query} from './query'; import {CallOptions} from './options'; import {protos} from '../'; @@ -30,9 +27,7 @@ export class QueryClient { /** * @param {BigQueryClientOptions} options - The configuration object. */ - constructor( - options?: BigQueryClientOptions, - ) { + constructor(options?: BigQueryClientOptions) { this.client = new BigQueryClient(options); this.projectId = ''; void this.initialize(); @@ -105,7 +100,7 @@ export class QueryClient { }, options, ); - const { jobReference } = response; + const {jobReference} = response; return new Query(this, {jobReference}); } diff --git a/src/query/options.ts b/src/query/options.ts index 7faee828b..d58bbda80 100644 --- a/src/query/options.ts +++ b/src/query/options.ts @@ -16,4 +16,4 @@ import {CallOptions as GaxCallOptions} from 'google-gax'; export interface CallOptions extends GaxCallOptions { signal?: AbortSignal; -} \ No newline at end of file +} diff --git a/src/query/query.ts b/src/query/query.ts index 6c9d3b3f0..2048e4414 100644 --- a/src/query/query.ts +++ b/src/query/query.ts @@ -15,7 +15,7 @@ import {QueryClient} from './client'; import {protos} from '../'; import {RowIterator} from './iterator'; -import { CallOptions } from './options'; +import {CallOptions} from './options'; /** * Query represents a query job. @@ -68,7 +68,7 @@ export class Query { } get complete(): boolean { - return this.jobComplete + return this.jobComplete; } /** @@ -107,14 +107,14 @@ export class Query { * @returns {RowIterator} */ async read(): Promise { - const it = new RowIterator(this); + const it = new RowIterator(this); return it; } private consumeQueryResponse( response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, ) { - this.jobComplete = response.jobComplete?.value ?? false; + this.jobComplete = response.jobComplete?.value ?? false; } private async checkStatus(options?: CallOptions): Promise { diff --git a/src/query/row.ts b/src/query/row.ts index 3d8e75ca0..a45b8cf5f 100644 --- a/src/query/row.ts +++ b/src/query/row.ts @@ -15,5 +15,4 @@ /** * Represents a row in a query result. */ -export class Row { -} +export class Row {} diff --git a/system-test/fixtures/transport.ts b/system-test/fixtures/transport.ts deleted file mode 100644 index bc3f84257..000000000 --- a/system-test/fixtures/transport.ts +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2025 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import {describe} from 'mocha'; -import {query} from '../../src'; -import { QueryClient } from '../../src/query'; - -export const describeWithBothTransports = (title: string, fn: (client: QueryClient) => void) => { - describe(title, () => { - describe("REST", () => { - const client = new query.QueryClient({ fallback: true }); - before(async () => { - await client.initialize(); - }); - fn(client); - }); - describe("GRPC", () => { - const client = new query.QueryClient({ fallback: false }); - before(async () => { - await client.initialize(); - }); - fn(client); - }); - }); -} \ No newline at end of file diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 097cd295a..a1a1974dd 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -15,112 +15,139 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; import {it} from 'mocha'; - +import {QueryClient} from '../../src/query'; import {protos} from '../../src'; -import {describeWithBothTransports} from '../fixtures/transport'; const sleep = (ms: number) => new Promise(resolve => { setTimeout(resolve, ms); }); -describeWithBothTransports('Run Query', client => { - let getQueryResultsSpy: sinon.SinonSpy; - let projectId; - - beforeEach(async () => { - await client.initialize(); - const {jobClient} = client.getBigQueryClient(); - projectId = client.getProjectId(); - - getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); - }); - - afterEach(() => { - getQueryResultsSpy.restore(); - }); - - it('should run a stateless query', async () => { - const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { - queryRequest: { - query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', - useLegacySql: {value: false}, - formatOptions: { - useInt64Timestamp: true, - }, - jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_OPTIONAL, - }, - projectId - }; - - const q = await client.startQuery(req); - await q.wait(); - - // TODO: read rows and assert row count - }); - - it('should stop waiting for query to complete', async () => { - const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { - queryRequest: { - query: 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', - useLegacySql: {value: false}, - formatOptions: { - useInt64Timestamp: true, - }, - useQueryCache: { value: false }, - jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED, - timeoutMs: { value: 500 }, - }, - projectId - }; - - const q = await client.startQuery(req); - const abortCtrl = new AbortController(); - q.wait({ - signal: abortCtrl.signal, - }).catch(err => { - assert(err, 'aborted'); - }); - await sleep(1000); - abortCtrl.abort(); - - assert(getQueryResultsSpy.callCount >= 1); - assert(getQueryResultsSpy.callCount <= 2); - }).timeout(5000); - - it('should attach a query job without cache', async () => { - const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { - queryRequest: { - query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', - useLegacySql: {value: false}, - formatOptions: { - useInt64Timestamp: true, - }, - jobCreationMode: protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode.JOB_CREATION_REQUIRED, - }, - projectId - }; - - let q = await client.startQuery(req); - await q.wait(); - - const jobRef = q.jobReference; - q = await client.attachJob(jobRef); - - // TODO: read rows and assert row count - }); - - it('should insert a query job', async () => { - const q = await client.startQueryJob({ - configuration: { - query: { - query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', - useLegacySql: {value: false}, - }, - }, +// the GCLOUD_PROJECT environment variable is set as part of test harness setup +const projectId = process.env.GCLOUD_PROJECT; +//const transports = ['grpc', 'rest']; +const transports = ['grpc']; + +// run tests with the gRPC client and the REST fallback client +transports.forEach(transport => { + let client; + if (transport === 'grpc') { + client = new QueryClient({}); + } else { + client = new QueryClient({fallback: true}); + } + + describe('Run Query', () => { + describe(transport, () => { + let getQueryResultsSpy: sinon.SinonSpy; + + beforeEach(async () => { + await client.initialize(); + const {jobClient} = client.getBigQueryClient(); + + getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); + }); + + afterEach(() => { + getQueryResultsSpy.restore(); + }); + + it('should run a stateless query', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_OPTIONAL, + }, + projectId, + }; + + const q = await client.startQuery(req); + await q.wait(); + + assert(q.complete); + + // TODO: read rows and assert row count + }); + + it('should stop waiting for query to complete', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + useQueryCache: {value: false}, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_REQUIRED, + timeoutMs: {value: 500}, + }, + projectId, + }; + + const q = await client.startQuery(req); + const abortCtrl = new AbortController(); + q.wait({ + signal: abortCtrl.signal, + }).catch(err => { + assert(err, 'aborted'); + }); + await sleep(1000); + abortCtrl.abort(); + + assert(getQueryResultsSpy.callCount >= 1); + assert(getQueryResultsSpy.callCount <= 2); + }).timeout(5000); + + it('should allow attach with job reference to a query handler', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_REQUIRED, + }, + projectId, + }; + + let q = await client.startQuery(req); + await q.wait(); + + const jobRef = q.jobReference; + q = await client.attachJob(jobRef); + await q.wait(); + + assert(q.complete); + + // TODO: read rows and assert row count + }); + + it('should insert a query job', async () => { + const q = await client.startQueryJob({ + configuration: { + query: { + query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + }, + }, + }); + await q.wait(); + + assert(q.complete); + + // TODO: read rows and assert row count + }); }); - await q.wait(); - - // TODO: read rows and assert row count }); }); diff --git a/tsconfig.json b/tsconfig.json index 7ecdf9fc6..ba1dec360 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,11 +16,14 @@ "test/*.ts", "test/**/*.ts", "system-test/*.ts", + "system-test/**/*.ts", "src/**/*.json", "protos/protos.json", "benchmark/*.ts", "scripts/*.ts", "samples/**/*.json" - + ], + "exclude": [ + "system-test/fixtures/sample/**/*.ts" ] } \ No newline at end of file From 6836d679df5d5f2c93ae8f0e63f652fa577a8abb Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 17 Sep 2025 14:00:15 -0400 Subject: [PATCH 3/7] feat: make job pooling happen in the background --- src/query/client.ts | 16 +++--- src/query/query.ts | 108 +++++++++++++++++++++++++------------ system-test/query/query.ts | 3 +- 3 files changed, 84 insertions(+), 43 deletions(-) diff --git a/src/query/client.ts b/src/query/client.ts index 8e633f9e2..dac0fa605 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -77,7 +77,7 @@ export class QueryClient { options?: CallOptions, ): Promise { const [response] = await this.client.jobClient.query(request, options); - return new Query(this, response); + return Query.fromResponse_(this, response, options); } /** @@ -101,24 +101,26 @@ export class QueryClient { options, ); const {jobReference} = response; - return new Query(this, {jobReference}); + if (!jobReference) { + throw new Error('Failed to insert job. Missing job reference.'); + } + return Query.fromJobRef_(this, jobReference, options); } /** * Create a managed Query from a job reference. * - * @param {protos.google.cloud.bigquery.v2.IJob} job + * @param {protos.google.cloud.bigquery.v2.IJobReference} jobReference * A job resource to insert * @param {CallOptions} [options] * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. * @returns {Promise} */ async attachJob( - jobRef: protos.google.cloud.bigquery.v2.IJobReference, + jobReference: protos.google.cloud.bigquery.v2.IJobReference, + options?: CallOptions, ): Promise { - return new Query(this, { - jobReference: jobRef, - }); + return Query.fromJobRef_(this, jobReference, options); } getBigQueryClient(): BigQueryClient { diff --git a/src/query/query.ts b/src/query/query.ts index 2048e4414..e9b69e59d 100644 --- a/src/query/query.ts +++ b/src/query/query.ts @@ -16,6 +16,8 @@ import {QueryClient} from './client'; import {protos} from '../'; import {RowIterator} from './iterator'; import {CallOptions} from './options'; +import {setInterval} from 'timers/promises'; +import {EventEmitter} from 'stream'; /** * Query represents a query job. @@ -23,36 +25,55 @@ import {CallOptions} from './options'; export class Query { private client: QueryClient; private jobComplete: boolean; + + private _queryId: string; private projectId: string; private jobId: string; private location: string; - constructor( - client: QueryClient, - response: protos.google.cloud.bigquery.v2.IQueryResponse, - ) { + private emitter: EventEmitter; + + constructor(client: QueryClient) { this.client = client; this.jobComplete = false; - - this.consumeQueryResponse({ - jobComplete: response.jobComplete, - schema: response.schema, - pageToken: response.pageToken, - totalRows: response.totalRows, - rows: response.rows, - }); + this.emitter = new EventEmitter(); this.jobId = ''; - this.location = response.location ?? ''; + this.location = ''; this.projectId = ''; - if (response.jobReference) { - this.projectId = response.jobReference.projectId!; - this.jobId = response.jobReference.jobId!; - this.location = response.jobReference.location?.value || ''; - } + this._queryId = ''; + } + + /** + * Internal method to instantiate Query handler from jobs.query response + * @internal + */ + static fromResponse_( + client: QueryClient, + response: protos.google.cloud.bigquery.v2.IQueryResponse, + options?: CallOptions, + ): Query { + const q = new Query(client); + q.location = response.location ?? ''; if (response.queryId) { - this.jobId = response.queryId; + q._queryId = response.queryId; } + q.consumeQueryResponse({...response}); + void q.waitQueryBackground(options); + + return q; + } + + /** + * Internal method to instantiate Query handler from job reference + * @internal + */ + static fromJobRef_( + client: QueryClient, + jobReference: protos.google.cloud.bigquery.v2.IJobReference, + options?: CallOptions, + ): Query { + return this.fromResponse_(client, {jobReference}, options); } get jobReference(): protos.google.cloud.bigquery.v2.IJobReference { @@ -71,6 +92,26 @@ export class Query { return this.jobComplete; } + get queryId(): string { + return this._queryId; + } + + private async waitQueryBackground(options?: CallOptions) { + if (this.complete) { + return; + } + const signal = options?.signal; + let waitTime = 1; + for await (const _ of setInterval(waitTime, undefined, {signal})) { + await this.checkStatus(options); + if (this.complete) { + this.emitter.emit('done'); + break; + } + waitTime = 1000; + } + } + /** * Waits for the query to complete. * @@ -78,25 +119,19 @@ export class Query { * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. */ async wait(options?: CallOptions): Promise { - const signal = options?.signal; - while (!this.complete) { - if (signal?.aborted) { - throw new Error('The operation was aborted.'); - } - await this.checkStatus(options); - if (!this.complete) { - await this.waitFor(signal); - } + if (this.complete) { + return; } - } - - private async waitFor(signal?: AbortSignal): Promise { - const delay = 1000; // TODO: backoff settings + const signal = options?.signal; return new Promise((resolve, reject) => { - const timeout = setTimeout(resolve, delay); + const callback = () => { + resolve(); + this.emitter.removeListener('done', callback); + }; + this.emitter.addListener('done', callback); signal?.addEventListener('abort', () => { - clearTimeout(timeout); reject(new Error('The operation was aborted.')); + this.emitter.removeListener('done', callback); }); }); } @@ -114,6 +149,11 @@ export class Query { private consumeQueryResponse( response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, ) { + if (response.jobReference) { + this.projectId = response.jobReference.projectId!; + this.jobId = response.jobReference.jobId!; + this.location = response.jobReference.location?.value || ''; + } this.jobComplete = response.jobComplete?.value ?? false; } diff --git a/system-test/query/query.ts b/system-test/query/query.ts index a1a1974dd..4d596abc1 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -25,8 +25,7 @@ const sleep = (ms: number) => // the GCLOUD_PROJECT environment variable is set as part of test harness setup const projectId = process.env.GCLOUD_PROJECT; -//const transports = ['grpc', 'rest']; -const transports = ['grpc']; +const transports = ['grpc', 'rest']; // run tests with the gRPC client and the REST fallback client transports.forEach(transport => { From 268d30183c85e4fe21b2c496fb6836a252b7ead1 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Sep 2025 11:31:14 -0400 Subject: [PATCH 4/7] fix: point TODOs to the right issue --- src/query/client.ts | 2 +- src/query/iterator.ts | 2 +- system-test/query/query.ts | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/client.ts b/src/query/client.ts index dac0fa605..911b76918 100644 --- a/src/query/client.ts +++ b/src/query/client.ts @@ -37,7 +37,7 @@ export class QueryClient { if (this.projectId) { return this.projectId; } - const {jobClient} = this.getBigQueryClient(); + const {jobClient} = this.getBigQueryClient(); const projectId = await jobClient.getProjectId(); this.projectId = projectId; return projectId; diff --git a/src/query/iterator.ts b/src/query/iterator.ts index 6b559cb0a..052ad8689 100644 --- a/src/query/iterator.ts +++ b/src/query/iterator.ts @@ -29,6 +29,6 @@ export class RowIterator { * Asynchronously iterates over the rows in the query result. */ async *[Symbol.asyncIterator](): AsyncGenerator { - // TODO: implement iterator + // TODO(#1541): implement iterator } } diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 4d596abc1..3c4369ac4 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -71,7 +71,7 @@ transports.forEach(transport => { assert(q.complete); - // TODO: read rows and assert row count + // TODO(#1541): read rows and assert row count }); it('should stop waiting for query to complete', async () => { @@ -129,7 +129,7 @@ transports.forEach(transport => { assert(q.complete); - // TODO: read rows and assert row count + // TODO(#1541): read rows and assert row count }); it('should insert a query job', async () => { @@ -145,7 +145,7 @@ transports.forEach(transport => { assert(q.complete); - // TODO: read rows and assert row count + // TODO(#1541): read rows and assert row count }); }); }); From e7acd90cdc585d51e3254135b706de6db7ff01ff Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Sep 2025 12:04:47 -0400 Subject: [PATCH 5/7] ci: rollback tsconfig --- tsconfig.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/tsconfig.json b/tsconfig.json index ba1dec360..748cce5ed 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -22,8 +22,5 @@ "benchmark/*.ts", "scripts/*.ts", "samples/**/*.json" - ], - "exclude": [ - "system-test/fixtures/sample/**/*.ts" ] } \ No newline at end of file From 8eec4df068308cef26ee8deca6011a2c4a13f7ef Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Sep 2025 15:21:34 -0400 Subject: [PATCH 6/7] feat: move query creation to the background and rename client to helper --- src/query/{client.ts => helper.ts} | 61 ++++++++---- src/query/index.ts | 2 +- src/query/query.ts | 151 ++++++++++++++++++++--------- system-test/query/query.ts | 34 ++++--- tsconfig.json | 3 + 5 files changed, 168 insertions(+), 83 deletions(-) rename src/query/{client.ts => helper.ts} (72%) diff --git a/src/query/client.ts b/src/query/helper.ts similarity index 72% rename from src/query/client.ts rename to src/query/helper.ts index 911b76918..c0b35b071 100644 --- a/src/query/client.ts +++ b/src/query/helper.ts @@ -15,21 +15,22 @@ import {BigQueryClient, BigQueryClientOptions} from '../bigquery'; import {Query} from './query'; import {CallOptions} from './options'; -import {protos} from '../'; +import {protos} from '..'; +import {randomUUID} from 'crypto'; /** - * QueryClient is a client for running queries in BigQuery. + * QueryHelper is a helper for running queries in BigQuery. */ -export class QueryClient { +export class QueryHelper { private client: BigQueryClient; - private projectId: string; + private projectId?: string; /** * @param {BigQueryClientOptions} options - The configuration object. */ - constructor(options?: BigQueryClientOptions) { - this.client = new BigQueryClient(options); - this.projectId = ''; + constructor(opts: {client: BigQueryClient; projectId?: string}) { + this.client = opts.client; + this.projectId = opts.projectId; void this.initialize(); } @@ -37,11 +38,12 @@ export class QueryClient { if (this.projectId) { return this.projectId; } - const {jobClient} = this.getBigQueryClient(); + const {jobClient} = this.getBigQueryClient(); const projectId = await jobClient.getProjectId(); this.projectId = projectId; return projectId; } + /** * Initialize the client. * Performs asynchronous operations (such as authentication) and prepares the client. @@ -76,8 +78,16 @@ export class QueryClient { request: protos.google.cloud.bigquery.v2.IPostQueryRequest, options?: CallOptions, ): Promise { - const [response] = await this.client.jobClient.query(request, options); - return Query.fromResponse_(this, response, options); + if (!request.projectId) { + request.projectId = this.projectId; + } + if (!request.queryRequest) { + throw new Error('queryRequest is required'); + } + if (!request.queryRequest.requestId) { + request.queryRequest.requestId = randomUUID(); + } + return Query.fromQueryRequest_(this, request, options); } /** @@ -93,18 +103,20 @@ export class QueryClient { job: protos.google.cloud.bigquery.v2.IJob, options?: CallOptions, ): Promise { - const [response] = await this.client.jobClient.insertJob( - { - projectId: this.projectId, - job, - }, - options, - ); - const {jobReference} = response; - if (!jobReference) { - throw new Error('Failed to insert job. Missing job reference.'); + const config = job.configuration; + if (!config) { + throw new Error('job is missing configuration'); } - return Query.fromJobRef_(this, jobReference, options); + const queryConfig = config.query; + if (!queryConfig) { + throw new Error('job is not a query'); + } + job.jobReference ||= {}; + if (!job.jobReference.jobId) { + job.jobReference.jobId = randomUUID(); + } + + return Query.fromJobRequest_(this, job, this.projectId, options); } /** @@ -120,6 +132,13 @@ export class QueryClient { jobReference: protos.google.cloud.bigquery.v2.IJobReference, options?: CallOptions, ): Promise { + if (!jobReference.jobId) { + throw new Error('attachJob requires a non-empty JobReference.JobId'); + } + if (!jobReference.projectId) { + jobReference.projectId = this.projectId; + } + return Query.fromJobRef_(this, jobReference, options); } diff --git a/src/query/index.ts b/src/query/index.ts index 855be3f5a..2c644a2a8 100644 --- a/src/query/index.ts +++ b/src/query/index.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -export {QueryClient} from './client'; +export {QueryHelper} from './helper'; export {Query} from './query'; export {Row} from './row'; export {RowIterator} from './iterator'; diff --git a/src/query/query.ts b/src/query/query.ts index e9b69e59d..1402c3a3e 100644 --- a/src/query/query.ts +++ b/src/query/query.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {QueryClient} from './client'; +import {QueryHelper} from './helper'; import {protos} from '../'; import {RowIterator} from './iterator'; import {CallOptions} from './options'; @@ -23,45 +23,47 @@ import {EventEmitter} from 'stream'; * Query represents a query job. */ export class Query { - private client: QueryClient; + private helper: QueryHelper; private jobComplete: boolean; - private _queryId: string; - private projectId: string; - private jobId: string; - private location: string; + private _queryId: string | null; + private projectId?: string; + private jobId: string | null; + private location?: string | null; private emitter: EventEmitter; - constructor(client: QueryClient) { - this.client = client; + private constructor(helper: QueryHelper, projectId?: string) { + this.helper = helper; this.jobComplete = false; this.emitter = new EventEmitter(); - this.jobId = ''; - this.location = ''; - this.projectId = ''; - this._queryId = ''; + this.projectId = projectId; + this._queryId = null; + this.jobId = null; } - /** - * Internal method to instantiate Query handler from jobs.query response - * @internal - */ - static fromResponse_( - client: QueryClient, - response: protos.google.cloud.bigquery.v2.IQueryResponse, + static fromQueryRequest_( + helper: QueryHelper, + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, options?: CallOptions, - ): Query { - const q = new Query(client); - q.location = response.location ?? ''; - if (response.queryId) { - q._queryId = response.queryId; - } - q.consumeQueryResponse({...response}); - void q.waitQueryBackground(options); + ): Promise { + const q = new Query(helper, request.projectId ?? undefined); + void q.runQuery(request, options); + + return Promise.resolve(q); + } + + static fromJobRequest_( + helper: QueryHelper, + job: protos.google.cloud.bigquery.v2.IJob, + projectId?: string, + options?: CallOptions, + ): Promise { + const q = new Query(helper, projectId); + void q.insertQuery(job, options); - return q; + return Promise.resolve(q); } /** @@ -69,14 +71,21 @@ export class Query { * @internal */ static fromJobRef_( - client: QueryClient, + helper: QueryHelper, jobReference: protos.google.cloud.bigquery.v2.IJobReference, options?: CallOptions, - ): Query { - return this.fromResponse_(client, {jobReference}, options); + ): Promise { + const q = new Query(helper, jobReference.projectId || undefined); + + q.consumeQueryResponse({jobReference}); + void q.waitQueryBackground(options); + return Promise.resolve(q); } - get jobReference(): protos.google.cloud.bigquery.v2.IJobReference { + get jobReference(): protos.google.cloud.bigquery.v2.IJobReference | null { + if (!this.jobId) { + return null; + } return { jobId: this.jobId, projectId: this.projectId, @@ -92,12 +101,52 @@ export class Query { return this.jobComplete; } - get queryId(): string { + get queryId(): string | null { return this._queryId; } + private async runQuery( + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ) { + const {jobClient} = this.helper.getBigQueryClient(); + try { + const [response] = await jobClient.query(request, options); + this.location = response.location; + if (response.queryId) { + this._queryId = response.queryId; + } + this.consumeQueryResponse(response); + void this.waitQueryBackground(options); + } catch (err) { + this.markDone(err as Error); + } + } + + private async insertQuery( + job: protos.google.cloud.bigquery.v2.IJob, + options?: CallOptions, + ) { + const {jobClient} = this.helper.getBigQueryClient(); + try { + const [response] = await jobClient.insertJob( + { + job: job, + projectId: this.projectId, + }, + options, + ); + this.emitter.emit('query:created', response); + this.consumeQueryResponse(response); + void this.waitQueryBackground(options); + } catch (err) { + this.markDone(err as Error); + } + } + private async waitQueryBackground(options?: CallOptions) { if (this.complete) { + this.markDone(); return; } const signal = options?.signal; @@ -105,13 +154,17 @@ export class Query { for await (const _ of setInterval(waitTime, undefined, {signal})) { await this.checkStatus(options); if (this.complete) { - this.emitter.emit('done'); + this.markDone(); break; } waitTime = 1000; } } + private markDone(err?: Error) { + this.emitter.emit('done', err); + } + /** * Waits for the query to complete. * @@ -124,9 +177,13 @@ export class Query { } const signal = options?.signal; return new Promise((resolve, reject) => { - const callback = () => { - resolve(); + const callback = (err: Error) => { this.emitter.removeListener('done', callback); + if (err) { + reject(err); + return; + } + resolve(); }; this.emitter.addListener('done', callback); signal?.addEventListener('abort', () => { @@ -158,19 +215,17 @@ export class Query { } private async checkStatus(options?: CallOptions): Promise { - const {jobClient} = this.client.getBigQueryClient(); - const [response] = await jobClient.getQueryResults( - { - projectId: this.projectId, - jobId: this.jobId, - location: this.location, - maxResults: {value: 0}, - formatOptions: { - useInt64Timestamp: true, - }, + const {jobClient} = this.helper.getBigQueryClient(); + const req: protos.google.cloud.bigquery.v2.IGetQueryResultsRequest = { + projectId: this.projectId, + jobId: this.jobId, + location: this.location, + maxResults: {value: 0}, + formatOptions: { + useInt64Timestamp: true, }, - options, - ); + }; + const [response] = await jobClient.getQueryResults(req, options); this.consumeQueryResponse(response); } } diff --git a/system-test/query/query.ts b/system-test/query/query.ts index 3c4369ac4..6e75af538 100644 --- a/system-test/query/query.ts +++ b/system-test/query/query.ts @@ -15,8 +15,8 @@ import * as assert from 'assert'; import * as sinon from 'sinon'; import {it} from 'mocha'; -import {QueryClient} from '../../src/query'; -import {protos} from '../../src'; +import {QueryHelper} from '../../src/query'; +import {BigQueryClient, protos} from '../../src'; const sleep = (ms: number) => new Promise(resolve => { @@ -25,24 +25,29 @@ const sleep = (ms: number) => // the GCLOUD_PROJECT environment variable is set as part of test harness setup const projectId = process.env.GCLOUD_PROJECT; +if (!projectId) { + throw new Error('GCLOUD_PROJECT environment variable is not set'); +} + const transports = ['grpc', 'rest']; // run tests with the gRPC client and the REST fallback client transports.forEach(transport => { - let client; + let client: BigQueryClient; if (transport === 'grpc') { - client = new QueryClient({}); + client = new BigQueryClient({}); } else { - client = new QueryClient({fallback: true}); + client = new BigQueryClient({fallback: true}); } + const helper = new QueryHelper({client, projectId}); describe('Run Query', () => { describe(transport, () => { let getQueryResultsSpy: sinon.SinonSpy; beforeEach(async () => { - await client.initialize(); - const {jobClient} = client.getBigQueryClient(); + await helper.initialize(); + const {jobClient} = helper.getBigQueryClient(); getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); }); @@ -66,7 +71,7 @@ transports.forEach(transport => { projectId, }; - const q = await client.startQuery(req); + const q = await helper.startQuery(req); await q.wait(); assert(q.complete); @@ -91,14 +96,14 @@ transports.forEach(transport => { projectId, }; - const q = await client.startQuery(req); + const q = await helper.startQuery(req); const abortCtrl = new AbortController(); q.wait({ signal: abortCtrl.signal, }).catch(err => { assert(err, 'aborted'); }); - await sleep(1000); + await sleep(2000); abortCtrl.abort(); assert(getQueryResultsSpy.callCount >= 1); @@ -120,11 +125,14 @@ transports.forEach(transport => { projectId, }; - let q = await client.startQuery(req); + let q = await helper.startQuery(req); await q.wait(); const jobRef = q.jobReference; - q = await client.attachJob(jobRef); + if (!jobRef) { + throw new Error('jobRef is null'); + } + q = await helper.attachJob(jobRef); await q.wait(); assert(q.complete); @@ -133,7 +141,7 @@ transports.forEach(transport => { }); it('should insert a query job', async () => { - const q = await client.startQueryJob({ + const q = await helper.startQueryJob({ configuration: { query: { query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', diff --git a/tsconfig.json b/tsconfig.json index 748cce5ed..ba1dec360 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -22,5 +22,8 @@ "benchmark/*.ts", "scripts/*.ts", "samples/**/*.json" + ], + "exclude": [ + "system-test/fixtures/sample/**/*.ts" ] } \ No newline at end of file From b47a918ec80ae187537fc76c9ac4f375e5206d25 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 25 Sep 2025 15:48:26 -0400 Subject: [PATCH 7/7] docs: improve overall method jsdocs --- src/query/helper.ts | 38 +++++++++++++++++++--------- src/query/iterator.ts | 9 ++++++- src/query/options.ts | 6 +++++ src/query/query.ts | 59 ++++++++++++++++++++++++++++++++++++++++--- src/query/row.ts | 2 +- 5 files changed, 97 insertions(+), 17 deletions(-) diff --git a/src/query/helper.ts b/src/query/helper.ts index c0b35b071..b69d76c70 100644 --- a/src/query/helper.ts +++ b/src/query/helper.ts @@ -19,14 +19,18 @@ import {protos} from '..'; import {randomUUID} from 'crypto'; /** - * QueryHelper is a helper for running queries in BigQuery. + * The QueryHelper provides a simplified interface for interacting with BigQuery + * queries. It handles the lifecycle of query jobs, from creation to result + * retrieval. */ export class QueryHelper { private client: BigQueryClient; private projectId?: string; /** - * @param {BigQueryClientOptions} options - The configuration object. + * @param {object} opts - The configuration object. + * @param {BigQueryClient} opts.client - A BigQueryClient instance. + * @param {string} [opts.projectId] - The project ID to use. Optional. */ constructor(opts: {client: BigQueryClient; projectId?: string}) { this.client = opts.client; @@ -34,6 +38,11 @@ export class QueryHelper { void this.initialize(); } + /** + * Retrieves the project ID from the client. + * + * @returns {Promise} A promise that resolves with the project ID. + */ async getProjectId(): Promise { if (this.projectId) { return this.projectId; @@ -55,7 +64,7 @@ export class QueryHelper { * * @returns {Promise} A promise that resolves when auth is complete. */ - initialize = async (): Promise => { + async initialize(): Promise { if (this.projectId) { return; } @@ -63,16 +72,16 @@ export class QueryHelper { await jobClient.initialize(); const projectId = await this.getProjectId(); this.projectId = projectId; - }; + } /** - * Runs a query and returns a QueryJob handle. + * Starts a query job using the `jobs.query` API. * * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request * The request object that will be sent. * @param {CallOptions} [options] * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. - * @returns {Promise} + * @returns {Promise} A promise that resolves with a Query instance. */ async startQuery( request: protos.google.cloud.bigquery.v2.IPostQueryRequest, @@ -91,13 +100,13 @@ export class QueryHelper { } /** - * Starts a new asynchronous job. + * Starts a new asynchronous query job using the `jobs.insert` API. * * @param {protos.google.cloud.bigquery.v2.IJob} job - * A job resource to insert + * A job resource to insert. * @param {CallOptions} [options] * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. - * @returns {Promise} + * @returns {Promise} A promise that resolves with a Query instance. */ async startQueryJob( job: protos.google.cloud.bigquery.v2.IJob, @@ -120,13 +129,13 @@ export class QueryHelper { } /** - * Create a managed Query from a job reference. + * Attaches to an existing query job. * * @param {protos.google.cloud.bigquery.v2.IJobReference} jobReference - * A job resource to insert + * A job reference object. * @param {CallOptions} [options] * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. - * @returns {Promise} + * @returns {Promise} A promise that resolves with a Query instance. */ async attachJob( jobReference: protos.google.cloud.bigquery.v2.IJobReference, @@ -142,6 +151,11 @@ export class QueryHelper { return Query.fromJobRef_(this, jobReference, options); } + /** + * Returns the BigQueryClient instance. + * + * @returns {BigQueryClient} The BigQueryClient instance. + */ getBigQueryClient(): BigQueryClient { return this.client; } diff --git a/src/query/iterator.ts b/src/query/iterator.ts index 052ad8689..b608cffeb 100644 --- a/src/query/iterator.ts +++ b/src/query/iterator.ts @@ -16,17 +16,24 @@ import {Query} from './query'; import {Row} from './row'; /** - * RowIterator iterates over the results of a query. + * The RowIterator provides a way to iterate over the rows of a query result. + * It can be used with `for await...of` loops. */ export class RowIterator { private query: Query; + /** + * @param {Query} query - The Query instance to iterate over. + * @internal + */ constructor(query: Query) { this.query = query; } /** * Asynchronously iterates over the rows in the query result. + * + * @yields {Row} A row from the query result. */ async *[Symbol.asyncIterator](): AsyncGenerator { // TODO(#1541): implement iterator diff --git a/src/query/options.ts b/src/query/options.ts index d58bbda80..1d0c56a75 100644 --- a/src/query/options.ts +++ b/src/query/options.ts @@ -14,6 +14,12 @@ import {CallOptions as GaxCallOptions} from 'google-gax'; +/** + * Extended call options that include an AbortSignal. + */ export interface CallOptions extends GaxCallOptions { + /** + * An AbortSignal to cancel the operation. + */ signal?: AbortSignal; } diff --git a/src/query/query.ts b/src/query/query.ts index 1402c3a3e..ad819cf17 100644 --- a/src/query/query.ts +++ b/src/query/query.ts @@ -20,7 +20,8 @@ import {setInterval} from 'timers/promises'; import {EventEmitter} from 'stream'; /** - * Query represents a query job. + * The Query object provides a handle to a BigQuery query job. It allows you to + * wait for the job to complete, retrieve results, and access job metadata. */ export class Query { private helper: QueryHelper; @@ -43,6 +44,16 @@ export class Query { this.jobId = null; } + /** + * Creates a Query instance from a query request. + * + * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request - The query request. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. + */ static fromQueryRequest_( helper: QueryHelper, request: protos.google.cloud.bigquery.v2.IPostQueryRequest, @@ -54,6 +65,17 @@ export class Query { return Promise.resolve(q); } + /** + * Creates a Query instance from a job request. + * + * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IJob} job - The job object. + * @param {string} [projectId] - The project ID. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. + */ static fromJobRequest_( helper: QueryHelper, job: protos.google.cloud.bigquery.v2.IJob, @@ -67,8 +89,14 @@ export class Query { } /** - * Internal method to instantiate Query handler from job reference + * Creates a Query instance from a job reference. + * * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IJobReference} jobReference - The job reference. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. */ static fromJobRef_( helper: QueryHelper, @@ -82,6 +110,12 @@ export class Query { return Promise.resolve(q); } + /** + * Returns a job reference for the query job. + * This will be null until the query job has been successfully submitted. + * + * @returns {protos.google.cloud.bigquery.v2.IJobReference | null} The job reference, or null if not available. + */ get jobReference(): protos.google.cloud.bigquery.v2.IJobReference | null { if (!this.jobId) { return null; @@ -93,14 +127,32 @@ export class Query { }; } + /** + * Returns the schema of the query results. + * This will be null until the query has completed and the schema is available. + * + * @returns {protos.google.cloud.bigquery.v2.ITableSchema | null} The schema, or null if not available. + */ get schema(): protos.google.cloud.bigquery.v2.ITableSchema | null { return null; } + /** + * Whether the query job is complete. + * + * @returns {boolean} True if the job is complete, false otherwise. + */ get complete(): boolean { return this.jobComplete; } + /** + * Returns the auto-generated ID for the query. + * This is only populated for stateless queries (i.e. those started via jobs.query) + * after the query has been submitted. + * + * @returns {string | null} The query ID, or null if not available. + */ get queryId(): string | null { return this._queryId; } @@ -170,6 +222,7 @@ export class Query { * * @param {CallOptions} [options] * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} */ async wait(options?: CallOptions): Promise { if (this.complete) { @@ -196,7 +249,7 @@ export class Query { /** * Returns a RowIterator for the query results. * - * @returns {RowIterator} + * @returns {Promise} A promise that resolves with a RowIterator. */ async read(): Promise { const it = new RowIterator(this); diff --git a/src/query/row.ts b/src/query/row.ts index a45b8cf5f..6a3a67cb5 100644 --- a/src/query/row.ts +++ b/src/query/row.ts @@ -13,6 +13,6 @@ // limitations under the License. /** - * Represents a row in a query result. + * Represents a row in a BigQuery query result. */ export class Row {}