Dictionary encoding hash optimize #21589
Rich-T-kid wants to merge 11 commits into apache:main from rich-t-kid/Dictionary-encoding-Hash-optmize
Conversation
Currently the CI is breaking because my current emit implementation returns the values directly instead of returning a dictionary-encoded column, and I'm not handling nulls.
Force-pushed 8ea71fd to cc18a8f
run benchmarks
Hi @Rich-T-kid, thanks for the request (#21589 (comment)). Only whitelisted users can trigger benchmarks. Allowed users: Dandandan, Fokko, Jefffrey, Omega359, adriangb, alamb, asubiotto, brunal, buraksenn, cetra3, codephage2020, comphead, erenavsarogullari, etseidl, friendlymatthew, gabotechs, geoffreyclaude, grtlr, haohuaijin, jonathanc-n, kevinjqliu, klion26, kosiew, kumarUjjawal, kunalsinghdadhwal, liamzwbao, mbutrovich, mzabaluev, neilconway, rluvaton, sdf-jkl, timsaucer, xudong963, zhuqi-lucas. File an issue against this benchmark runner
run benchmarks
🤖 Benchmark running (GKE): comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) using tpch

🤖 Benchmark running (GKE): comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) using tpcds
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs branch

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs branch
Wrote some criterion benchmarks, and they reflect what the query benchmarks show. I think storing ScalarValues directly in the hash map is causing the performance issues. I need to profile to see the heap allocations.
ScalarValue::try_from_array (38%): allocating a new ScalarValue heap object for every single row. Lots of heap allocations are happening on the hot path (_xzm_free). A solution may be to pre-allocate space for the unique values array and the hashmap.
Optimization 1: Hash caching
Instead of hashing each row's value individually, pre-compute hashes for all d distinct values in the values array once per batch and cache them in a Vec indexed by dictionary key. For a batch of n rows with d distinct values this reduces hash computations from O(n) to O(d).

Optimization 2: Eliminate ScalarValue
Replace HashMap<ScalarValue, usize> with a structure that stores raw hashes and raw string slices pointing directly into the Arrow buffer. This eliminates the per-row heap allocation (ScalarValue::try_from_array) and deallocation (drop_in_place), which the profiler shows account for ~60% of intern time combined.

Optimization 3: Pre-allocate using occupancy
Use dict_array.occupancy().count_set_bits() to determine the number of truly distinct non-null values in the batch upfront and pre-allocate internal storage accordingly. This avoids incremental Vec growth during intern.
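The hash-caching idea in Optimization 1 can be sketched with a std-only model, where plain Vecs stand in for Arrow's keys/values arrays (names are illustrative, not the PR's actual code):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative hash helper standing in for DataFusion's row hashing.
fn hash_bytes(bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    bytes.hash(&mut h);
    h.finish()
}

fn main() {
    // A dictionary batch: d = 3 distinct values, n = 8 rows of keys.
    let values: Vec<&str> = vec!["apple", "banana", "cherry"];
    let keys: Vec<usize> = vec![0, 1, 0, 2, 2, 1, 0, 0];

    // Hash caching: hash each of the d distinct values once per batch...
    let hash_cache: Vec<u64> = values.iter().map(|v| hash_bytes(v.as_bytes())).collect();

    // ...then every row reuses the cached hash via a cheap index lookup,
    // reducing hash computations from O(n) to O(d).
    let row_hashes: Vec<u64> = keys.iter().map(|&k| hash_cache[k]).collect();

    assert_eq!(hash_cache.len(), 3); // only d hash computations happened
    assert_eq!(row_hashes.len(), 8); // but all n rows got a hash
    assert_eq!(row_hashes[0], row_hashes[2]); // same key -> same cached hash
}
```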
Force-pushed 8adc7ef to c8fe15a
Force-pushed 796573d to 2d7bc48
Force-pushed a3ad48e to eb28497
Benchmark Results: Dictionary vs GroupValuesRows (single dict column)

Benchmarks were parameterized across cardinality (XSmall/Small/Medium/Large), batch size (Small=8192 / Medium=32768 / Large=65536), and null rate (Zero/Low/Medium/High). See [single_column_aggr.rs] for the full benchmark code.

- XSmall cardinality, Small batch, Zero nulls: GroupValuesRows 73.1 µs → Dictionary 33.9 µs (~2.2x)
- XSmall cardinality, Medium batch, Zero nulls: GroupValuesRows 280.5 µs → Dictionary 143.6 µs (~2.0x)
- Large cardinality, Large batch, Zero nulls: GroupValuesRows 568.1 µs → Dictionary 217.4 µs (~2.6x)
- Large cardinality, Large batch (multi_batch), Zero nulls: GroupValuesRows 1.668 ms → Dictionary 878 µs (~1.9x)

Dictionary is consistently 2–3.4x faster across all configurations. The speedup grows with higher null rates: as nulls increase, Dictionary benefits from its internal value caching, avoiding redundant work on null entries. Full benchmarks below.
Force-pushed eb28497 to b84ec0b
Force-pushed b84ec0b to deec858
CC @tustvold @qrilka @blaginin: given your work on Materialize Dictionaries in Group Keys & Distinct(count x), I would love your thoughts on this.
```rust
fn supported_single_dictionary_value(t: &DataType) -> bool {
    matches!(
        t,
        DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Utf8View
            | DataType::BinaryView
    )
}
```
These are the most common types, but why not support other types? Other types would fall under the slow path, no?
This PR is intended as a proof of concept that dictionary encoding works and improves efficiency. It makes sense to start with string types, since that is what dictionary arrays are most commonly used with and where they provide the most benefit. A follow-up issue can be created to support additional types such as LargeUtf8, LargeList, and numeric values, which should be as simple as adding the relevant branches in get_raw_bytes and sentinel_repr. This is also related to how sentinel representations of types work, but I think that's worth a separate comment.
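For illustration only, a numeric branch could canonicalize values to fixed-width little-endian bytes so they plug into byte-based hashing and comparison the same way strings do (i64_raw_bytes is an invented name, not the PR's code):

```rust
// Hypothetical sketch: map an i64 to a stable byte representation, mirroring
// how string/binary types already expose &[u8] for hashing and equality.
fn i64_raw_bytes(v: i64) -> [u8; 8] {
    // Fixed-width little-endian bytes give a canonical representation:
    // equal values always produce equal bytes, distinct values differ.
    v.to_le_bytes()
}

fn main() {
    assert_eq!(i64_raw_bytes(1), [1, 0, 0, 0, 0, 0, 0, 0]);
    assert_eq!(i64_raw_bytes(-7), i64_raw_bytes(-7));
    assert_ne!(i64_raw_bytes(3), i64_raw_bytes(4));
}
```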
```diff
@@ -20,4 +20,5 @@
 pub(crate) mod boolean;
 pub(crate) mod bytes;
 pub(crate) mod bytes_view;
+pub mod dictionary;
```
Why not pub(crate)? Otherwise we leak GroupValuesDictionary in the public DF API.
This was only to expose the GroupValuesDictionary::new() method for performance benchmarks.
```rust
let raw = Self::get_raw_bytes(values, value_idx);
if let Some((group_id, _)) = entries
    .iter()
    .find(|(_, stored_bytes)| raw == stored_bytes.as_slice())
```
Is this comparison for hash collisions?
Yes, there's a chance of hash collisions where hash_array[i] == hash_array[j] but original_array[i] != original_array[j]. So when a hash already exists in the hash table, we can't just return that group id; we need to verify the actual value matches. We iterate through the vector of values that map to this hash, comparing each one to find the entry where the actual value matches, and return that group id. If no match is found, it's a new group.
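The collision handling described above can be sketched with std types only (field and function names here are illustrative, not the PR's actual code):

```rust
use std::collections::HashMap;

// Each hash maps to a bucket of (group_id, stored_bytes) entries; on a hash
// hit we still compare raw bytes to rule out collisions.
fn intern(
    entries_by_hash: &mut HashMap<u64, Vec<(usize, Vec<u8>)>>,
    next_group: &mut usize,
    hash: u64,
    raw: &[u8],
) -> usize {
    let bucket = entries_by_hash.entry(hash).or_default();
    // A matching hash is not enough: verify the actual bytes match.
    if let Some((group_id, _)) = bucket.iter().find(|(_, stored)| raw == stored.as_slice()) {
        return *group_id;
    }
    // No byte-equal entry in the bucket: this is a new group.
    let group_id = *next_group;
    *next_group += 1;
    bucket.push((group_id, raw.to_vec()));
    group_id
}

fn main() {
    let mut map = HashMap::new();
    let mut next = 0usize;
    // Force a collision by interning two different values under the same hash.
    let a = intern(&mut map, &mut next, 42, b"apple");
    let b = intern(&mut map, &mut next, 42, b"banana"); // same hash, different bytes
    let a2 = intern(&mut map, &mut next, 42, b"apple");
    assert_eq!(a, 0);
    assert_eq!(b, 1); // collision detected, new group assigned
    assert_eq!(a2, 0); // byte comparison finds the existing group
}
```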
```rust
*/
// stores the order new unique elements are seen for self.emit()
// Box<dyn Builder> doesn't provide the flexibility of building partition
// arrays that we'd need to support Emit::First(N)
seen_elements: Vec<Vec<u8>>,
```
Maybe instead of a Vec<Vec<u8>> we can use another structure like ArrowBytesMap (doc: https://docs.rs/datafusion/latest/datafusion/physical_expr_common/binary_map/struct.ArrowBytesMap.html). That would provide better memory tracking, but I don't think the insert functions support dictionaries right now.
I think this is a good idea with the current approach. The only issue I can think of is in the future, when other value types are supported. ArrowBytesMap only supports:
Utf8,
Utf8View,
Binary,
BinaryView.
In my revised PR I could abstract away how seen_elements are stored into a trait comprised of store() and retrieve(N). This should make it easier for future work to integrate with ArrowDictionaryKeyType while also giving us a performance boost in the string/binary case.
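A minimal sketch of what such a trait could look like, assuming the illustrative store()/retrieve() names from the comment and a Vec<Vec<u8>> backend (not the PR's actual code):

```rust
// Hypothetical abstraction over seen-element storage, so the Vec<Vec<u8>>
// backend could later be swapped for something like ArrowBytesMap.
trait SeenElements {
    fn store(&mut self, value: &[u8]);
    /// Remove and return the first `n` stored values, supporting the
    /// partial takes that Emit::First(n) requires.
    fn retrieve(&mut self, n: usize) -> Vec<Vec<u8>>;
}

struct VecBackend {
    seen: Vec<Vec<u8>>,
}

impl SeenElements for VecBackend {
    fn store(&mut self, value: &[u8]) {
        self.seen.push(value.to_vec());
    }
    fn retrieve(&mut self, n: usize) -> Vec<Vec<u8>> {
        // drain() removes only the first n entries, leaving the rest intact,
        // unlike ArrowBytesMap, which drains the whole structure on read.
        self.seen.drain(..n.min(self.seen.len())).collect()
    }
}

fn main() {
    let mut backend = VecBackend { seen: Vec::new() };
    backend.store(b"a");
    backend.store(b"b");
    backend.store(b"c");
    let first_two = backend.retrieve(2); // partial take
    assert_eq!(first_two, vec![b"a".to_vec(), b"b".to_vec()]);
    assert_eq!(backend.seen.len(), 1); // "c" remains for a later emit
}
```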
Also, I think it's worth benchmarking this. The ArrowBytesMap insert API forces the client to insert arrays, not single elements. It's very unlikely that the keys array is a 1-1 mapping of the values array, which means for each insertion we'd have to do an Arc::new(StringArray::from(values[idx])). The overhead that comes with creating a new array (heap allocation for the buffer + heap allocation for the offsets + heap allocation for the null bitmap) might negate the benefits the specialized map provides.
Also, ArrowBytesMap doesn't allow for partial takes: each call to retrieve values from ArrowBytesMap drains the entire data structure, which doesn't blend well with emit since emit allows for partial takes.
Null handling

How nulls are currently handled

Why this is hard to extend to other types

This approach works for Utf8 because there exist byte sequences that are invalid UTF-8, which can serve as an unambiguous sentinel value that could never be confused with real data. However, this is difficult to extend to other types. For example, there is no equivalent concept of an 'invalid' binary buffer, since any sequence of raw bytes is valid by definition. This problem extends to numeric types as well.
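The Utf8 sentinel argument can be demonstrated with a tiny std-only example: 0xFF can never appear in well-formed UTF-8, so it works as an in-band null marker for strings, while for Binary every byte sequence is a legal value, so no such marker exists (the sentinel choice here is illustrative, not the PR's actual byte sequence):

```rust
fn main() {
    // 0xFF is never part of well-formed UTF-8, so this byte sequence can
    // serve as an unambiguous null sentinel for Utf8 columns.
    let sentinel: &[u8] = &[0xFF];
    assert!(std::str::from_utf8(sentinel).is_err());

    // Any real Utf8 value is valid UTF-8, so it can never equal the sentinel.
    let real = "hello".as_bytes();
    assert!(std::str::from_utf8(real).is_ok());

    // For Binary, by contrast, &[0xFF] is itself a perfectly legal value, so
    // an in-band sentinel would be ambiguous; nulls need out-of-band tracking.
}
```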
Which issue does this PR close?
This PR makes an effort toward #7000 (materializing dictionary columns) and toward closing #21466.
A separate follow-up PR aims to close the multi-column + dictionary column case.
Rationale for this change
This PR implements a specialized GroupValues implementation for single-column dictionary-encoded GROUP BY columns, motivated by the dictionary encoding efficiency work tracked in #7647.
GroupValuesRows was inefficient for dictionary-encoded columns because it runs the full RowConverter pipeline on every row, decoding the dictionary back to its underlying string values and serializing them into a packed row format for comparison, completely discarding the integer key structure that dictionary encoding provides and doing O(n) expensive string operations when the same d distinct values could be processed just once.

Initial approach
The first implementation used HashMap<ScalarValue, usize> to map group values to group indices. While correct, profiling revealed this was significantly slower than GroupValuesRows due to per-row heap allocation from ScalarValue::try_from_array, with ~60% of intern time spent on allocation and deallocation.
Final approach
After profiling and iteration, the implementation now uses a two-pass strategy that directly exploits dictionary encoding's structure. Pass 1 iterates the small values array (d distinct values) once, building a key_to_group lookup via raw byte comparison and pre-computed hashes. Pass 2 iterates the keys array (n rows) using only cheap array index lookups: no hashing, no byte comparison, no hashmap lookup in the hot path. This reduces the expensive work from O(n) to O(d) per batch.
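The two-pass strategy can be modeled with std types only (plain Vecs stand in for the Arrow keys/values arrays; build_key_to_group is an illustrative name, and nulls are omitted):

```rust
use std::collections::HashMap;

// Pass 1: O(d) per batch. Hash/compare each distinct value once, producing a
// dictionary-key -> group-id lookup table. `group_of` persists across batches
// so group ids stay stable as new batches arrive.
fn build_key_to_group(values: &[&str], group_of: &mut HashMap<Vec<u8>, usize>) -> Vec<usize> {
    values
        .iter()
        .map(|v| {
            let next = group_of.len();
            *group_of.entry(v.as_bytes().to_vec()).or_insert(next)
        })
        .collect()
}

fn main() {
    // A dictionary batch: d = 3 distinct values, n = 8 row keys.
    let values = ["x", "y", "z"];
    let keys: Vec<usize> = vec![2, 0, 0, 1, 2, 2, 0, 1];

    let mut group_of: HashMap<Vec<u8>, usize> = HashMap::new();
    let key_to_group = build_key_to_group(&values, &mut group_of);

    // Pass 2: O(n). Each row is a cheap index lookup; no hashing, no byte
    // comparison, no hashmap probe in the hot path.
    let group_ids: Vec<usize> = keys.iter().map(|&k| key_to_group[k]).collect();
    assert_eq!(group_ids, vec![2, 0, 0, 1, 2, 2, 0, 1]);
}
```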
Benchmarks show consistent 1.9x–2.7x improvement over GroupValuesRows across all cardinality and batch size configurations with no regressions.
What changes are included in this PR?
Update the match statement in new_group_values to include a custom dictionary-encoding branch that works for single fields of type DictionaryArray.

Are these changes tested?
Yes, a large portion of the PR is tests; these include
Are there any user-facing changes?
No, everything is internal.