
SparkSubmitOperator on Kubernetes: due to task failure and retry, _spark_exit_code is not 0, which eventually leads to task status failure #44810

Open
bin-lian opened this issue Dec 10, 2024 · 1 comment
Labels
area:core, area:providers, kind:bug, needs-triage, provider:apache-spark

Comments


bin-lian commented Dec 10, 2024

Apache Airflow version

2.10.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I use Airflow to schedule Spark jobs on Kubernetes with SparkSubmitOperator. The Spark job succeeded, but the Airflow task status is failed.

What you think should happen instead?

Some Spark tasks hit OOM exceptions, so a non-zero exit code appears in the log and _spark_exit_code ends up not equal to 0. However, Spark retries those tasks itself and the job is ultimately successful. Because _spark_exit_code is not 0, SparkSubmitOperator still considers the task a failure. Would it be possible not to check _spark_exit_code and instead rely only on the status code returned by the child process (returncode)?
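
One possible user-side workaround (only a sketch, not the provider's behaviour; it relies on the _submit_sp Popen handle visible in the hook code quoted below and simply trusts the spark-submit return code, and wiring it into the operator is left out):

from airflow.exceptions import AirflowException
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook


class LenientSparkSubmitHook(SparkSubmitHook):
    """Sketch: trust the spark-submit return code over the exit code scraped from the log."""

    def submit(self, application: str = "", **kwargs) -> None:
        try:
            super().submit(application, **kwargs)
        except AirflowException:
            # _submit_sp is the subprocess.Popen created in SparkSubmitHook.submit()
            # (see the code quoted below). If spark-submit itself exited with 0, the
            # failure came only from the non-zero "Exit code:" line scraped out of the
            # driver pod log, so treat the run as successful.
            if self._submit_sp is not None and self._submit_sp.returncode == 0:
                self.log.warning(
                    "spark-submit returned 0; ignoring scraped Kubernetes exit code %s",
                    self._spark_exit_code,
                )
            else:
                raise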

Relevant code from the Spark provider's SparkSubmitHook (submit and _process_spark_submit_log):

    def submit(self, application: str = "", **kwargs: Any) -> None:
        """
        Remote Popen to execute the spark-submit job.

        :param application: Submitted application, jar or py file
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        """
        spark_submit_cmd = self._build_spark_submit_command(application)

        if self._env:
            env = os.environ.copy()
            env.update(self._env)
            kwargs["env"] = env

        self._submit_sp = subprocess.Popen(
            spark_submit_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1,
            universal_newlines=True,
            **kwargs,
        )

        self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
        returncode = self._submit_sp.wait()

        # Check spark-submit return code. In Kubernetes mode, also check the value
        # of exit code in the log, as it may differ.
        if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
            if self._is_kubernetes:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}. "
                    f"Kubernetes spark exit code is: {self._spark_exit_code}"
                )
            else:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}."
                )

        self.log.debug("Should track driver: %s", self._should_track_driver_status)

        # We want the Airflow job to wait until the Spark driver is finished
        if self._should_track_driver_status:
            if self._driver_id is None:
                raise AirflowException(
                    "No driver id is known: something went wrong when executing the spark submit command"
                )

            # We start with the SUBMITTED status as initial status
            self._driver_status = "SUBMITTED"

            # Start tracking the driver status (blocking function)
            self._start_driver_status_tracking()

            if self._driver_status != "FINISHED":
                raise AirflowException(
                    f"ERROR : Driver {self._driver_id} badly exited with status {self._driver_status}"
                )
        """
        Process the log files and extract useful information out of it.

        If the deploy-mode is 'client', log the output of the submit command as those
        are the output logs of the Spark worker directly.

        Remark: If the driver needs to be tracked for its status, the log-level of the
        spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO)

        :param itr: An iterator which iterates over the input of the subprocess
        """
        # Consume the iterator
        for line in itr:
            line = line.strip()
            # If we run yarn cluster mode, we want to extract the application id from
            # the logs so we can kill the application when we stop it unexpectedly
            if self._is_yarn and self._connection["deploy_mode"] == "cluster":
                match = re.search("application[0-9_]+", line)
                if match:
                    self._yarn_application_id = match.group(0)
                    self.log.info("Identified spark application id: %s", self._yarn_application_id)

            # If we run Kubernetes cluster mode, we want to extract the driver pod id
            # from the logs so we can kill the application when we stop it unexpectedly
            elif self._is_kubernetes:
                match_driver_pod = re.search(r"\s*pod name: ((.+?)-([a-z0-9]+)-driver$)", line)
                if match_driver_pod:
                    self._kubernetes_driver_pod = match_driver_pod.group(1)
                    self.log.info("Identified spark driver pod: %s", self._kubernetes_driver_pod)

                match_application_id = re.search(r"\s*spark-app-selector -> (spark-([a-z0-9]+)), ", line)
                if match_application_id:
                    self._kubernetes_application_id = match_application_id.group(1)
                    self.log.info("Identified spark application id: %s", self._kubernetes_application_id)

                # Store the Spark Exit code
                match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
                if match_exit_code:
                    self._spark_exit_code = int(match_exit_code.group(1))

            # if we run in standalone cluster mode and we want to track the driver status
            # we need to extract the driver id from the logs. This allows us to poll for
            # the status using the driver id. Also, we can kill the driver when needed.
            elif self._should_track_driver_status and not self._driver_id:
                match_driver_id = re.search(r"driver-[0-9\-]+", line)
                if match_driver_id:
                    self._driver_id = match_driver_id.group(0)
                    self.log.info("identified spark driver id: %s", self._driver_id)

            self.log.info(line)
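
To illustrate how _spark_exit_code ends up non-zero, here is the same regex as above applied to hypothetical driver-pod log lines (logger names and timestamps are made up):

import re

# Hypothetical log lines: an executor is OOM-killed (exit code 137), Spark retries
# its tasks and the job ultimately succeeds, but the "Exit code:" value that
# _process_spark_submit_log keeps is the non-zero one.
log_lines = [
    "24/12/10 10:00:01 INFO KubernetesClusterSchedulerBackend: Requesting a new executor",
    "24/12/10 10:00:05 INFO ExecutorPodsWatcher: Exit code: 137",
]

spark_exit_code = None
for line in log_lines:
    match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
    if match_exit_code:
        spark_exit_code = int(match_exit_code.group(1))

# spark_exit_code is now 137, so `returncode or (is_kubernetes and spark_exit_code != 0)`
# is truthy even though spark-submit later returns 0.
print(spark_exit_code)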

How to reproduce

Run a Spark job on Kubernetes via SparkSubmitOperator in which some internal tasks fail with OOM and are retried successfully by Spark; spark-submit exits with 0, but the non-zero exit code scraped from the driver pod log causes the Airflow task to be marked failed.
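
A minimal DAG sketch of the setup (the connection id, application path and memory settings below are placeholders; the small executor memory is only there to provoke executor OOM and Spark-level retries):

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_k8s_oom_repro",
    start_date=datetime(2024, 12, 1),
    schedule=None,
    catchup=False,
):
    SparkSubmitOperator(
        task_id="submit_job",
        conn_id="spark_k8s",  # placeholder: Spark connection with a k8s:// master, cluster deploy mode
        application="local:///opt/spark/app/job.py",  # placeholder application
        conf={
            "spark.executor.memory": "512m",  # deliberately small so some tasks OOM
            "spark.task.maxFailures": "4",    # Spark retries the failed tasks; the job still succeeds
        },
        name="oom-retry-repro",
        verbose=True,
    )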

Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

bin-lian added the area:core, kind:bug and needs-triage labels on Dec 10, 2024

boring-cyborg bot commented Dec 10, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
