-
Notifications
You must be signed in to change notification settings - Fork 412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand the functionality of Local Runtime Filter #7891
Comments
but even with late materialization, we also need to read this pack, which means can not help reduce IO. |
It might help reduce IO of other columns, if none of the filter_column pack pass the filter condition. Besides the major improvement will be the reduction of network communications for exchange. |
make sense, then why we need |
For TPCH100, since the major improvement is achieved by reducing network communications for exchange, identified it with 'apply_late_materialization'. If no exchange in probe side, there seems little performance improvement. |
If there is a filter like |
Not quite get your point. Using late materialization will improve performance in two ways:
For TPCH100, the major improvement is introduced by the second one, and might introduce performance degration when no exhcange cost reduced. Thus introduce "apply_late_materialization" flag to indicate whether exchange cost can be reduced. |
mysql> explain analyze select * from a join b on a.id = b.id where regexp_like(a.p, '.*') and regexp_like(b.p, '.*');
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| id | estRows | actRows | task | access object | execution info | operator info | memory | disk |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
| TableReader_37 | 9990.00 | 0 | root | | time:15.1ms, loops:1, RU:0.000000, cop_task: {num: 1, max: 0s, proc_keys: 0, copr_cache_hit_ratio: 0.00} | MppVersion: 2, data:ExchangeSender_36 | 595 Bytes | N/A |
| └─ExchangeSender_36 | 9990.00 | 0 | mpp[tiflash] | | tiflash_task:{time:13ms, loops:0, threads:36} | ExchangeType: PassThrough | N/A | N/A |
| └─HashJoin_35 | 9990.00 | 0 | mpp[tiflash] | | tiflash_task:{time:12ms, loops:0, threads:36} | inner join, equal:[eq(test.a.id, test.b.id)] | N/A | N/A |
| ├─ExchangeReceiver_16(Build) | 7992.00 | 0 | mpp[tiflash] | | tiflash_task:{time:10ms, loops:0, threads:36} | | N/A | N/A |
| │ └─ExchangeSender_15 | 7992.00 | 0 | mpp[tiflash] | | tiflash_task:{time:7.56ms, loops:0, threads:36} | ExchangeType: Broadcast, Compression: FAST | N/A | N/A |
| │ └─Selection_14 | 7992.00 | 0 | mpp[tiflash] | | tiflash_task:{time:6.56ms, loops:0, threads:36} | not(isnull(test.a.id)), regexp_like(test.a.p, ".*") | N/A | N/A |
| │ └─TableFullScan_13 | 10000.00 | 0 | mpp[tiflash] | table:a | tiflash_task:{time:6.56ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0} | pushed down filter:empty, keep order:false, stats:pseudo | N/A | N/A |
| └─Selection_18(Probe) | 7992.00 | 0 | mpp[tiflash] | | tiflash_task:{time:10ms, loops:0, threads:36} | not(isnull(test.b.id)), regexp_like(test.b.p, ".*") | N/A | N/A |
| └─TableFullScan_17 | 10000.00 | 0 | mpp[tiflash] | table:b | tiflash_task:{time:10ms, loops:0, threads:36}, tiflash_scan:{dtfile:{total_scanned_packs:0, total_skipped_packs:0, total_scanned_rows:0, total_skipped_rows:0, total_rs_index_load_time: 0ms, total_read_time: 0ms}, total_create_snapshot_time: 0ms, total_local_region_num: 1, total_remote_region_num: 0} | pushed down filter:empty, keep order:false, stats:pseudo | N/A | N/A |
+--------------------------------------+----------+---------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------+-----------+------+
9 rows in set (0.02 sec) late materialization may help reduce the cost of |
I see, you're right, it acts more like adding a filter above TableScan operator. |
Yes, and since runtime filter usually can filter out many rows, and it is light weight |
If there are no heavy operators between table scan and join, late materialization seems degrade performance a little. And we can simply treat all exchange node as heavy operator, and we can take "heavy filter function" into consideration later if we have bandwith:). |
Enhancement
Current Local Runtime Filter design can be found here. However, there is no noticable performance improvement for TPCH 100 benchmark. After doing some quick experiments, we confirmed that we can expand the current local runtime filter to acheive 10%+ performance improvements:
Expand the scope of current local runtime filter to include broadcast hash join that across multiple tasks. It can reduce some exchange cost.
For example, A Join (Agg_on_B), A is chosen as build table and broadcast to all TiFlash probe nodes. B first applies two-phase agg operator, and the agg's output is used as probe side. This case doesn't match current local runtime filter pattern, thus won't generate any runtime filters. By expanding the scope, we can push down the filter to TableScan of B, reducing exchange cost introduced by two-phase-agg.
In TiFlash, push runtime filter down to storage layer using late materialization techs.
In TiFlash, current runtime filters will take effect as RS operator in storage layer. RS operator uses min max index inside to implement RS-IN filter operation. And the filter effect of RS-IN filter is not good in TPCH cases even if the Real-IN filter has very good filter effect. For example: if one column "pack" contains 1024 integer values, from 1 to 1024, no duplicate values. And the pushed down IN filter contains {3, 10, 1023}, then for RS-IN filter, the total "pack" pass the filter, and no data is filtered; for Real-IN filter, only 3 values can pass the filter.
TiDB & TiPB
TiFlash
The text was updated successfully, but these errors were encountered: