Skip to content

feat(bigquery-v2): add lro polling support for bq jobs - WIP - prototype#5835

Draft
haphungw wants to merge 11 commits into
googleapis:mainfrom
haphungw:bq-jobs-lro
Draft

feat(bigquery-v2): add lro polling support for bq jobs - WIP - prototype#5835
haphungw wants to merge 11 commits into
googleapis:mainfrom
haphungw:bq-jobs-lro

Conversation

@haphungw

@haphungw haphungw commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

Introduce Long-Running Operation (LRO) polling support for BigQuery Jobs.

Unlike standard Google Cloud APIs, bq v2 jobs don't use the standard google.longrunning.Operation proto, so they cannot be automatically wrapped by the generator.

To fix this, we implement DiscoveryOperation, which lets the framework check the DONE state and pull the job_id straight out of Job and GetQueryResultsResponse objects.

For the actual polling, we disable standard RPC retries with NeverRetry policy to avoid nested retry loops by letting the LRO's own retry policy handle transient network issues.

librarian.yaml is updated to have google-cloud-lro as an dependency and leave the custom operation.rs alone when regenerating code.

Note on error handling: bq jobs don't throw standard RPC errors when they fail. Instead, they put the error details inside JobStatus.error_result field. Therefore, the LRO will tell us the poll completed successfully (Ok(Job)) even when the job failed. We'll need to manually check error_result field to know if the job actually succeeded.

Fixes #5751

@haphungw haphungw changed the title feat(bigquery-v2): add lro polling support for bq jobs - WIP feat(bigquery-v2): add lro polling support for bq jobs - WIP - prototype Jun 3, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces long-running operation (LRO) polling support for BigQuery jobs by adding the google-cloud-lro dependency and implementing the DiscoveryOperation trait and poller extension traits for InsertJob and GetQueryResults. The feedback primarily focuses on adhering to the repository style guide by documenting all public modules, traits, and methods with /// instead of suppressing the missing_docs lint. Additionally, it is noted that manual changes to lib.rs will be overwritten upon regeneration, so the generator should be updated to include the module declaration automatically.

Comment thread src/generated/cloud/bigquery/v2/src/lib.rs Outdated
Comment thread src/generated/cloud/bigquery/v2/src/operation.rs
Comment thread src/generated/cloud/bigquery/v2/src/operation.rs Outdated
@codecov

codecov Bot commented Jun 3, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.88%. Comparing base (08abf33) to head (7ccc707).
⚠️ Report is 17 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5835      +/-   ##
==========================================
- Coverage   97.89%   97.88%   -0.01%     
==========================================
  Files         226      226              
  Lines       57625    57625              
==========================================
- Hits        56410    56409       -1     
- Misses       1215     1216       +1     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@alvarowolfx alvarowolfx left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just some thoughts by reading it real quick. I think looks promising, but I wonder if we can make it even easier by not requiring extra parameters on the .poller method.

