From 089dfff3c3e1b2f87ef04d1253c51b26389859a7 Mon Sep 17 00:00:00 2001
From: liujiwen-up
Date: Fri, 5 Jun 2026 11:25:02 +0800
Subject: [PATCH] test: cover mixed-format schema evolution reads

---
Review notes (this section, between the "---" marker and the first
"diff --git" line, does not become part of the commit message):

  * Layout: one line per diff line was restored. The hunk headers and the
    diffstat agree with the hunks below (373 + 132 = 505 added lines).
  * NOTE(review): generic arguments that were missing from the text were
    filled in from nearby evidence: Vec<RecordBatch>, Option<i32>,
    downcast_ref::<Int32Array>() (named in the expect() messages) and
    downcast_ref::<StringArray>() for the STRING columns. The StringArray
    choice, the indentation of context lines and the whitespace inside the
    SQL strings are assumptions; confirm against the original commit.
  * NOTE(review): the From: header carries no e-mail address; presumably it
    has to be restored before this patch is fed to `git am` (TODO confirm).
  * test_read_mixed_format_schema_evolution_reorder_move_column builds its
    format set with the same iterator chain as extract_plan_file_formats();
    it could call the helper instead.
  * plan_files_all_have_field() and plan_files_all_missing_field() differ
    only in the negation of the any(..) check.
  * provision.py pairs CREATE TABLE IF NOT EXISTS with unconditional INSERTs;
    presumably provisioning always starts from an empty warehouse (TODO
    confirm), otherwise a re-run would append rows and the exact-row
    assertions in the new tests would no longer hold.

 crates/integration_tests/tests/read_tables.rs | 373 ++++++++++++++++++
 dev/spark/provision.py                        | 132 +++++++
 2 files changed, 505 insertions(+)

diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index fdf213a3..3526851e 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -96,8 +96,19 @@ async fn scan_and_read_with_fs_catalog(
 async fn scan_and_read_with_filter(
     table: &paimon::Table,
     filter: Predicate,
+) -> (Plan, Vec<RecordBatch>) {
+    scan_and_read_with_filter_and_projection(table, filter, None).await
+}
+
+async fn scan_and_read_with_filter_and_projection(
+    table: &paimon::Table,
+    filter: Predicate,
+    projection: Option<&[&str]>,
 ) -> (Plan, Vec<RecordBatch>) {
     let mut read_builder = table.new_read_builder();
+    if let Some(cols) = projection {
+        read_builder.with_projection(cols);
+    }
     read_builder.with_filter(filter);
     let scan = read_builder.new_scan();
     let plan = scan.plan().await.expect("Failed to plan scan");
@@ -114,6 +125,61 @@ async fn scan_and_read_with_filter(
     (plan, batches)
 }
 
+fn extract_plan_file_formats(plan: &Plan) -> HashSet<&str> {
+    plan.splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect()
+}
+
+fn plan_data_file_count(plan: &Plan) -> usize {
+    plan.splits()
+        .iter()
+        .map(|split| split.data_files().len())
+        .sum()
+}
+
+async fn plan_files_all_have_field(table: &paimon::Table, plan: &Plan, field_name: &str) -> bool {
+    for file in plan.splits().iter().flat_map(|split| split.data_files()) {
+        let schema = table
+            .schema_manager()
+            .schema(file.schema_id)
+            .await
+            .expect("Failed to load file schema");
+        if !schema
+            .fields()
+            .iter()
+            .any(|field| field.name() == field_name)
+        {
+            return false;
+        }
+    }
+    true
+}
+
+async fn plan_files_all_missing_field(
+    table: &paimon::Table,
+    plan: &Plan,
+    field_name: &str,
+) -> bool {
+    for file in plan.splits().iter().flat_map(|split| split.data_files()) {
+        let schema = table
+            .schema_manager()
+            .schema(file.schema_id)
+            .await
+            .expect("Failed to load file schema");
+        if schema
+            .fields()
+            .iter()
+            .any(|field| field.name() == field_name)
+        {
+            return false;
+        }
+    }
+    true
+}
+
 fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
     let mut rows = Vec::new();
     for batch in batches {
@@ -1113,6 +1179,142 @@ async fn test_read_schema_evolution_add_column() {
     );
 }
 
+/// Test reading a mixed-format table after ALTER TABLE ADD COLUMNS with
+/// projection and filters on the added column.
+#[tokio::test]
+async fn test_read_schema_evolution_add_column_mixed_format_projection_and_filter() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    fn extract_age_rows(batches: &[RecordBatch]) -> Vec<Option<i32>> {
+        let mut rows = Vec::new();
+        for batch in batches {
+            assert_eq!(
+                batch.schema().fields().len(),
+                1,
+                "projection should only read age"
+            );
+            assert_eq!(batch.schema().field(0).name(), "age");
+
+            let age = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("Expected Int32Array for age");
+            for row in 0..batch.num_rows() {
+                rows.push((!age.is_null(row)).then(|| age.value(row)));
+            }
+        }
+        rows.sort();
+        rows
+    }
+
+    fn extract_age_id_rows(batches: &[RecordBatch]) -> Vec<(Option<i32>, i32)> {
+        let mut rows = Vec::new();
+        for batch in batches {
+            assert_eq!(
+                batch.schema().fields().len(),
+                2,
+                "projection should only read age and id"
+            );
+            assert_eq!(batch.schema().field(0).name(), "age");
+            assert_eq!(batch.schema().field(1).name(), "id");
+
+            let age = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("Expected Int32Array for age");
+            let id = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("Expected Int32Array for id");
+            for row in 0..batch.num_rows() {
+                rows.push(((!age.is_null(row)).then(|| age.value(row)), id.value(row)));
+            }
+        }
+        rows.sort_by_key(|(_, id)| *id);
+        rows
+    }
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "schema_evolution_add_column_mixed_format").await;
+
+    let (plan, age_batches) = scan_and_read(
+        &catalog,
+        "schema_evolution_add_column_mixed_format",
+        Some(&["age"]),
+    )
+    .await;
+    assert_eq!(
+        extract_plan_file_formats(&plan),
+        HashSet::from(["avro", "orc", "parquet"]),
+        "mixed-format add-column table should scan all provisioned formats"
+    );
+    assert_eq!(
+        extract_age_rows(&age_batches),
+        vec![None, None, Some(50), Some(50), Some(50)],
+        "Projection on only the added column should fill nulls for old files"
+    );
+
+    let (_, reordered_batches) = scan_and_read(
+        &catalog,
+        "schema_evolution_add_column_mixed_format",
+        Some(&["age", "id"]),
+    )
+    .await;
+    assert_eq!(
+        extract_age_id_rows(&reordered_batches),
+        vec![
+            (None, 1),
+            (None, 2),
+            (Some(50), 3),
+            (Some(50), 4),
+            (Some(50), 5),
+        ],
+        "Reordered projection should preserve requested output order across schema ids"
+    );
+
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let age_is_null = pb.is_null("age").expect("Failed to build age IS NULL");
+    let (null_plan, null_batches) =
+        scan_and_read_with_filter_and_projection(&table, age_is_null, Some(&["age", "id"])).await;
+    assert_eq!(
+        extract_plan_file_formats(&null_plan),
+        HashSet::from(["parquet"]),
+        "age IS NULL should keep only old schema Parquet files missing age"
+    );
+    assert!(
+        plan_files_all_missing_field(&table, &null_plan, "age").await,
+        "age IS NULL should prune all new-schema files with non-null age"
+    );
+    assert_eq!(
+        extract_age_id_rows(&null_batches),
+        vec![(None, 1), (None, 2)],
+        "age IS NULL should read null-filled age values from old files"
+    );
+
+    let age_eq_50 = pb
+        .equal("age", Datum::Int(50))
+        .expect("Failed to build age = 50");
+    let (eq_plan, eq_batches) =
+        scan_and_read_with_filter_and_projection(&table, age_eq_50, Some(&["age", "id"])).await;
+    assert_eq!(
+        extract_plan_file_formats(&eq_plan),
+        HashSet::from(["avro", "orc", "parquet"]),
+        "age = 50 should keep all new-schema file formats"
+    );
+    assert!(
+        plan_files_all_have_field(&table, &eq_plan, "age").await,
+        "age = 50 should prune old schema files missing age"
+    );
+    assert_eq!(
+        extract_age_id_rows(&eq_batches),
+        vec![(Some(50), 3), (Some(50), 4), (Some(50), 5)],
+        "age = 50 should read projected columns from every new-schema file format"
+    );
+}
+
 /// Test reading a table after ALTER TABLE ALTER COLUMN TYPE (INT -> BIGINT).
 /// Old data files have INT; reader should cast to BIGINT.
 #[tokio::test]
@@ -1246,6 +1448,68 @@ async fn test_stats_pruning_schema_evolution_type_promotion_prunes_old_int_files
     );
 }
 
+/// Type-promotion filters should still prune old INT files in a mixed-format
+/// table when the filter column is not part of the requested projection.
+#[tokio::test]
+async fn test_read_schema_evolution_type_promotion_mixed_format_filter_column_not_projected() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table =
+        get_table_from_catalog(&catalog, "schema_evolution_type_promotion_mixed_format").await;
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb
+        .greater_than("value", Datum::Long(2_500_000_000))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) =
+        scan_and_read_with_filter_and_projection(&table, filter, Some(&["id"])).await;
+    assert_eq!(
+        plan_data_file_count(&plan),
+        3,
+        "Old INT files should be pruned while all BIGINT files remain"
+    );
+    assert_eq!(
+        extract_plan_file_formats(&plan),
+        HashSet::from(["avro", "orc", "parquet"]),
+        "Projected type-promotion filter should keep all new-schema file formats"
+    );
+    assert!(
+        plan.splits()
+            .iter()
+            .flat_map(|split| split.data_files())
+            .all(|file| file.schema_id == table.schema().id()),
+        "value > 2_500_000_000 should prune old-schema INT files"
+    );
+
+    let mut rows = Vec::new();
+    for batch in &batches {
+        assert_eq!(
+            batch.num_columns(),
+            1,
+            "Projected read should only return the requested id column"
+        );
+        assert!(
+            batch.column_by_name("value").is_none(),
+            "Filter-only column should not be returned by projection"
+        );
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        for i in 0..batch.num_rows() {
+            rows.push(id.value(i));
+        }
+    }
+    rows.sort_unstable();
+
+    assert_eq!(
+        rows,
+        vec![3, 4, 5],
+        "Projected read should still return rows matching value > 2_500_000_000"
+    );
+}
+
 /// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
 /// Old files lack the new column; reader should fill nulls even in data evolution mode.
 #[tokio::test]
@@ -1379,6 +1643,115 @@ async fn test_read_schema_evolution_drop_column() {
     );
 }
 
+/// Test reading a mixed-format table after ALTER COLUMN ... FIRST/AFTER.
+/// Old files keep the original physical column order; new files use moved columns.
+#[tokio::test]
+async fn test_read_mixed_format_schema_evolution_reorder_move_column() {
+    let (plan, batches) =
+        scan_and_read_with_fs_catalog("mixed_format_schema_evolution_reorder_move_column", None)
+            .await;
+
+    let formats: HashSet<&str> = plan
+        .splits()
+        .iter()
+        .flat_map(|split| split.data_files())
+        .filter_map(|file| file.file_name.rsplit_once('.').map(|(_, ext)| ext))
+        .collect();
+    assert_eq!(
+        formats,
+        HashSet::from(["avro", "orc", "parquet"]),
+        "mixed_format_schema_evolution_reorder_move_column should scan all provisioned file formats"
+    );
+
+    for batch in &batches {
+        let schema = batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["right_value", "left_value", "id"],
+            "Full read should expose the current table schema order"
+        );
+    }
+
+    let mut rows: Vec<(i32, String, String)> = Vec::new();
+    for batch in &batches {
+        let right_value = batch
+            .column_by_name("right_value")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("right_value");
+        let left_value = batch
+            .column_by_name("left_value")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("left_value");
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        for i in 0..batch.num_rows() {
+            rows.push((
+                id.value(i),
+                left_value.value(i).to_string(),
+                right_value.value(i).to_string(),
+            ));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![
+            (1, "parquet-left-1".into(), "parquet-right-1".into()),
+            (2, "parquet-left-2".into(), "parquet-right-2".into()),
+            (3, "orc-left-3".into(), "orc-right-3".into()),
+            (4, "orc-left-4".into(), "orc-right-4".into()),
+            (5, "avro-left-5".into(), "avro-right-5".into()),
+            (6, "avro-left-6".into(), "avro-right-6".into()),
+        ],
+        "Mixed-format REORDER/MOVE COLUMN should read values by field id, not physical position"
+    );
+
+    let (_, projected_batches) = scan_and_read_with_fs_catalog(
+        "mixed_format_schema_evolution_reorder_move_column",
+        Some(&["id", "right_value"]),
+    )
+    .await;
+    let mut projected_rows: Vec<(i32, String)> = Vec::new();
+    for batch in &projected_batches {
+        let schema = batch.schema();
+        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(
+            field_names,
+            vec!["id", "right_value"],
+            "Projection should follow caller-specified order"
+        );
+
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("projected id");
+        let right_value = batch
+            .column_by_name("right_value")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("projected right_value");
+        for i in 0..batch.num_rows() {
+            projected_rows.push((id.value(i), right_value.value(i).to_string()));
+        }
+    }
+    projected_rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        projected_rows,
+        vec![
+            (1, "parquet-right-1".into()),
+            (2, "parquet-right-2".into()),
+            (3, "orc-right-3".into()),
+            (4, "orc-right-4".into()),
+            (5, "avro-right-5".into()),
+            (6, "avro-right-6".into()),
+        ],
+        "Projection should still map reordered old and new files by field id"
+    );
+}
+
 // ---------------------------------------------------------------------------
 // Complex type integration tests
 // ---------------------------------------------------------------------------
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index c7c408d3..39bdac8b 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -349,6 +349,47 @@ def main():
         "INSERT INTO schema_evolution_add_column VALUES (3, 'carol', 30), (4, 'dave', 40)"
     )
 
