Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import java.io.File

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.commons.io.FileUtils
import org.apache.commons.math3.util.Precision
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -158,11 +160,14 @@ trait SparkExpressionTestsBase
logInfo("Offload to native backend in the test.\n")
} else {
logInfo("Not supported in Auron, fall back to vanilla spark in the test.\n")
shouldNotFallback()
shouldNotFallback(
expression,
resultDF,
"expression and children use Auron-supported data types")
}
} else {
logInfo("Has unsupported data type, fall back to vanilla spark.\n")
shouldNotFallback()
shouldNotFallback(expression, resultDF, "expression or child has unsupported data type")
}

if (!(checkResult(result.head.get(0), expected, expression.dataType, expression.nullable)
Expand Down Expand Up @@ -271,19 +276,40 @@ trait SparkExpressionTestsBase
}

private def checkDataTypeSupported(expr: Expression): Boolean = {
SUPPORTED_DATA_TYPES.acceptsType(expr.dataType)
SUPPORTED_DATA_TYPES.acceptsType(expr.dataType) && NativeConverters.isTypeSupported(
expr.dataType)
}

/**
* Placeholder for future fallback checks.
*
* TODO: Implement logic to verify that no unexpected fallbacks occur during expression
* evaluation. Currently, this method is intentionally left empty because the Auron engine has
* many legitimate fallback cases that are not yet fully handled. Once fallback handling is
* stabilized and the expected cases are well defined, implement assertions or checks here to
* ensure that only allowed fallbacks occur.
* Override this in a suite to keep result checks for known fallback cases.
*/
private def shouldNotFallback(): Unit = {}
protected def allowNativeExpressionFallback(expression: Expression): Boolean = {
Set.empty[Expression].contains(expression)
}
Comment on lines +286 to +288

private def shouldNotFallback(
expression: Expression,
resultDF: DataFrame,
fallbackReason: String): Unit = {
if (!allowNativeExpressionFallback(expression) &&
canConvertExpressionToNative(expression)) {
fail(
s"Expression unexpectedly fell back to Spark: $expression\n" +
s"Fallback reason: $fallbackReason\n" +
s"Executed plan:\n${resultDF.queryExecution.executedPlan}")
}
}

private def canConvertExpressionToNative(expression: Expression): Boolean = {
try {
NativeConverters.convertExpr(expression)
true
} catch {
case _: NotImplementedError => false
case _: AssertionError => false
case NonFatal(_) => false
}
}
Comment on lines +303 to +312

/**
* Whether the input row can be converted a to data frame.
Expand Down
Loading