From 9d965843733f4e09b4add3a4219c735be147d8fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micka=C3=ABl=20Hoarau?=
Date: Fri, 3 Apr 2026 08:57:00 +0400
Subject: [PATCH] Add PR closed_before_date support and retry transient GitHub search failures

---
 CHANGELOG.md                                  | 11 +++
 plugin.json                                   |  2 +-
 .../connector.json                            | 10 +-
 .../github_search-pull-requests/connector.py  | 62 ++++++++++---
 python-lib/utils/github_utils.py              | 91 ++++++++++++++-----
 5 files changed, 136 insertions(+), 40 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index eef5583..6ae73b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,14 @@
+# Version 1.1.0 - 2026-04-03
+* Search GitHub Pull Requests
+    * Add a `closed_before_date` parameter to restrict closed pull requests by closing date.
+    * Allow `since_date` and `closed_before_date` to use DSS variables in addition to literal `YYYY-MM-DD` values.
+    * Validate resolved date values after variable expansion and improve date-related error messages.
+* Reliability
+    * Retry transient GitHub search failures for HTTP `502`, `503`, and `504` in addition to rate-limit retries.
+    * Keep upstream HTTP status codes visible in retry logs to make runtime failures easier to diagnose.
+* Internal cleanup
+    * Fix incorrect integer identity comparisons in the updated GitHub search pull request code paths.
+
 # Version 1.0.1 - 2025-05-13
 * Fix check of unicity that was not working in case of retry due to rate limit.
 * Freeze version of PyGithub due to breaking chances on more recent versions around rate limit.