+    # ===== Schema Evolution: Add Column Mixed Format =====
+    # Old Parquet files have (id, name); after ADD COLUMNS, Parquet/ORC/Avro files
+    # have (id, name, age). Reader must combine projection, filter, and schema
+    # evolution across all data file formats.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS schema_evolution_add_column_mixed_format (
+            id INT,
+            name STRING
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO schema_evolution_add_column_mixed_format VALUES
+        (1, 'alice'),
+        (2, 'bob')
+        """
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_add_column_mixed_format ADD COLUMNS (age INT)"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_add_column_mixed_format VALUES (3, 'carol', 50)"
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_add_column_mixed_format SET TBLPROPERTIES ('file.format' = 'orc')"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_add_column_mixed_format VALUES (4, 'dave', 50)"
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_add_column_mixed_format SET TBLPROPERTIES ('file.format' = 'avro')"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_add_column_mixed_format VALUES (5, 'eve', 50)"
+    )
+
     # ===== Schema Evolution: Type Promotion (INT -> BIGINT) =====
     # Old files have value as INT; after ALTER TABLE, new files have value as BIGINT.
     # Reader must cast INT to BIGINT when reading old files.
@@ -370,6 +411,47 @@ def main():
         "INSERT INTO schema_evolution_type_promotion VALUES (3, 3000000000)"
     )
 
+    # ===== Schema Evolution: Type Promotion Mixed Format (INT -> BIGINT) =====
+    # Old Parquet files have value as INT; after ALTER TABLE, Parquet/ORC/Avro
+    # files have value as BIGINT. Readers should combine type-promotion stats
+    # pruning with projections whose filter column is not selected.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS schema_evolution_type_promotion_mixed_format (
+            id INT,
+            value INT
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO schema_evolution_type_promotion_mixed_format VALUES
+        (1, 100),
+        (2, 200)
+        """
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_type_promotion_mixed_format ALTER COLUMN value TYPE BIGINT"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_type_promotion_mixed_format VALUES (3, 3000000000)"
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_type_promotion_mixed_format SET TBLPROPERTIES ('file.format' = 'orc')"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_type_promotion_mixed_format VALUES (4, 4000000000)"
+    )
+    spark.sql(
+        "ALTER TABLE schema_evolution_type_promotion_mixed_format SET TBLPROPERTIES ('file.format' = 'avro')"
+    )
+    spark.sql(
+        "INSERT INTO schema_evolution_type_promotion_mixed_format VALUES (5, 5000000000)"
+    )
+
     # ===== Data Evolution + Schema Evolution: Add Column =====
     # Combines data-evolution (row-tracking + MERGE INTO) with ALTER TABLE ADD COLUMNS.
     # Old files lack the new column; MERGE INTO produces partial-column files.
@@ -553,6 +635,56 @@ def main():
         """
     )
 
+    # ===== Mixed-format Schema Evolution: Reorder/Move Column =====
+    # Old Parquet files use the original order (id, left_value, right_value).
+    # ORC and Avro files are written after moving columns; readers should expose
+    # the current table schema order and map old/new files by field id.
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS mixed_format_schema_evolution_reorder_move_column (
+            id INT,
+            left_value STRING,
+            right_value STRING
+        ) USING paimon
+        TBLPROPERTIES (
+            'file.format' = 'parquet'
+        )
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
+        (1, 'parquet-left-1', 'parquet-right-1'),
+        (2, 'parquet-left-2', 'parquet-right-2')
+        """
+    )
+    spark.sql(
+        "ALTER TABLE mixed_format_schema_evolution_reorder_move_column ALTER COLUMN right_value FIRST"
+    )
+    spark.sql(
+        "ALTER TABLE mixed_format_schema_evolution_reorder_move_column SET TBLPROPERTIES ('file.format' = 'orc')"
+    )
+    spark.sql(
+        """
+        INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
+        ('orc-right-3', 3, 'orc-left-3'),
+        ('orc-right-4', 4, 'orc-left-4')
+        """
+    )
+    spark.sql(
+        "ALTER TABLE mixed_format_schema_evolution_reorder_move_column ALTER COLUMN left_value AFTER right_value"
+    )
+    spark.sql(
+        "ALTER TABLE mixed_format_schema_evolution_reorder_move_column SET TBLPROPERTIES ('file.format' = 'avro')"
+    )
+    spark.sql(
+        """
+        INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
+        ('avro-right-5', 'avro-left-5', 5),
+        ('avro-right-6', 'avro-left-6', 6)
+        """
+    )
+
     # ===== Complex Types table: ARRAY, MAP, STRUCT =====
     spark.sql(
         """