
[spark] Support batch union read for lake-enabled primary key tables#3042

Merged
luoyuxia merged 14 commits into apache:main from Yohahaha:batch-union-read-pk
Apr 17, 2026

@Yohahaha
Contributor

@Yohahaha Yohahaha commented Apr 9, 2026

Purpose

Linked issue: close #2984

This PR adds support for reading lake-enabled primary key tables in Spark SQL.

Brief change log

  • Introduces Spark lake upsert batch scan/planning/reader implementation (lake snapshot + KV/log tail merge).
  • Refactors Spark lake test infrastructure and adds primary-key lake read test coverage (Paimon).
  • Moves/refactors LakeSnapshotAndLogSplitScanner into fluss-client and updates Flink/Spark integrations accordingly.
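
As a rough illustration of the "lake snapshot + KV/log tail merge" idea, the sketch below overlays log-tail changes on top of lake snapshot rows, keyed by primary key. It is a simplified in-memory stand-in, not the logic of LakeSnapshotAndLogSplitScanner: the `ChangeRecord` and `UnionReadSketch` names, the use of a `null` value to mean delete, and the exclusive stopping offset are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical change record from the log tail; not a Fluss class.
final class ChangeRecord {
    final long offset;
    final String key;
    final String value; // assumption: null marks a delete of the key

    ChangeRecord(long offset, String key, String value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical helper showing a union read over one bucket.
final class UnionReadSketch {

    // Starts from the lake snapshot rows, then applies log-tail changes in
    // offset order until the (assumed exclusive) stopping offset is reached.
    static Map<String, String> unionRead(
            Map<String, String> lakeSnapshot,
            List<ChangeRecord> logTail,
            long stoppingOffset) {
        Map<String, String> merged = new LinkedHashMap<>(lakeSnapshot);
        for (ChangeRecord record : logTail) {
            if (record.offset >= stoppingOffset) {
                break; // assumes the tail is already sorted by offset
            }
            if (record.value == null) {
                merged.remove(record.key); // delete wins over the snapshot row
            } else {
                merged.put(record.key, record.value); // newer value wins
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new LinkedHashMap<>();
        snapshot.put("k1", "v1");
        snapshot.put("k2", "v2");
        List<ChangeRecord> tail = Arrays.asList(
                new ChangeRecord(10, "k1", "v1-updated"),
                new ChangeRecord(11, "k2", null),
                new ChangeRecord(12, "k3", "v3"),
                new ChangeRecord(13, "k3", "past-stopping-offset"));
        System.out.println(unionRead(snapshot, tail, 13L));
    }
}
```

Bounding the tail with a stopping offset is what makes this a batch read: changes that arrive after planning are not included in the result.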

Tests

  • Added SparkLakePrimaryKeyTableReadTestBase with a concrete Paimon test.

API and Format

No API or format changes.

Documentation

No new feature documentation required (extends existing lake reading capability).

@Yohahaha Yohahaha changed the title [spark] Support reading lake-enabled primary key tables in Spark connector [spark] Support reading lake-enabled primary key tables Apr 9, 2026
@Yohahaha Yohahaha changed the title [spark] Support reading lake-enabled primary key tables [spark] Support batch union read for lake-enabled primary key tables Apr 9, 2026
@Yohahaha
Contributor Author

CI failed with known issue #2992

@Yohahaha
Contributor Author

@YannByron could you help review this PR?

Contributor

@fresh-borzoni fresh-borzoni left a comment

@Yohahaha Ty for the PR, looks good, just one comment

Contributor

@fresh-borzoni fresh-borzoni left a comment

@Yohahaha Ty, LGTM 👍

@YannByron
Contributor

LGTM.

@Yohahaha
Contributor Author

@wuchong @luoyuxia would you help take a look?

Contributor

Copilot AI left a comment

Pull request overview

This PR adds Spark SQL batch support for reading lake-enabled primary key (upsert) tables by unioning lake snapshot data with Fluss KV/log tail, and refactors shared lake-reading components for reuse across connectors.

Changes:

  • Introduces Spark lake upsert batch scan/planning/reader implementation (lake snapshot + KV/log tail merge).
  • Refactors Spark lake test infrastructure and adds primary-key lake read test coverage (Paimon).
  • Moves/refactors LakeSnapshotAndLogSplitScanner into fluss-client and updates Flink/Spark integrations accordingly.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala | New shared Spark lake test base, including tier-to-lake helper. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala | New PK-table lake read tests (fallback, lake-only, union, updates) + Paimon concrete test. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala | Refactors log-table lake tests to reuse the new shared base; inlines concrete Paimon/Iceberg tests. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala | Removes standalone Paimon log-table lake test (now inlined). |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala | Removes standalone Iceberg log-table lake test (now inlined). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala | Moves/renames shared lake utilities and adds split (de)serialization helpers. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala | New shared base for lake batch planning + stopping-offset logic. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala | Adds lake-only and lake+tail (upsert) Spark input partition types. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala | New batch planner for PK lake union read (lake snapshot + log tail). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala | New reader that drives LakeSnapshotAndLogSplitScanner and returns Spark rows. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala | New reader factory dispatching between upsert-merge and lake-only readers. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala | Refactors append-lake batch to reuse FlussLakeBatch and new utils/package. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala | Moves to new package and uses renamed utils/reader. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala | Renames lake-only reader class and simplifies reader context creation. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala | Adds Spark DSv2 scan builder for lake-enabled PK tables. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala | Adds FlussLakeUpsertScan wiring into Spark batch scans. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala | Removes lake split partition type (moved) and fixes upsert partition toString formatting. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala | Routes PK scans to lake upsert scan when datalake is enabled. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java | Updates to use the refactored scanner API now located in fluss-client. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java | Moves/refactors scanner to be connector-agnostic (accepts splits + explicit offsets). |
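
The summary mentions two partition kinds (lake-only and lake+tail) and a reader factory that dispatches between a lake-only reader and an upsert-merge reader. A minimal sketch of that dispatch pattern follows. Every type here (`LakePartition`, `LakeOnlyPartition`, `LakeAndTailPartition`, `ReaderDispatchSketch`) is a hypothetical stand-in rather than a class from this PR, and the returned strings only describe which reader would be chosen.

```java
// Hypothetical marker type for a planned partition; not the connector's class.
interface LakePartition {}

// Partition backed only by a lake split (no unread log tail for the bucket).
final class LakeOnlyPartition implements LakePartition {
    final String lakeSplit;

    LakeOnlyPartition(String lakeSplit) {
        this.lakeSplit = lakeSplit;
    }
}

// Partition that pairs a lake split with a bounded range of the log tail.
final class LakeAndTailPartition implements LakePartition {
    final String lakeSplit;
    final long logStartOffset;
    final long logStoppingOffset;

    LakeAndTailPartition(String lakeSplit, long logStartOffset, long logStoppingOffset) {
        this.lakeSplit = lakeSplit;
        this.logStartOffset = logStartOffset;
        this.logStoppingOffset = logStoppingOffset;
    }
}

// Hypothetical factory: picks a reader kind based on the partition type.
final class ReaderDispatchSketch {

    static String describeReader(LakePartition partition) {
        if (partition instanceof LakeAndTailPartition) {
            LakeAndTailPartition p = (LakeAndTailPartition) partition;
            return "upsert-merge reader: " + p.lakeSplit
                    + " + log[" + p.logStartOffset + ", " + p.logStoppingOffset + ")";
        }
        if (partition instanceof LakeOnlyPartition) {
            return "lake-only reader: " + ((LakeOnlyPartition) partition).lakeSplit;
        }
        throw new IllegalArgumentException("Unsupported partition: " + partition);
    }

    public static void main(String[] args) {
        System.out.println(describeReader(new LakeOnlyPartition("split-0")));
        System.out.println(describeReader(new LakeAndTailPartition("split-1", 100L, 250L)));
    }
}
```

Keeping the choice in the factory means the planner only has to emit the right partition type per bucket, and partitions without a log tail avoid the merge path entirely.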

@Yohahaha Yohahaha force-pushed the batch-union-read-pk branch from 8ff9199 to 4fde215 on April 16, 2026 07:47
@Yohahaha
Contributor Author

@luoyuxia would you help review again?

@luoyuxia
Contributor

> @luoyuxia would you help review again?

Thanks, will have a quick review today.

Contributor

@luoyuxia luoyuxia left a comment

@Yohahaha Thanks for the PR. LGTM overall. Only one minor comment.
Btw, will you support filter pushdown in a following PR?

@luoyuxia
Contributor

Seems we can now remove the limitation in https://fluss.apache.org/docs/next/engine-spark/reads/#limitations

@luoyuxia
Contributor

Btw, I think it may be great to have a quickstart for Spark, just as we did for Flink in https://fluss.apache.org/docs/next/quickstart/flink

@Yohahaha
Contributor Author

> @Yohahaha Thanks for the PR. LGTM overall. Only one minor comment. Btw, will you support filter pushdown in a following PR?

More read optimizations, such as filter pushdown and fine-grained split generation, are on the way.

The docs update will come in a following PR.

Contributor

@luoyuxia luoyuxia left a comment

+1

@luoyuxia luoyuxia merged commit 26a4a72 into apache:main Apr 17, 2026
6 checks passed
@Yohahaha Yohahaha deleted the batch-union-read-pk branch April 17, 2026 08:28
Development

Successfully merging this pull request may close these issues.

[spark] Batch union read for pk table

5 participants