Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,8 @@ class AuronSparkTestSettings extends SparkTestSettings {
.disable("Native execution can crash after ParquetQuery in Spark 4")

enableSuite[AuronDateFunctionsSuite]
// Native execution wraps Spark parsing/format validation exceptions in SparkException.
.exclude("function to_date")
.exclude("unix_timestamp")
.exclude("to_unix_timestamp")
// Native date_trunc does not support all Spark aliases such as "yy".
.exclude("function date_trunc")
// Native date_trunc throws for unsupported fields instead of returning NULL as Spark does.
.exclude("unsupported fmt fields for trunc/date_trunc results null")
// Native date_trunc may produce incorrect results for historical timestamps with
// non-UTC timezones due to timezone handling differences in the DataFusion engine.
.exclude("SPARK-30766: date_trunc of old timestamps to hours and days")
.exclude("SPARK-30668: use legacy timestamp parser in to_timestamp")
.disable(
"Native execution can crash in Spark 4 date/partition suites causing cascade failures")

enableSuite[AuronMathFunctionsSuite]
.disable("Native execution can crash in Spark 4")
Expand Down Expand Up @@ -98,8 +88,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetInteroperabilitySuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetPartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude("Resolve type conflicts - decimals, dates and timestamps in partition column")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetProtobufCompatibilitySuite]
.exclude("unannotated array of primitive type")
.exclude("unannotated array of struct")
Expand Down Expand Up @@ -136,14 +125,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetV1FilterSuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetV1PartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude("read partitioned table - partition key included in Parquet file")
.exclude(
"read partitioned table - with nulls and partition keys are included in Parquet file")
.exclude(
"SPARK-18108 Parquet reader fails when data column types conflict with partition ones")
.exclude(
"SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetV1QuerySuite]
.exclude("simple select queries")
.exclude("appending")
Expand All @@ -161,9 +143,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetV2FilterSuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetV2PartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude(
"SPARK-22109: Resolve type conflicts between strings and timestamps in partition column")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetV2QuerySuite]
.exclude("simple select queries")
.exclude("appending")
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.SparkExpressionTestsBase

class AuronCastSuiteWithAnsiModeOn
extends CastSuiteWithAnsiModeOn
with SparkExpressionTestsBase {}
class AuronCastWithAnsiOffSuite extends CastWithAnsiOffSuite with SparkExpressionTestsBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.SparkExpressionTestsBase

