[spark] Support batch union read for lake-enabled primary key tables #3042
luoyuxia merged 14 commits into apache:main
CI failed with known issue #2992.
@YannByron would you help review this PR?
fresh-borzoni left a comment
@Yohahaha Ty for the PR, looks good, just one comment
fresh-borzoni left a comment
@Yohahaha Ty, LGTM 👍
LGTM.
Pull request overview
This PR adds Spark SQL batch support for reading lake-enabled primary key (upsert) tables by unioning lake snapshot data with Fluss KV/log tail, and refactors shared lake-reading components for reuse across connectors.
Changes:
- Introduces Spark lake upsert batch scan/planning/reader implementation (lake snapshot + KV/log tail merge).
- Refactors Spark lake test infrastructure and adds primary-key lake read test coverage (Paimon).
- Moves/refactors `LakeSnapshotAndLogSplitScanner` into `fluss-client` and updates Flink/Spark integrations accordingly.
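The union read described above can be illustrated with a minimal sketch. It is not the code from this PR: `UnionReadSketch`, `Change`, and `unionRead` are hypothetical names, rows are reduced to string key/value pairs, and the offset semantics (the snapshot covers everything before `snapshotEndOffset`, and the batch stops before `stoppingOffset`) are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are Fluss or Spark classes.
final class UnionReadSketch {

    /** One change from the log tail: an upsert carries a value, a delete carries null. */
    static final class Change {
        final long offset;
        final String key;
        final String value; // null means the key was deleted

        Change(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Merges a lake snapshot with the log tail by primary key.
     * Changes are assumed to be sorted by offset; only offsets in
     * [snapshotEndOffset, stoppingOffset) are applied.
     */
    static Map<String, String> unionRead(
            Map<String, String> lakeSnapshot,
            List<Change> logTail,
            long snapshotEndOffset,
            long stoppingOffset) {
        // Start from the lake snapshot, then replay the tail on top of it.
        Map<String, String> merged = new LinkedHashMap<>(lakeSnapshot);
        for (Change c : logTail) {
            if (c.offset < snapshotEndOffset || c.offset >= stoppingOffset) {
                continue; // already in the snapshot, or past the batch bound
            }
            if (c.value == null) {
                merged.remove(c.key); // delete wins over the snapshot row
            } else {
                merged.put(c.key, c.value); // later upsert replaces the row
            }
        }
        return merged;
    }
}
```

The fixed stopping offset is what makes the result a bounded batch: changes written after planning are ignored rather than read indefinitely.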
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala | New shared Spark lake test base, including tier-to-lake helper. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala | New PK-table lake read tests (fallback, lake-only, union, updates) + Paimon concrete test. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala | Refactors log-table lake tests to reuse the new shared base; inlines concrete Paimon/Iceberg tests. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala | Removes standalone Paimon log-table lake test (now inlined). |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala | Removes standalone Iceberg log-table lake test (now inlined). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala | Moves/renames shared lake utilities and adds split (de)serialization helpers. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala | New shared base for lake batch planning + stopping-offset logic. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala | Adds lake-only and lake+tail (upsert) Spark input partition types. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala | New batch planner for PK lake union read (lake snapshot + log tail). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala | New reader that drives LakeSnapshotAndLogSplitScanner and returns Spark rows. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala | New reader factory dispatching between upsert-merge and lake-only readers. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala | Refactors append-lake batch to reuse FlussLakeBatch and new utils/package. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala | Moves to new package and uses renamed utils/reader. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala | Renames lake-only reader class and simplifies reader context creation. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala | Adds Spark DSv2 scan builder for lake-enabled PK tables. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala | Adds FlussLakeUpsertScan wiring into Spark batch scans. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala | Removes lake split partition type (moved) and fixes upsert partition toString formatting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala | Routes PK scans to lake upsert scan when datalake is enabled. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java | Updates to use the refactored scanner API now located in fluss-client. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java | Moves/refactors scanner to be connector-agnostic (accepts splits + explicit offsets). |
8ff9199 to 4fde215
@luoyuxia would you help review again?
Thanks, will have a quick review today.
Seems we can now remove the limitation in https://fluss.apache.org/docs/next/engine-spark/reads/#limitations
Btw, I think it may be great to have a quickstart for Spark, just as we did for Flink in https://fluss.apache.org/docs/next/quickstart/flink
More read optimizations like 'filter pushdown' and 'fine-grained split generation' are on the way. The docs update will come in a following PR.
Purpose
Linked issue: close #2984
This PR adds support for reading lake-enabled primary key tables in Spark SQL.
Brief change log
Tests
`SparkLakePrimaryKeyTableReadTestBase` with Paimon.
API and Format
No API or format changes.
Documentation
No new feature documentation required (extends existing lake reading capability).