diff --git a/plugin.json b/plugin.json
index c27b0b7..dd2b685 100644
--- a/plugin.json
+++ b/plugin.json
@@ -1,6 +1,6 @@
 {
     "id": "github",
-    "version": "1.0.1",
+    "version": "1.1.0",
     "meta": {
         "label": "Github",
         "description": "Retrieve data about Github repositories",
diff --git a/python-connectors/github_search-pull-requests/connector.json b/python-connectors/github_search-pull-requests/connector.json
index 68da96d..c47e35b 100644
--- a/python-connectors/github_search-pull-requests/connector.json
+++ b/python-connectors/github_search-pull-requests/connector.json
@@ -79,7 +79,15 @@
             "label": "Pull Requests created after",
             "type": "STRING",
             "defaultValue": "YYYY-MM-DD",
-            "description": "Keep Pull Requests created after the selected date (YYYY-MM-DD)."
+            "description": "Keep Pull Requests created after the selected date (YYYY-MM-DD). You can also enter a variable such as ${var_name}."
+        },
+        {
+            "name": "closed_before_date",
+            "label": "Pull Requests closed before",
+            "type": "STRING",
+            "defaultValue": "",
+            "description": "Optional. When state is Closed, keep only Pull Requests closed before the selected date (YYYY-MM-DD). Leave empty to keep all closed Pull Requests regardless of closing date. You can also enter a variable such as ${var_name}.",
+            "visibilityCondition": "model.state == 'closed'"
         },
         {
             "name": "time_consuming_operations",
diff --git a/python-connectors/github_search-pull-requests/connector.py b/python-connectors/github_search-pull-requests/connector.py
index d597fd6..24c0cf3 100644
--- a/python-connectors/github_search-pull-requests/connector.py
+++ b/python-connectors/github_search-pull-requests/connector.py
@@ -1,3 +1,4 @@
+import dataiku
 from dataiku.connector import Connector
 import logging
 from utils import get_github_client, fetch_issues
@@ -10,22 +11,48 @@ class GithubSearchPullRequestsConnector(Connector):
 
     @staticmethod
     def resolve_github_team_handles(github_team_handles):
-        if len(github_team_handles) is 1 and re.fullmatch(r'\[.*\]', github_team_handles[0]) is not None:
+        if len(github_team_handles) == 1 and re.fullmatch(r'\[.*\]', github_team_handles[0]) is not None:
             # Variable containing list of users
             return json.loads(github_team_handles[0])
         return github_team_handles
 
     @staticmethod
-    def build_search_query(link_to_users, user_handle, owner, state, since_date):
-        search_query = "{link_to_users}:{user_handle} user:{owner} is:pr created:>{since_date}".format(
-            link_to_users=link_to_users,
-            user_handle=user_handle,
-            owner=owner,
-            since_date=since_date
-        )
+    def resolve_and_parse_date_parameter(date_value, field_name, required):
+        normalized_value = (date_value or "").strip()
+
+        if not normalized_value:
+            if required:
+                raise ValueError("{} is mandatory and must be in YYYY-MM-DD format or a variable like ${{var_name}}".format(field_name))
+            return ""
+
+        resolved_value = normalized_value
+        variable_match = re.fullmatch(r"\$\{([^}]+)\}", normalized_value)
+        if variable_match is not None:
+            variable_name = variable_match.group(1)
+            resolved_value = dataiku.get_custom_variables().get(variable_name, "").strip()
+            if not resolved_value:
+                if required:
+                    raise ValueError("{} variable '{}' is empty or undefined".format(field_name, variable_name))
+                return ""
+
+        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", resolved_value) is None:
+            raise ValueError("{} must resolve to a YYYY-MM-DD date, got '{}'".format(field_name, resolved_value))
+
+        return resolved_value
+
+    @staticmethod
+    def build_search_query(link_to_users, user_handle, owner, state, since_date, closed_before_date):
+        query_parts = [
+            "{}:{}".format(link_to_users, user_handle),
+            "user:{}".format(owner),
+            "is:pr",
+            "created:>{}".format(since_date)
+        ]
         if state in ["open", "closed"]:
-            return "{} state:{}".format(search_query, state)
-        return search_query
+            query_parts.append("state:{}".format(state))
+        if closed_before_date and state == "closed":
+            query_parts.append("closed:<{}".format(closed_before_date))
+        return " ".join(query_parts)
 
     def __init__(self, config, plugin_config):
         super().__init__(config, plugin_config)  # pass the parameters to the base class
@@ -35,7 +62,10 @@ def __init__(self, config, plugin_config):
         self.github_team_handles = self.resolve_github_team_handles(config["github_team_handles"])
         self.link_to_users = config["link_to_users"]
         self.state = config["state"]
-        self.since_date = config["since_date"]
+        self.since_date = self.resolve_and_parse_date_parameter(config.get("since_date"), "since_date", required=True)
+        self.closed_before_date = self.resolve_and_parse_date_parameter(
+            config.get("closed_before_date"), "closed_before_date", required=False
+        )
         self.fetch_additional_costly_fields = config["fetch_additional_costly_fields"]
         self.enable_auto_retry = config["enable_auto_retry"]
         self.number_of_fetch_retry = config["number_of_fetch_retry"]
@@ -54,9 +84,9 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, partitio
         fetched_issues = \
             self.fetch_issues_for_users("author", records_limit, remaining_records_to_fetch, query_date)
 
-        can_add_new_records = records_limit is -1 or len(self.fetched_issues_unique_ids) < records_limit
+        can_add_new_records = records_limit == -1 or len(self.fetched_issues_unique_ids) < records_limit
         if can_add_new_records and self.link_to_users in ["all", "reviewed_by"]:
-            if records_limit is not -1:
+            if records_limit != -1:
                 remaining_records_to_fetch -= len(self.fetched_issues_unique_ids)
             fetched_issues += \
                 self.fetch_issues_for_users("reviewed-by", records_limit, remaining_records_to_fetch, query_date)
@@ -72,7 +102,7 @@ def fetch_issues_for_users(self, link, records_limit, remaining_records_to_fetch
             )
             result += new_issues
 
-            if records_limit is not -1:
+            if records_limit != -1:
                 remaining_records_to_fetch -= len(new_issues)
                 if remaining_records_to_fetch <= 0:
                     logging.info("Max number of record reached ({}). Stop fetching.".format(records_limit))
@@ -81,7 +111,9 @@ def fetch_issues_for_users(self, link, records_limit, remaining_records_to_fetch
         return result
 
     def fetch_issues_for_link_to_users(self, query_date, link_to_users, user_handle, remaining_records_to_fetch, records_limit):
-        search_query = self.build_search_query(link_to_users, user_handle, self.owner, self.state, self.since_date)
+        search_query = self.build_search_query(
+            link_to_users, user_handle, self.owner, self.state, self.since_date, self.closed_before_date
+        )
         logging.info(
             "Fetching Issues corresponding to search query '{}' (remaining records to fetch: {}, already fetched items: {})".format(
                 search_query, remaining_records_to_fetch, len(self.fetched_issues_unique_ids)
diff --git a/python-lib/utils/github_utils.py b/python-lib/utils/github_utils.py
index d584292..52d5365 100644
--- a/python-lib/utils/github_utils.py
+++ b/python-lib/utils/github_utils.py
@@ -27,7 +27,7 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
     )
     searched_issues = github_client.search_issues(query=search_query)
     for issue in searched_issues:
-        if records_limit is not -1 and 0 <= records_limit <= current_number_of_fetched_issues:
+        if records_limit != -1 and 0 <= records_limit <= current_number_of_fetched_issues:
             logging.info("Limit of {} reached.".format(records_limit))
             break
         new_record = _build_base_issue_record(issue, query_date)
@@ -37,13 +37,11 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
                 _handle_costly_fields(fetch_additional_costly_fields, issue, new_record)
             results.append(new_record)
             current_number_of_fetched_issues += 1
-    except (GithubException, RateLimitExceededException) as rate_limit_exceeded_exception:
-        if isinstance(rate_limit_exceeded_exception, GithubException) and not \
-                (rate_limit_exceeded_exception.status == 403 and
-                 "rate limit" in rate_limit_exceeded_exception.data.get('message', '')):
-            _raise_unexpected_exception(rate_limit_exceeded_exception)
-        sleep_or_throw_because_of_rate_limit(
-            enable_auto_retry, number_of_fetch_retry, current_attempt, github_client, rate_limit_exceeded_exception
+    except (GithubException, RateLimitExceededException) as fetch_exception:
+        if not _is_retryable_fetch_exception(fetch_exception):
+            _raise_unexpected_exception(fetch_exception)
+        sleep_or_throw_because_of_retryable_fetch_failure(
+            enable_auto_retry, number_of_fetch_retry, current_attempt, github_client, fetch_exception
         )
         return fetch_issues(query_date, github_client, search_query, records_limit,
                             enable_auto_retry, number_of_fetch_retry,
@@ -55,23 +53,39 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
     return results
 
 
-def sleep_or_throw_because_of_rate_limit(enable_auto_retry, number_of_fetch_retry, current_attempt, github_client,
-                                         rate_limit_exceeded_exception):
-    logging.error(rate_limit_exceeded_exception)
+def sleep_or_throw_because_of_retryable_fetch_failure(enable_auto_retry, number_of_fetch_retry, current_attempt,
+                                                      github_client, fetch_exception):
+    logging.error(fetch_exception)
     now = datetime.utcnow()
-    search_rate_limit = github_client.get_rate_limit().search
     retry_log = _build_retry_log(current_attempt, enable_auto_retry, number_of_fetch_retry)
-    logging.info("Data only partially fetched. Rate limits: {}. Current time: {} {}".format(
-        _to_rate_limit_dict(search_rate_limit), now, retry_log
-    ))
-    infinite_retry = enable_auto_retry and number_of_fetch_retry is -1
-    disable_auto_retry = not enable_auto_retry or number_of_fetch_retry is 0
+    retry_reason = _get_retry_reason(fetch_exception)
+    status_code = getattr(fetch_exception, "status", "n/a")
+    if retry_reason == "rate limit":
+        search_rate_limit = github_client.get_rate_limit().search
+        logging.info("Data only partially fetched. Retry reason: {}. Status code: {}. Rate limits: {}. Current time: {} {}".format(
+            retry_reason, status_code, _to_rate_limit_dict(search_rate_limit), now, retry_log
+        ))
+    else:
+        logging.info("Data only partially fetched. Retry reason: {}. Status code: {}. Current time: {} {}".format(
+            retry_reason, status_code, now, retry_log
+        ))
+    infinite_retry = enable_auto_retry and number_of_fetch_retry == -1
+    disable_auto_retry = not enable_auto_retry or number_of_fetch_retry == 0
     if disable_auto_retry or (not infinite_retry and current_attempt >= number_of_fetch_retry):
-        logging.info("Could not fetch result due to rate limits even after {}.".format(retry_log))
-        raise rate_limit_exceeded_exception
-
-    seconds_before_reset = (search_rate_limit.reset - now).total_seconds() + 5
-    logging.info("Sleeping {} seconds before next attempt to fetch data.".format(seconds_before_reset))
+        logging.info("Could not fetch result due to {} (status {}) even after {}.".format(
+            retry_reason, status_code, retry_log
+        ))
+        raise fetch_exception
+
+    if retry_reason == "rate limit":
+        seconds_before_reset = (search_rate_limit.reset - now).total_seconds() + 5
+    else:
+        # A 502/503/504 from GitHub search is a transient upstream failure. Retrying is safe because
+        # the connector is read-only and the existing retry controls still bound the behavior.
+        seconds_before_reset = 5
+    logging.info("Sleeping {} seconds before next attempt to fetch data after {} (status {}).".format(
+        seconds_before_reset, retry_reason, status_code
+    ))
     time.sleep(seconds_before_reset)
 
 
@@ -80,6 +94,37 @@ def _build_retry_log(attempt_number, retry_boolean, max_retry):
                                              attempt_number=attempt_number, retry_boolean=retry_boolean, max_retry=max_retry)
 
 
+def _is_retryable_fetch_exception(fetch_exception):
+    return _is_rate_limit_exception(fetch_exception) or _is_transient_server_failure(fetch_exception)
+
+
+def _get_retry_reason(fetch_exception):
+    if _is_rate_limit_exception(fetch_exception):
+        return "rate limit"
+    return "transient 5xx"
+
+
+def _is_rate_limit_exception(fetch_exception):
+    return isinstance(fetch_exception, RateLimitExceededException) or (
+        isinstance(fetch_exception, GithubException) and
+        fetch_exception.status == 403 and
+        "rate limit" in _get_github_exception_message(fetch_exception)
+    )
+
+
+def _is_transient_server_failure(fetch_exception):
+    return isinstance(fetch_exception, GithubException) and fetch_exception.status in [502, 503, 504]
+
+
+def _get_github_exception_message(fetch_exception):
+    if not isinstance(fetch_exception, GithubException):
+        return ""
+    data = getattr(fetch_exception, "data", {})
+    if isinstance(data, dict):
+        return data.get("message", "").lower()
+    return ""
+
+
 def _handle_costly_fields(fetch_additional_costly_fields, issue_handle, new_record):
     if not fetch_additional_costly_fields:
         return
@@ -141,4 +186,4 @@ def _enrich_with_column_values(record_raw_data, record_to_enrich, column_names):
 
 def _raise_unexpected_exception(unexpected_exception):
     logging.error("An unexpected exception occurred while fetching issues: %s", unexpected_exception)
-    raise unexpected_exception
\ No newline at end of file
+    raise unexpected_exception