Comment thread tests/bigquery/src/lib.rs Outdated
Comment on lines +186 to +217
@@ -196,12 +198,24 @@ pub async fn job_service() -> Result<()> {
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.send()
.await?;
println!("CREATE JOB = {job:?}");
.poller(&client, &project_id, None);

let job = poller.until_done().await?;
println!("CREATE JOB (POLLED) = {job:?}");

assert!(job.job_reference.is_some(), "{job:?}");

// Also test polling for query results
let results_poller = client
.get_query_results()
.set_project_id(&project_id)
.set_job_id(&job_id)
.poller(&client, &project_id, None);

let results = results_poller.until_done().await?;
println!("QUERY RESULTS (POLLED) = {results:?}");
assert_eq!(results.job_complete, Some(true));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks super interesting. Make life much easier to wait for a query to be executed, even with GAPICs.

One thing that's looks odd is that .poller right now is accepting the same parameters used right before it, like client and project_id. Our other poller don't have that and are just a method without any parameters.

https://googleapis.github.io/google-cloud-rust/working_with_long_running_operations.html#automatically-polling-a-long-running-operation

I wonder if we can access the client from inside GetQueryResult/InsertJob (

) and don't need that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking more closely, this is polling the job two times, one using jobs.get via let job = poller.until_done().await?; and another one via jobs.getQueryResults via let results = results_poller.until_done().await?;. This is not very useful, since after is done by checking with jobs.get, it is already going to be done, so the other poller is not required.

We only need one poller for a job, but different job types, might use different polling mechanisms.

  • Query jobs - jobs.getQueryResults
  • Copy/Load/Extract - jobs.get

so we only need this for all jobs:

Query job:

let poller = client
    .insert_job()
    .set_project_id(&project_id)
    .set_job(
        Job::new()
            .set_job_reference(JobReference::new().set_job_id(&job_id))
            .set_configuration(
                JobConfiguration::new()
                    .set_labels([(INSTANCE_LABEL, "true")])
                    .set_query(JobConfigurationQuery::new().set_query(query)),
            ),
    )
    .poller();

let job = poller.until_done().await?;
println!("CREATE QUERY JOB (POLLED) = {job:?}");

Copy job:

let src_table_id = "some_table";
let dest_table_id = "dest_table";
let poller = client
    .insert_job()
    .set_project_id(&project_id)
    .set_job(
        Job::new()
            .set_job_reference(JobReference::new().set_job_id(&job_id))
            .set_configuration(
                JobConfiguration::new()
                    .set_labels([(INSTANCE_LABEL, "true")])
                    .set_copy(JobConfigurationCopy::new(). set_source_table(src_table_id). set_destination_table(dest_table_id)),
            ),
    )
    .poller();

let job = poller.until_done().await?;
println!("CREATE COPY JOB (POLLED) = {job:?}");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we discussed internally, even better for queries, they can just insert it and poll for getQueryResults:

let job = client
    .insert_job()
    .set_project_id(&project_id)
    .set_job(
        Job::new()
            .set_job_reference(JobReference::new().set_job_id(&job_id))
            .set_configuration(
                JobConfiguration::new()
                    .set_labels([(INSTANCE_LABEL, "true")])
                    .set_query(JobConfigurationQuery::new().set_query(query)),
            ),
    )
    .send()
    .await?;

println!("CREATE QUERY JOB = {job:?}");

let results_poller = client
    .get_query_results()
    .set_project_id(&project_id)
    .set_job_id(&job_id)
    .poller(&client, &project_id, None);

let results = results_poller.until_done().await?;
println!("QUERY RESULTS (POLLED) = {results:?}");

Comment on lines +161 to +163
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_query_results() call accepts a set_project_id and set_location

Comment on lines +92 to +94
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_job() call accepts a set_project_id and set_location, and maybe we can grab the JobService from inside InsertJob https://github.com/googleapis/google-cloud-rust/blob/main/src/generated/cloud/bigquery/v2/src/builder.rs#L937

@alvarowolfx alvarowolfx left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more thoughts

b = b.set_location(loc);
}
let mut options = google_cloud_gax::options::RequestOptions::default();
options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have a way to pass a retry policy, because we can actually need to retry in some scenarios (see https://docs.cloud.google.com/bigquery/docs/error-messages) and even crazier, in some cases we only detect that the job has to be retried by pooling it. In some cases we need to re send the query (call jobs.query or jobs.insert again) with a different ID (see jobBackendError at https://docs.cloud.google.com/bigquery/docs/error-messages). We don't need to solve this right away, just pointing out that we need to take into account that we do need to customize retry policies here.

let mut b = client
.get_query_results()
.set_project_id(project_id)
.set_job_id(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass maxResults=0 when using getQueryResults to wait for a query to finish. For large rows, it can add many seconds to the request time, even when no rows are actually returned.

let project_id = project_id.clone();
let location = location.clone();
async move {
let mut b = client.get_job().set_project_id(project_id).set_job_id(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is using jobs.get to poll for all job types, but jobs.getQueryResults is much better for query jobs. So maybe we need to check if this is a query job and use a different method (or call the jobs.getQueryResults poller)

Comment thread tests/bigquery/src/lib.rs Outdated
Comment on lines +186 to +217
@@ -196,12 +198,24 @@ pub async fn job_service() -> Result<()> {
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.send()
.await?;
println!("CREATE JOB = {job:?}");
.poller(&client, &project_id, None);

let job = poller.until_done().await?;
println!("CREATE JOB (POLLED) = {job:?}");

assert!(job.job_reference.is_some(), "{job:?}");

// Also test polling for query results
let results_poller = client
.get_query_results()
.set_project_id(&project_id)
.set_job_id(&job_id)
.poller(&client, &project_id, None);

let results = results_poller.until_done().await?;
println!("QUERY RESULTS (POLLED) = {results:?}");
assert_eq!(results.job_complete, Some(true));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking more closely, this is polling the job two times, one using jobs.get via let job = poller.until_done().await?; and another one via jobs.getQueryResults via let results = results_poller.until_done().await?;. This is not very useful, since after is done by checking with jobs.get, it is already going to be done, so the other poller is not required.

We only need one poller for a job, but different job types, might use different polling mechanisms.

  • Query jobs - jobs.getQueryResults
  • Copy/Load/Extract - jobs.get

so we only need this for all jobs:

Query job:

let poller = client
    .insert_job()
    .set_project_id(&project_id)
    .set_job(
        Job::new()
            .set_job_reference(JobReference::new().set_job_id(&job_id))
            .set_configuration(
                JobConfiguration::new()
                    .set_labels([(INSTANCE_LABEL, "true")])
                    .set_query(JobConfigurationQuery::new().set_query(query)),
            ),
    )
    .poller();

let job = poller.until_done().await?;
println!("CREATE QUERY JOB (POLLED) = {job:?}");

Copy job:

let src_table_id = "some_table";
let dest_table_id = "dest_table";
let poller = client
    .insert_job()
    .set_project_id(&project_id)
    .set_job(
        Job::new()
            .set_job_reference(JobReference::new().set_job_id(&job_id))
            .set_configuration(
                JobConfiguration::new()
                    .set_labels([(INSTANCE_LABEL, "true")])
                    .set_copy(JobConfigurationCopy::new(). set_source_table(src_table_id). set_destination_table(dest_table_id)),
            ),
    )
    .poller();

let job = poller.until_done().await?;
println!("CREATE COPY JOB (POLLED) = {job:?}");

haphungw added 4 commits June 10, 2026 05:22
…nt params

- Removed the generic `PollerExt` trait and replaced it with `InsertJobBuilderExt`.
- Extracted `project_id`, `location`, and `stub` internally using the builder's raw `pub(crate)` fields, achieving a 0-argument `poller()` method.
- The GAPIC `poller` is now tightly scoped to `InsertJob`, leaving other job types (like `jobs.query`) to be handled by the handwritten Query client.
- Updated `tests/bigquery/src/lib.rs` and `driver.rs` to use the 0-argument `poller()`.
- Demonstrated idempotency by passing a custom retry policy to the GAPIC `InsertJobBuilder` via `.with_options()`. The `.poller()` gracefully inherits this behavior under the hood.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Prototype BQ Jobs LRO

2 participants