class AuronCastSuite extends CastSuite with SparkExpressionTestsBase {}
class AuronCastWithAnsiOnSuite extends CastWithAnsiOnSuite with SparkExpressionTestsBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,8 @@ class AuronSparkTestSettings extends SparkTestSettings {
.disable("Native execution can crash after ParquetQuery in Spark 4")

enableSuite[AuronDateFunctionsSuite]
// Native execution wraps Spark parsing/format validation exceptions in SparkException.
.exclude("function to_date")
.exclude("unix_timestamp")
.exclude("to_unix_timestamp")
// Native date_trunc does not support all Spark aliases such as "yy".
.exclude("function date_trunc")
// Native date_trunc throws for unsupported fields instead of returning NULL as Spark does.
.exclude("unsupported fmt fields for trunc/date_trunc results null")
// Native date_trunc may produce incorrect results for historical timestamps with
// non-UTC timezones due to timezone handling differences in the DataFusion engine.
.exclude("SPARK-30766: date_trunc of old timestamps to hours and days")
.exclude("SPARK-30668: use legacy timestamp parser in to_timestamp")
.disable(
"Native execution can crash in Spark 4 date/partition suites causing cascade failures")

enableSuite[AuronMathFunctionsSuite]
.disable("Native execution can crash in Spark 4")
Expand Down Expand Up @@ -101,8 +91,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetInteroperabilitySuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetPartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude("Infer the TIME data type from partition values")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetProtobufCompatibilitySuite]
.exclude("unannotated array of primitive type")
.exclude("unannotated array of struct")
Expand All @@ -125,6 +114,11 @@ class AuronSparkTestSettings extends SparkTestSettings {
.exclude("SPARK-31159, SPARK-37705: rebasing timestamps in write")
.exclude("SPARK-31159: rebasing dates in write")
.exclude("SPARK-35427: datetime rebasing in the EXCEPTION mode")
// Spark 4.1 changed datetimeRebaseModeInWrite default to EXCEPTION, which causes these
// tests to fail when writing ancient dates (before 1582-10-15) in LEGACY mode.
.exclude(
"SPARK-33163, SPARK-37705: write the metadata keys 'org.apache.spark.legacyDateTime' and 'org.apache.spark.timeZone'")
.exclude("SPARK-33160, SPARK-37705: write the metadata key 'org.apache.spark.legacyINT96' and 'org.apache.spark.timeZone'")
enableSuite[AuronParquetRebaseDatetimeV1Suite]
.disable("Spark 4 test resources use jar paths unsupported by Hadoop Path")
enableSuite[AuronParquetRebaseDatetimeV2Suite]
Expand All @@ -140,15 +134,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetV1FilterSuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetV1PartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude("Infer the TIME data type from partition values")
.exclude("read partitioned table - partition key included in Parquet file")
.exclude(
"read partitioned table - with nulls and partition keys are included in Parquet file")
.exclude(
"SPARK-18108 Parquet reader fails when data column types conflict with partition ones")
.exclude(
"SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetV1QuerySuite]
.exclude("simple select queries")
.exclude("appending")
Expand All @@ -167,12 +153,7 @@ class AuronSparkTestSettings extends SparkTestSettings {
enableSuite[AuronParquetV2FilterSuite]
.disable("Native execution can crash in Spark 4")
enableSuite[AuronParquetV2PartitionDiscoverySuite]
.exclude("read partitioned table - normal case")
.exclude("Infer the TIME data type from partition values")
.exclude("_SUCCESS should not break partitioning discovery")
.exclude("Resolve type conflicts - decimals, dates and timestamps in partition column")
.exclude(
"SPARK-22109: Resolve type conflicts between strings and timestamps in partition column")
.disable("Native execution can crash in Spark 4 Parquet partition discovery")
enableSuite[AuronParquetV2QuerySuite]
.exclude("simple select queries")
.exclude("appending")
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.SparkExpressionTestsBase

class AuronCastSuiteWithAnsiModeOn
extends CastSuiteWithAnsiModeOn
with SparkExpressionTestsBase {}
class AuronCastWithAnsiOffSuite extends CastWithAnsiOffSuite with SparkExpressionTestsBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.SparkExpressionTestsBase

class AuronCastSuite extends CastSuite with SparkExpressionTestsBase {}
class AuronCastWithAnsiOnSuite extends CastWithAnsiOnSuite with SparkExpressionTestsBase {}
20 changes: 12 additions & 8 deletions native-engine/auron/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,12 @@ impl NativeExecutionRuntime {
};

let native_wrapper_cloned = native_wrapper.clone();
let is_finalizing_for_handler = is_finalizing.clone();
let join_handle = tokio_runtime.spawn(async move {
consume_stream.await.unwrap_or_else(|err| {
handle_unwinded_scope(|| {
let task_running = is_task_running();
if !task_running {
if !task_running || is_finalizing_for_handler.load(Ordering::Acquire) {
log::warn!("task completed before native execution done");
return Ok(());
Comment thread
SteNicholas marked this conversation as resolved.
}
Expand Down Expand Up @@ -271,11 +272,13 @@ impl NativeExecutionRuntime {
match next_batch() {
Ok(ret) => ret,
Err(err) => {
let _ = set_error(
&self.native_wrapper,
&format!("poll record batch error: {err}"),
None,
);
if !self.is_finalizing.load(Ordering::Acquire) {
let _ = set_error(
&self.native_wrapper,
&format!("poll record batch error: {err}"),
None,
);
}
false
}
}
Expand All @@ -288,8 +291,9 @@ impl NativeExecutionRuntime {
self.update_metrics().unwrap_or_default();
drop(self.plan);

// Set finalizing flag before dropping receiver to allow graceful SendError
// handling
// Set finalizing flag before dropping receiver and native_wrapper to prevent
// concurrent set_error calls from next_batch/tokio workers from accessing a
// freed GlobalRef after finalize completes.
self.is_finalizing.store(true, Ordering::Release);
drop(self.batch_receiver);

Expand Down
Loading