Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# Version 1.1.0 - 2026-04-03
* Search GitHub Pull Requests
* Add a `closed_before_date` parameter to restrict closed pull requests by closing date.
* Allow `since_date` and `closed_before_date` to use DSS variables in addition to literal `YYYY-MM-DD` values.
* Validate resolved date values after variable expansion and improve date-related error messages.
* Reliability
* Retry transient GitHub search failures for HTTP `502`, `503`, and `504` in addition to rate-limit retries.
* Keep upstream HTTP status codes visible in retry logs to make runtime failures easier to diagnose.
* Internal cleanup
* Fix incorrect integer identity comparisons in the updated GitHub search pull request code paths.

# Version 1.0.1 - 2025-05-13
* Fix uniqueness check that was not working in case of retry due to rate limit.
* Freeze version of PyGithub due to breaking changes around rate limiting in more recent versions.
Expand Down
2 changes: 1 addition & 1 deletion plugin.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"id": "github",
"version": "1.0.1",
"version": "1.1.0",
"meta": {
"label": "Github",
"description": "Retrieve data about Github repositories",
Expand Down
10 changes: 9 additions & 1 deletion python-connectors/github_search-pull-requests/connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@
"label": "Pull Requests created after",
"type": "STRING",
"defaultValue": "YYYY-MM-DD",
"description": "Keep Pull Requests created after the selected date (YYYY-MM-DD)."
"description": "Keep Pull Requests created after the selected date (YYYY-MM-DD). You can also enter a variable such as ${var_name}."
},
{
"name": "closed_before_date",
"label": "Pull Requests closed before",
"type": "STRING",
"defaultValue": "",
"description": "Optional. When state is Closed, keep only Pull Requests closed before the selected date (YYYY-MM-DD). Leave empty to keep all closed Pull Requests regardless of closing date. You can also enter a variable such as ${var_name}.",
"visibilityCondition": "model.state == 'closed'"
},
{
"name": "time_consuming_operations",
Expand Down
62 changes: 47 additions & 15 deletions python-connectors/github_search-pull-requests/connector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataiku
from dataiku.connector import Connector
import logging
from utils import get_github_client, fetch_issues
Expand All @@ -10,22 +11,48 @@ class GithubSearchPullRequestsConnector(Connector):

@staticmethod
def resolve_github_team_handles(github_team_handles):
    """Expand a single JSON-array variable value into a list of user handles.

    When the configured list holds exactly one element that looks like a JSON
    list (e.g. '["alice", "bob"]'), parse it and return the decoded list;
    otherwise return the configured list unchanged.
    """
    if len(github_team_handles) == 1 and re.fullmatch(r'\[.*\]', github_team_handles[0]) is not None:
        # A single bracketed entry is a variable holding a JSON list of users
        return json.loads(github_team_handles[0])
    return github_team_handles

@staticmethod
def resolve_and_parse_date_parameter(date_value, field_name, required):
    """Resolve a date parameter that may be a literal date or a DSS variable.

    Accepts either a literal ``YYYY-MM-DD`` string or a ``${var_name}``
    reference expanded through ``dataiku.get_custom_variables()``.

    :param date_value: raw configured value (may be None or blank)
    :param field_name: parameter name, used in error messages
    :param required: when True, a missing/empty value raises ValueError
    :return: the resolved ``YYYY-MM-DD`` string, or "" for an absent optional value
    :raises ValueError: when a required value is missing, a required variable is
        empty/undefined, or the resolved value does not match ``YYYY-MM-DD``
    """
    normalized_value = (date_value or "").strip()

    if not normalized_value:
        if required:
            raise ValueError("{} is mandatory and must be in YYYY-MM-DD format or a variable like ${{var_name}}".format(field_name))
        return ""

    resolved_value = normalized_value
    variable_match = re.fullmatch(r"\$\{([^}]+)\}", normalized_value)
    if variable_match is not None:
        variable_name = variable_match.group(1)
        # Expand the DSS project variable; emptiness is only an error when the field is required
        resolved_value = dataiku.get_custom_variables().get(variable_name, "").strip()
        if not resolved_value:
            if required:
                raise ValueError("{} variable '{}' is empty or undefined".format(field_name, variable_name))
            return ""

    # Validate AFTER variable expansion so variable values get the same format check
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", resolved_value) is None:
        raise ValueError("{} must resolve to a YYYY-MM-DD date, got '{}'".format(field_name, resolved_value))

    return resolved_value

@staticmethod
def build_search_query(link_to_users, user_handle, owner, state, since_date, closed_before_date):
    """Build the GitHub issue-search query string for pull requests.

    Always qualifies by user link (author/reviewed-by), owner, PR type and
    creation date. A ``state:`` qualifier is added only for "open"/"closed",
    and a ``closed:<`` upper bound only for closed PRs when
    ``closed_before_date`` is non-empty.
    """
    query_parts = [
        "{}:{}".format(link_to_users, user_handle),
        "user:{}".format(owner),
        "is:pr",
        "created:>{}".format(since_date)
    ]
    if state in ["open", "closed"]:
        query_parts.append("state:{}".format(state))
    if closed_before_date and state == "closed":
        query_parts.append("closed:<{}".format(closed_before_date))
    return " ".join(query_parts)

def __init__(self, config, plugin_config):
super().__init__(config, plugin_config) # pass the parameters to the base class
Expand All @@ -35,7 +62,10 @@ def __init__(self, config, plugin_config):
self.github_team_handles = self.resolve_github_team_handles(config["github_team_handles"])
self.link_to_users = config["link_to_users"]
self.state = config["state"]
self.since_date = config["since_date"]
self.since_date = self.resolve_and_parse_date_parameter(config.get("since_date"), "since_date", required=True)
self.closed_before_date = self.resolve_and_parse_date_parameter(
config.get("closed_before_date"), "closed_before_date", required=False
)
self.fetch_additional_costly_fields = config["fetch_additional_costly_fields"]
self.enable_auto_retry = config["enable_auto_retry"]
self.number_of_fetch_retry = config["number_of_fetch_retry"]
Expand All @@ -54,9 +84,9 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None, partitio
fetched_issues = \
self.fetch_issues_for_users("author", records_limit, remaining_records_to_fetch, query_date)

can_add_new_records = records_limit is -1 or len(self.fetched_issues_unique_ids) < records_limit
can_add_new_records = records_limit == -1 or len(self.fetched_issues_unique_ids) < records_limit
if can_add_new_records and self.link_to_users in ["all", "reviewed_by"]:
if records_limit is not -1:
if records_limit != -1:
remaining_records_to_fetch -= len(self.fetched_issues_unique_ids)
fetched_issues += \
self.fetch_issues_for_users("reviewed-by", records_limit, remaining_records_to_fetch, query_date)
Expand All @@ -72,7 +102,7 @@ def fetch_issues_for_users(self, link, records_limit, remaining_records_to_fetch
)
result += new_issues

if records_limit is not -1:
if records_limit != -1:
remaining_records_to_fetch -= len(new_issues)
if remaining_records_to_fetch <= 0:
logging.info("Max number of record reached ({}). Stop fetching.".format(records_limit))
Expand All @@ -81,7 +111,9 @@ def fetch_issues_for_users(self, link, records_limit, remaining_records_to_fetch
return result

def fetch_issues_for_link_to_users(self, query_date, link_to_users, user_handle, remaining_records_to_fetch, records_limit):
search_query = self.build_search_query(link_to_users, user_handle, self.owner, self.state, self.since_date)
search_query = self.build_search_query(
link_to_users, user_handle, self.owner, self.state, self.since_date, self.closed_before_date
)
logging.info(
"Fetching Issues corresponding to search query '{}' (remaining records to fetch: {}, already fetched items: {})".format(
search_query, remaining_records_to_fetch, len(self.fetched_issues_unique_ids)
Expand Down
91 changes: 68 additions & 23 deletions python-lib/utils/github_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
)
searched_issues = github_client.search_issues(query=search_query)
for issue in searched_issues:
if records_limit is not -1 and 0 <= records_limit <= current_number_of_fetched_issues:
if records_limit != -1 and 0 <= records_limit <= current_number_of_fetched_issues:
logging.info("Limit of {} reached.".format(records_limit))
break
new_record = _build_base_issue_record(issue, query_date)
Expand All @@ -37,13 +37,11 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
_handle_costly_fields(fetch_additional_costly_fields, issue, new_record)
results.append(new_record)
current_number_of_fetched_issues += 1
except (GithubException, RateLimitExceededException) as rate_limit_exceeded_exception:
if isinstance(rate_limit_exceeded_exception, GithubException) and not \
(rate_limit_exceeded_exception.status == 403 and
"rate limit" in rate_limit_exceeded_exception.data.get('message', '')):
_raise_unexpected_exception(rate_limit_exceeded_exception)
sleep_or_throw_because_of_rate_limit(
enable_auto_retry, number_of_fetch_retry, current_attempt, github_client, rate_limit_exceeded_exception
except (GithubException, RateLimitExceededException) as fetch_exception:
if not _is_retryable_fetch_exception(fetch_exception):
_raise_unexpected_exception(fetch_exception)
sleep_or_throw_because_of_retryable_fetch_failure(
enable_auto_retry, number_of_fetch_retry, current_attempt, github_client, fetch_exception
)
return fetch_issues(query_date, github_client, search_query, records_limit,
enable_auto_retry, number_of_fetch_retry,
Expand All @@ -55,23 +53,39 @@ def fetch_issues(query_date, github_client, search_query, records_limit,
return results


def sleep_or_throw_because_of_retryable_fetch_failure(enable_auto_retry, number_of_fetch_retry, current_attempt,
                                                     github_client, fetch_exception):
    """Sleep before a retry of a retryable GitHub search failure, or re-raise it.

    Handles two retry reasons: API rate limiting (sleeps until the search rate
    limit resets) and transient 5xx upstream failures (short fixed back-off).
    Re-raises ``fetch_exception`` when auto-retry is disabled or the configured
    number of retries is exhausted.

    :param enable_auto_retry: whether retries are enabled at all
    :param number_of_fetch_retry: max retry attempts; -1 means retry forever
    :param current_attempt: 1-based attempt counter maintained by the caller
    :param github_client: PyGithub client, used to read the search rate limit
    :param fetch_exception: the retryable exception that was caught
    :raises: ``fetch_exception`` when no (further) retry is allowed
    """
    logging.error(fetch_exception)
    now = datetime.utcnow()
    retry_log = _build_retry_log(current_attempt, enable_auto_retry, number_of_fetch_retry)
    retry_reason = _get_retry_reason(fetch_exception)
    # Keep the upstream status code visible in logs to ease runtime diagnosis
    status_code = getattr(fetch_exception, "status", "n/a")
    if retry_reason == "rate limit":
        search_rate_limit = github_client.get_rate_limit().search
        logging.info("Data only partially fetched. Retry reason: {}. Status code: {}. Rate limits: {}. Current time: {} {}".format(
            retry_reason, status_code, _to_rate_limit_dict(search_rate_limit), now, retry_log
        ))
    else:
        logging.info("Data only partially fetched. Retry reason: {}. Status code: {}. Current time: {} {}".format(
            retry_reason, status_code, now, retry_log
        ))
    infinite_retry = enable_auto_retry and number_of_fetch_retry == -1
    disable_auto_retry = not enable_auto_retry or number_of_fetch_retry == 0
    if disable_auto_retry or (not infinite_retry and current_attempt >= number_of_fetch_retry):
        logging.info("Could not fetch result due to {} (status {}) even after {}.".format(
            retry_reason, status_code, retry_log
        ))
        raise fetch_exception

    if retry_reason == "rate limit":
        # Wait until the search rate limit window resets, plus a small safety margin
        seconds_before_reset = (search_rate_limit.reset - now).total_seconds() + 5
    else:
        # A 502/503/504 from GitHub search is a transient upstream failure. Retrying is safe because
        # the connector is read-only and the existing retry controls still bound the behavior.
        seconds_before_reset = 5
    logging.info("Sleeping {} seconds before next attempt to fetch data after {} (status {}).".format(
        seconds_before_reset, retry_reason, status_code
    ))
    time.sleep(seconds_before_reset)


Expand All @@ -80,6 +94,37 @@ def _build_retry_log(attempt_number, retry_boolean, max_retry):
attempt_number=attempt_number, retry_boolean=retry_boolean, max_retry=max_retry)


def _is_retryable_fetch_exception(fetch_exception):
    """Return True when the failure is worth retrying (rate limit or transient 5xx)."""
    if _is_rate_limit_exception(fetch_exception):
        return True
    return _is_transient_server_failure(fetch_exception)


def _get_retry_reason(fetch_exception):
    """Return a human-readable retry reason label for a retryable fetch failure."""
    return "rate limit" if _is_rate_limit_exception(fetch_exception) else "transient 5xx"


def _is_rate_limit_exception(fetch_exception):
    """True for rate-limit failures: the dedicated PyGithub exception, or a 403 whose message mentions 'rate limit'."""
    if isinstance(fetch_exception, RateLimitExceededException):
        return True
    if not isinstance(fetch_exception, GithubException):
        return False
    if fetch_exception.status != 403:
        return False
    return "rate limit" in _get_github_exception_message(fetch_exception)


def _is_transient_server_failure(fetch_exception):
    """True when GitHub answered with a retryable transient upstream error (502/503/504)."""
    if not isinstance(fetch_exception, GithubException):
        return False
    return fetch_exception.status in (502, 503, 504)


def _get_github_exception_message(fetch_exception):
    """Return the lower-cased message from a GithubException payload, or "" when unavailable.

    Returns "" for non-GithubException inputs and for payloads that are not
    dicts or carry no usable message.
    """
    if not isinstance(fetch_exception, GithubException):
        return ""
    data = getattr(fetch_exception, "data", {})
    if isinstance(data, dict):
        # `or ""` guards against an explicit "message": null in the payload,
        # which `data.get("message", "")` would return as None and crash .lower()
        return (data.get("message") or "").lower()
    return ""


def _handle_costly_fields(fetch_additional_costly_fields, issue_handle, new_record):
if not fetch_additional_costly_fields:
return
Expand Down Expand Up @@ -141,4 +186,4 @@ def _enrich_with_column_values(record_raw_data, record_to_enrich, column_names):

def _raise_unexpected_exception(unexpected_exception):
logging.error("An unexpected exception occurred while fetching issues: %s", unexpected_exception)
raise unexpected_exception
raise unexpected_exception