feat(bigquery-v2): add lro polling support for bq jobs - WIP - prototype#5835
feat(bigquery-v2): add lro polling support for bq jobs - WIP - prototype#5835haphungw wants to merge 11 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces long-running operation (LRO) polling support for BigQuery jobs by adding the google-cloud-lro dependency and implementing the DiscoveryOperation trait and poller extension traits for InsertJob and GetQueryResults. The feedback primarily focuses on adhering to the repository style guide by documenting all public modules, traits, and methods with /// instead of suppressing the missing_docs lint. Additionally, it is noted that manual changes to lib.rs will be overwritten upon regeneration, so the generator should be updated to include the module declaration automatically.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5835 +/- ##
==========================================
- Coverage 97.89% 97.88% -0.01%
==========================================
Files 226 226
Lines 57625 57625
==========================================
- Hits 56410 56409 -1
- Misses 1215 1216 +1 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
alvarowolfx
left a comment
There was a problem hiding this comment.
just some thoughts by reading it real quick. I think looks promising, but I wonder if we can make it even easier by not requiring extra parameters on the .poller method.
| @@ -196,12 +198,24 @@ pub async fn job_service() -> Result<()> { | |||
| .set_query(JobConfigurationQuery::new().set_query(query)), | |||
| ), | |||
| ) | |||
| .send() | |||
| .await?; | |||
| println!("CREATE JOB = {job:?}"); | |||
| .poller(&client, &project_id, None); | |||
|
|
|||
| let job = poller.until_done().await?; | |||
| println!("CREATE JOB (POLLED) = {job:?}"); | |||
|
|
|||
| assert!(job.job_reference.is_some(), "{job:?}"); | |||
|
|
|||
| // Also test polling for query results | |||
| let results_poller = client | |||
| .get_query_results() | |||
| .set_project_id(&project_id) | |||
| .set_job_id(&job_id) | |||
| .poller(&client, &project_id, None); | |||
|
|
|||
| let results = results_poller.until_done().await?; | |||
| println!("QUERY RESULTS (POLLED) = {results:?}"); | |||
| assert_eq!(results.job_complete, Some(true)); | |||
There was a problem hiding this comment.
this looks super interesting. Make life much easier to wait for a query to be executed, even with GAPICs.
One thing that's looks odd is that .poller right now is accepting the same parameters used right before it, like client and project_id. Our other poller don't have that and are just a method without any parameters.
I wonder if we can access the client from inside GetQueryResult/InsertJob (
) and don't need that.There was a problem hiding this comment.
looking more closely, this is polling the job two times, one using jobs.get via let job = poller.until_done().await?; and another one via jobs.getQueryResults via let results = results_poller.until_done().await?;. This is not very useful, since after is done by checking with jobs.get, it is already going to be done, so the other poller is not required.
We only need one poller for a job, but different job types, might use different polling mechanisms.
- Query jobs - jobs.getQueryResults
- Copy/Load/Extract - jobs.get
so we only need this for all jobs:
Query job:
let poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.poller();
let job = poller.until_done().await?;
println!("CREATE QUERY JOB (POLLED) = {job:?}");
Copy job:
let src_table_id = "some_table";
let dest_table_id = "dest_table";
let poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_copy(JobConfigurationCopy::new(). set_source_table(src_table_id). set_destination_table(dest_table_id)),
),
)
.poller();
let job = poller.until_done().await?;
println!("CREATE COPY JOB (POLLED) = {job:?}");
There was a problem hiding this comment.
as we discussed internally, even better for queries, they can just insert it and poll for getQueryResults:
let job = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.send()
.await?;
println!("CREATE QUERY JOB = {job:?}");
let results_poller = client
.get_query_results()
.set_project_id(&project_id)
.set_job_id(&job_id)
.poller(&client, &project_id, None);
let results = results_poller.until_done().await?;
println!("QUERY RESULTS (POLLED) = {results:?}");
| client: &JobService, | ||
| project_id: impl Into<String>, | ||
| location: Option<String>, |
There was a problem hiding this comment.
as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_query_results() call accepts a set_project_id and set_location
| client: &JobService, | ||
| project_id: impl Into<String>, | ||
| location: Option<String>, |
There was a problem hiding this comment.
as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_job() call accepts a set_project_id and set_location, and maybe we can grab the JobService from inside InsertJob https://github.com/googleapis/google-cloud-rust/blob/main/src/generated/cloud/bigquery/v2/src/builder.rs#L937
| b = b.set_location(loc); | ||
| } | ||
| let mut options = google_cloud_gax::options::RequestOptions::default(); | ||
| options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry); |
There was a problem hiding this comment.
we need to have a way to pass a retry policy, because we can actually need to retry in some scenarios (see https://docs.cloud.google.com/bigquery/docs/error-messages) and even crazier, in some cases we only detect that the job has to be retried by pooling it. In some cases we need to re send the query (call jobs.query or jobs.insert again) with a different ID (see jobBackendError at https://docs.cloud.google.com/bigquery/docs/error-messages). We don't need to solve this right away, just pointing out that we need to take into account that we do need to customize retry policies here.
| let mut b = client | ||
| .get_query_results() | ||
| .set_project_id(project_id) | ||
| .set_job_id(name); |
There was a problem hiding this comment.
We should pass maxResults=0 when using getQueryResults to wait for a query to finish. For large rows, it can add many seconds to the request time, even when no rows are actually returned.
| let project_id = project_id.clone(); | ||
| let location = location.clone(); | ||
| async move { | ||
| let mut b = client.get_job().set_project_id(project_id).set_job_id(name); |
There was a problem hiding this comment.
this is using jobs.get to poll for all job types, but jobs.getQueryResults is much better for query jobs. So maybe we need to check if this is a query job and use a different method (or call the jobs.getQueryResults poller)
| @@ -196,12 +198,24 @@ pub async fn job_service() -> Result<()> { | |||
| .set_query(JobConfigurationQuery::new().set_query(query)), | |||
| ), | |||
| ) | |||
| .send() | |||
| .await?; | |||
| println!("CREATE JOB = {job:?}"); | |||
| .poller(&client, &project_id, None); | |||
|
|
|||
| let job = poller.until_done().await?; | |||
| println!("CREATE JOB (POLLED) = {job:?}"); | |||
|
|
|||
| assert!(job.job_reference.is_some(), "{job:?}"); | |||
|
|
|||
| // Also test polling for query results | |||
| let results_poller = client | |||
| .get_query_results() | |||
| .set_project_id(&project_id) | |||
| .set_job_id(&job_id) | |||
| .poller(&client, &project_id, None); | |||
|
|
|||
| let results = results_poller.until_done().await?; | |||
| println!("QUERY RESULTS (POLLED) = {results:?}"); | |||
| assert_eq!(results.job_complete, Some(true)); | |||
There was a problem hiding this comment.
looking more closely, this is polling the job two times, one using jobs.get via let job = poller.until_done().await?; and another one via jobs.getQueryResults via let results = results_poller.until_done().await?;. This is not very useful, since after is done by checking with jobs.get, it is already going to be done, so the other poller is not required.
We only need one poller for a job, but different job types, might use different polling mechanisms.
- Query jobs - jobs.getQueryResults
- Copy/Load/Extract - jobs.get
so we only need this for all jobs:
Query job:
let poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.poller();
let job = poller.until_done().await?;
println!("CREATE QUERY JOB (POLLED) = {job:?}");
Copy job:
let src_table_id = "some_table";
let dest_table_id = "dest_table";
let poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_copy(JobConfigurationCopy::new(). set_source_table(src_table_id). set_destination_table(dest_table_id)),
),
)
.poller();
let job = poller.until_done().await?;
println!("CREATE COPY JOB (POLLED) = {job:?}");
…nt params - Removed the generic `PollerExt` trait and replaced it with `InsertJobBuilderExt`. - Extracted `project_id`, `location`, and `stub` internally using the builder's raw `pub(crate)` fields, achieving a 0-argument `poller()` method. - The GAPIC `poller` is now tightly scoped to `InsertJob`, leaving other job types (like `jobs.query`) to be handled by the handwritten Query client.
- Updated `tests/bigquery/src/lib.rs` and `driver.rs` to use the 0-argument `poller()`. - Demonstrated idempotency by passing a custom retry policy to the GAPIC `InsertJobBuilder` via `.with_options()`. The `.poller()` gracefully inherits this behavior under the hood.
Introduce Long-Running Operation (LRO) polling support for BigQuery Jobs.
Unlike standard Google Cloud APIs, bq v2 jobs don't use the standard
google.longrunning.Operationproto, so they cannot be automatically wrapped by the generator.To fix this, we implement
DiscoveryOperation, which lets the framework check theDONEstate and pull thejob_idstraight out ofJobandGetQueryResultsResponseobjects.For the actual polling, we disable standard RPC retries with
NeverRetrypolicy to avoid nested retry loops by letting the LRO's own retry policy handle transient network issues.librarian.yamlis updated to havegoogle-cloud-lroas an dependency and leave the customoperation.rsalone when regenerating code.Note on error handling: bq jobs don't throw standard RPC errors when they fail. Instead, they put the error details inside
JobStatus.error_resultfield. Therefore, the LRO will tell us the poll completed successfully(Ok(Job))even when the job failed. We'll need to manually checkerror_resultfield to know if the job actually succeeded.Fixes #5751