
Try using Comet columnar shuffle when Comet native shuffle is not supported #1248

@andygrove


What is the problem the feature request solves?

In some cases (when the child of the shuffle is not native), we try Comet native shuffle first and, if that is not supported, fall back to Spark rather than trying Comet columnar shuffle.

This seems to be an unintended consequence of the design of CometExecRule, which has one match arm for native shuffle and another for columnar shuffle:

        // Native shuffle for Comet operators
        case s: ShuffleExchangeExec
            if isCometShuffleEnabled(conf) &&
              isCometNativeShuffleMode(conf) &&
              QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
          logInfo("Comet extension enabled for Native Shuffle")
          
          ...

        // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
        // (if configured).
        // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
        // convert it to CometColumnarShuffle.
        case s: ShuffleExchangeExec
            if isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
              QueryPlanSerde.supportPartitioningTypes(s.child.output, s.outputPartitioning)._1 &&
              !isShuffleOperator(s.child) =>
          logInfo("Comet extension enabled for JVM Columnar Shuffle")
          
          ...

However, within the first match arm for native shuffle, we return the original shuffle if the child of the shuffle is not native:

          val newOp = transform1(s)
          newOp match {
            case Some(nativeOp) =>
              // Switch to use Decimal128 regardless of precision, since Arrow native execution
              // doesn't support Decimal32 and Decimal64 yet.
              conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
              val cometOp = CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
              CometSinkPlaceHolder(nativeOp, s, cometOp)
            case None =>
              s // <-- we fall back to Spark here
          }
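To make the control flow concrete, here is a minimal, self-contained Scala sketch that models the two arms with simplified, hypothetical types (SparkShuffle, NativeShuffle, ColumnarShuffle, Conf) rather than Comet's real classes. It assumes a configuration in which both the native-mode and JVM-mode checks pass, and uses a string prefix as an illustrative stand-in for whether transform1 can produce a native operator. Because Scala selects the first case whose guard holds, a shuffle over a non-native child enters the first arm and returns the Spark shuffle without ever reaching the columnar arm.

```scala
// Hypothetical, simplified model of the current rule; these are NOT Comet's real types.
sealed trait Plan
final case class SparkShuffle(child: String) extends Plan
final case class NativeShuffle(child: String) extends Plan
final case class ColumnarShuffle(child: String) extends Plan

// Illustrative stand-ins for isCometShuffleEnabled / native mode / JVM mode.
final case class Conf(shuffleEnabled: Boolean, nativeMode: Boolean, jvmMode: Boolean)

object CurrentRule {
  // Assumption: a child "is native" if its name starts with "Comet" (stand-in for transform1).
  private def transform1(child: String): Option[String] =
    if (child.startsWith("Comet")) Some(child) else None

  def apply(conf: Conf, shuffle: SparkShuffle): Plan = shuffle match {
    // Arm 1: chosen whenever its guard holds, whatever the child turns out to be.
    case s if conf.shuffleEnabled && conf.nativeMode =>
      transform1(s.child) match {
        case Some(nativeOp) => NativeShuffle(nativeOp)
        case None           => s // falls back to Spark; arm 2 is never consulted
      }
    // Arm 2: only reached when arm 1's guard is false.
    case s if conf.shuffleEnabled && conf.jvmMode && !s.child.contains("Exchange") =>
      ColumnarShuffle(s.child)
    case s => s
  }
}
```

With both modes enabled, CurrentRule(both, SparkShuffle("FileScan")) returns the Spark shuffle unchanged, even though the columnar arm would have accepted it.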

It would be better to combine these match arms and update the logic so that we try native shuffle first, then columnar shuffle, and fall back to Spark only if neither is supported.
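One possible shape for the combined logic is sketched below in Scala, under stated assumptions: it uses simplified, hypothetical types instead of Comet's plan classes, and tryNative and tryColumnar are illustrative stand-ins for transform1 and the columnar arm's guard, not existing Comet functions. It tries native shuffle, then columnar shuffle, and keeps the Spark shuffle only when neither applies.

```scala
// Hypothetical, simplified model of a combined rule; these are NOT Comet's real types.
sealed trait Plan
final case class SparkShuffle(child: String) extends Plan
final case class NativeShuffle(child: String) extends Plan
final case class ColumnarShuffle(child: String) extends Plan

// Illustrative stand-ins for isCometShuffleEnabled / native mode / JVM mode.
final case class Conf(shuffleEnabled: Boolean, nativeMode: Boolean, jvmMode: Boolean)

object ProposedRule {
  // Assumption: a child "is native" if its name starts with "Comet" (stand-in for transform1).
  private def tryNative(child: String): Option[Plan] =
    if (child.startsWith("Comet")) Some(NativeShuffle(child)) else None

  // Assumption: columnar shuffle applies unless the child is itself a shuffle
  // (stand-in for the partitioning-support and isShuffleOperator checks).
  private def tryColumnar(child: String): Option[Plan] =
    if (!child.contains("Exchange")) Some(ColumnarShuffle(child)) else None

  def apply(conf: Conf, s: SparkShuffle): Plan =
    if (!conf.shuffleEnabled) s
    else {
      val native = if (conf.nativeMode) tryNative(s.child) else None
      native
        // Only consult columnar shuffle when native shuffle did not apply.
        .orElse(if (conf.jvmMode) tryColumnar(s.child) else None)
        // Last resort: keep the original Spark shuffle.
        .getOrElse(s)
    }
}
```

Chaining Option values with orElse keeps the priority order explicit in one place, so the fallback order no longer depends on which match arm's guard happens to fire first.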

Describe the potential solution

No response

Additional context

No response

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
