Support Runtime Filter in TiDB #40220

Closed
3 tasks
elsa0520 opened this issue Dec 28, 2022 · 2 comments · Fixed by #42899
Labels
type/feature-request Categorizes issue or PR as related to a new feature.


elsa0520 commented Dec 28, 2022

Goal

Improve the efficiency of Join execution in OLAP scenarios.

Background

PhysicalHashJoin performs the join by building a hash table from the rows of the right (build) table and then probing it with the rows of the left (probe) table. If the join key of a probe row does not hit the hash table, that row has no match in the right table and will not appear in the final join result.

In other words, sending such rows from PhysicalTableScan to PhysicalHashJoin is wasted work. If rows with these join keys can be filtered out in advance, scan time and network overhead are reduced, which can substantially improve join efficiency when many probe rows have no match.
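The cost described above can be seen in a minimal sketch of a hash join that counts probe rows that miss the hash table. The function name `hash_join` and the dict-based rows are assumptions made for illustration; this is not TiDB or TiFlash code.

```python
def hash_join(build_rows, probe_rows, key):
    """Inner hash join. Returns (joined pairs, number of probe rows that missed)."""
    table = {}
    for row in build_rows:                       # build phase: hash the build side
        table.setdefault(row[key], []).append(row)

    joined, misses = [], 0
    for row in probe_rows:                       # probe phase: look up each probe row
        matches = table.get(row[key])
        if matches is None:
            misses += 1                          # scanned and shipped, but never joined
            continue
        joined.extend((row, match) for match in matches)
    return joined, misses


t2 = [{"id": i} for i in (1, 5, 7)]              # build side
t1 = [{"id": i} for i in range(10)]              # probe side
joined, misses = hash_join(t2, t1, "id")
print(len(joined), misses)                       # 3 7
```

Seven of the ten probe rows are read and sent to the join only to be discarded, which is the work a runtime filter tries to avoid.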

Runtime Filter

A Runtime Filter is a predicate that is generated during query planning and whose values are filled in dynamically at execution time. It works like any other predicate in a Selection: it is applied in TableScan to filter out rows that do not satisfy it.

The only difference is that the parameter values of the Runtime Filter predicate are constructed in HashJoin.

The following is an example:
T1 is a fact table with 1,000,000 rows and T2 is a dimension table with 100 rows.
select * from T1, T2 where T1.id=T2.id
With RF, T2 is scanned first, and PhysicalHashJoin computes a filter from the T2 data, such as its maximum and minimum join key values. The filter is then sent to the TableFullScan that is waiting to scan T1. T1 applies the filter and passes only the surviving rows to PhysicalHashJoin, which reduces both the number of rows that probe the hash table and the network overhead.

                         2. build RF values
            +-------->+-------------------+
            |         |PhysicalHashJoin   |<-----+
            |    +----+                   |      |
4.after RF  |    |    +-------------------+      | 1. scan T2
    5000    |    |3. send RF                     |      100
            |    | filter data                   |
            |    |                               |
      +-----+----v------+                +-------+--------+
      |  TableFullScan  |                | TableFullScan  |
      |      T1         |                |      T2        |
      +-----------------+                +----------------+

In summary, Runtime Filters are generated in the planner, built in PhysicalHashJoin, and applied in TableScan.
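The four steps in the diagram can be sketched as follows, using a min/max filter as in the example. The function names and the row counts are assumptions chosen for illustration and do not reproduce the numbers in the diagram.

```python
def build_min_max_filter(build_keys):
    """Step 2: derive a filter from the build-side (T2) join keys."""
    keys = list(build_keys)
    if not keys:
        return lambda key: False                 # empty build side: nothing can match
    lo, hi = min(keys), max(keys)
    return lambda key: lo <= key <= hi


def scan_with_filter(rows, runtime_filter):
    """Step 4: the probe-side scan drops rows that the filter rejects."""
    return [row for row in rows if runtime_filter(row["id"])]


t2 = [{"id": i} for i in range(100, 200)]        # step 1: scan T2 (100 rows)
rf = build_min_max_filter(row["id"] for row in t2)
t1 = [{"id": i} for i in range(1000)]            # T1 before filtering
t1_filtered = scan_with_filter(t1, rf)           # step 3 + 4: send RF, then scan T1
print(len(t1), "->", len(t1_filtered))           # 1000 -> 100
```

Only the filtered rows would be sent on to the join, so the saving grows with the share of T1 keys that fall outside the T2 key range.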

Type of predicate

  1. IN: suitable when the Join Key column has a small number of distinct values. The predicate has the form T1.id in (1, 5, 7, 8, 10).
  2. MIN MAX: suitable when the Join Key column values fall within a range. The predicate has the form T1.id >= 1 and T1.id <= 10.
  3. Bloom Filter: suitable when the Join Key column has many distinct values.
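A minimal sketch of the three predicate types, assuming integer join keys. The names `in_filter`, `min_max_filter`, and `BloomFilter`, and the bit-array size and hash scheme, are illustrative assumptions rather than what TiFlash uses.

```python
import hashlib


def in_filter(values):
    """IN: exact membership test, practical for a small set of values."""
    allowed = set(values)
    return lambda v: v in allowed


def min_max_filter(values):
    """MIN MAX: inclusive range test; may pass values that are not in the set."""
    lo, hi = min(values), max(values)
    return lambda v: lo <= v <= hi


class BloomFilter:
    """Bloom filter: fixed-size summary with false positives, no false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0                            # a Python int used as a bit array

    def _positions(self, value):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


keys = [1, 5, 7, 8, 10]
bf = BloomFilter()
for k in keys:
    bf.add(k)

print(in_filter(keys)(6))         # False: 6 is not in the list
print(min_max_filter(keys)(6))    # True: 6 lies inside [1, 10]
print(bf.might_contain(7))        # True: added keys are always reported
```

The trade-off is precision against size: IN is exact but grows with the number of values, MIN MAX is constant-size but coarse, and a Bloom filter sits in between.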

TiFlash or TiKV

TiDB has two storage engines: TiFlash and TiKV. To achieve the best efficiency, RF needs to be combined with the "predicate push down" and "delayed materialization" of the storage engine. However, TiKV is a row-oriented storage engine and has no delayed materialization, so applying RF to TiKV is not ideal.
Therefore, only TiFlash supports RF.

Precondition

Stage goal

  1. The first phase supports local RF for Broadcast joins with the IN predicate type.
  2. The second phase supports global RF for Shuffle joins, as well as the Bloom Filter, Min Max, and BF/IN predicate types.

Design

Runtime Filter needs to be planned in the Planner and executed in Compute Engine. So the design is mainly divided into two parts:

  1. Planner (TiDB): generates RFs based on the matching patterns and assigns each one to the correct target node.
  2. Compute Engine (TiFlash): the HashJoinNode builds the RF and applies it to the target node.

This issue does not cover the design of global RF.

Planner design

The planner part of RF identifies HashJoin nodes in the plan tree and plans the RFs that match the patterns onto the corresponding target nodes. Accordingly, the planner can be split into two core processes:

  1. Generate Filter: matches HashJoinNodes that meet the patterns and combines the EqPredicates on each HashJoinNode to construct RFs.
  2. Assign Filter: finds the target node for each constructed RF.

Process
Traverse the PhysicalPlanTree in preorder. Generate Filter when encountering a PhysicalHashJoin node, and Assign Filter when encountering a TableFullScan.

Generate Filter

The patterns are as follows:

  1. PhysicalHashJoin: left outer, full outer, and anti joins are excluded.
  2. Predicate
  • It is an eq predicate.
  • The left child is only a column expr or cast(column expr).
  • The right child type is numeric, bool, or character. (Other types such as json, blob, and binary are not supported.)
  • Its selectivity is evaluated.
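The pattern checks above can be sketched as a single eligibility function. The dict-based expression representation, the type labels, and all names below are hypothetical and only illustrate the rules. The selectivity check is left out because no threshold is given here.

```python
EXCLUDED_JOIN_TYPES = {"left outer", "full outer", "anti"}
SUPPORTED_RIGHT_TYPES = {"numeric", "bool", "character"}


def is_column_or_cast_of_column(expr):
    """True for a bare column expr or cast(column expr)."""
    if expr["op"] == "column":
        return True
    return expr["op"] == "cast" and expr["args"][0]["op"] == "column"


def matches_rf_pattern(join_type, predicate):
    """Check one join predicate against the Generate Filter patterns."""
    if join_type in EXCLUDED_JOIN_TYPES:
        return False
    if predicate["op"] != "eq":                  # only eq predicates qualify
        return False
    left, right = predicate["args"]
    if not is_column_or_cast_of_column(left):
        return False
    return right["type"] in SUPPORTED_RIGHT_TYPES


t1_k2 = {"op": "column", "type": "numeric"}
t2_k2 = {"op": "column", "type": "numeric"}
t2_doc = {"op": "column", "type": "json"}

print(matches_rf_pattern("inner", {"op": "eq", "args": [t1_k2, t2_k2]}))        # True
print(matches_rf_pattern("left outer", {"op": "eq", "args": [t1_k2, t2_k2]}))   # False
print(matches_rf_pattern("inner", {"op": "eq", "args": [t1_k2, t2_doc]}))       # False
```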

Assign Filter

The relationship between the Hash Join node and the Target Node is shown in the figure below. In real cases, one or more nodes may sit between the two.
[Figure: relationship between the Hash Join node and the Target Node]

If a TableScan contains the same Column as the RF (the same unique id), the current RF is assigned to that TableScan.

  • Principle: an RF column can be pushed down as long as it has not undergone any transformation, that is, the untransformed RF column has the same column unique id as the column in the TargetNode.
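The planner process (preorder traversal, Generate Filter on PhysicalHashJoin, Assign Filter on TableFullScan by column unique id) can be sketched as below. `PlanNode`, `RuntimeFilter`, and their fields are hypothetical stand-ins for the real plan structures, and the pattern checks are omitted for brevity.

```python
from dataclasses import dataclass, field


@dataclass
class RuntimeFilter:
    rf_id: int
    build_col: int                               # unique id of the build-side key column
    probe_col: int                               # unique id of the probe-side key column


@dataclass
class PlanNode:
    kind: str                                    # "HashJoin", "TableFullScan", ...
    children: list = field(default_factory=list)
    eq_conds: list = field(default_factory=list)    # HashJoin: (probe_col, build_col) pairs
    column_ids: set = field(default_factory=set)    # TableFullScan: column unique ids
    produced: list = field(default_factory=list)    # RFs this join builds
    consumed: list = field(default_factory=list)    # RFs this scan applies


def plan_runtime_filters(root):
    """Preorder traversal: generate RFs at joins, assign them at matching scans."""
    pending = []

    def visit(node):
        if node.kind == "HashJoin":
            for probe_col, build_col in node.eq_conds:
                rf = RuntimeFilter(len(pending) + 1, build_col, probe_col)
                node.produced.append(rf)
                pending.append(rf)
        elif node.kind == "TableFullScan":
            # Assign only when the scan exposes the very same column unique id.
            node.consumed.extend(rf for rf in pending if rf.probe_col in node.column_ids)
        for child in node.children:
            visit(child)

    visit(root)
    return pending


t1_scan = PlanNode("TableFullScan", column_ids={1, 2})
t2_scan = PlanNode("TableFullScan", column_ids={3, 4})
probe = PlanNode("Selection", children=[t1_scan])    # a node sitting between join and scan
join = PlanNode("HashJoin", children=[t2_scan, probe], eq_conds=[(2, 4)])

filters = plan_runtime_filters(join)
print([rf.rf_id for rf in t1_scan.consumed])     # [1]
print([rf.rf_id for rf in t2_scan.consumed])     # []
```

Matching on the unique id means an intermediate node that leaves the column untouched does not block the assignment, while a column that was transformed into a new one would carry a different id and would not match.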

Compute Engine design

The main work of the Runtime Filter at the execution layer is as follows:

  1. Compile: Translate the pb structure into an executable RF, and associate the Build and Probe sides through a reference.
  2. Generator Values: During the execution of HashBuild, continuously collect the values of the required columns from each Block and construct the RF expressions.
  3. Apply RF: When the RF is ready, push it down to the storage engine as a normal expression.

Compile

As shown in the figure, when an RF is encountered in the Compile stage, the corresponding vector<RuntimeFilter> is constructed and a reference to it is stored in both HashBuild and Unordered. The two hold the same RF: one is responsible for building it and the other for using it.
[Figure: RuntimeFilter references held by HashBuild and Unordered]

Generator Values

Generator Values continuously obtains new Blocks through the read method of HashBuild, extracts the data of the column named by the RF from each Block, and incrementally updates the RuntimeFilter.

RF cost control
If the IN list contains too many values, applying the RF may be inefficient. So a runtime_filter_max_in_values variable is added to limit it: if the maximum is exceeded, the RF is discarded.
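The incremental build and the cost control can be sketched together. The class `InRuntimeFilterBuilder`, the dict-of-lists block layout, and the `max_in_values` parameter (mirroring the variable above) are assumptions for illustration.

```python
class InRuntimeFilterBuilder:
    """Collects distinct join key values block by block for an IN-type RF."""

    def __init__(self, column, max_in_values):
        self.column = column
        self.max_in_values = max_in_values
        self.values = set()
        self.discarded = False

    def update(self, block):
        """Consume one block, given as a dict of column name -> list of values."""
        if self.discarded:
            return                               # already over the limit, skip the work
        self.values.update(block[self.column])
        if len(self.values) > self.max_in_values:
            self.values.clear()                  # too many values: give up on this RF
            self.discarded = True

    def finish(self):
        """Return the IN values, or None if the RF was discarded."""
        return None if self.discarded else frozenset(self.values)


small = InRuntimeFilterBuilder("k2", max_in_values=3)
small.update({"k2": [1, 2], "k3": [9, 9]})
small.update({"k2": [2, 3], "k3": [9, 9]})
print(sorted(small.finish()))                    # [1, 2, 3]

large = InRuntimeFilterBuilder("k2", max_in_values=3)
large.update({"k2": [1, 2, 3, 4]})
print(large.finish())                            # None
```

When `finish` returns None the scan simply runs without the filter, so discarding an RF affects performance only and never the query result.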

Apply RF

The role of Apply RF is to apply the RF to the scan thread, so its position in the overall process matters. There are two prerequisites:

  1. After RF ready: Apply RF occurs after RF construction. The RF can be applied by the TargetNode only after HashBuild has finished collecting the join values, so the TargetNode's read must occur after the RF is ready.
    • is_ready: HashJoin and the TargetNode jointly hold an is_ready variable. The TargetNode does not start scanning until is_ready is set to true.
  2. Before scan thread: Apply RF occurs before the scan thread starts. If the storage-layer scan has already started, an RF built afterwards cannot be pushed down to the storage layer for filtering, so it cannot reduce IO.
    • runtime_filter_wait_time_ms: the maximum time to wait for the runtime filter. If this time is exceeded, the scan proceeds without the RF.
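The two prerequisites amount to a bounded wait on a shared ready flag before the scan starts. A minimal sketch using `threading.Event` follows. The class `SharedRuntimeFilter` and the function `scan` are hypothetical, and the `wait_time_ms` parameter mirrors runtime_filter_wait_time_ms.

```python
import threading


class SharedRuntimeFilter:
    """Held by both the build side (publisher) and the target scan (waiter)."""

    def __init__(self):
        self._ready = threading.Event()          # plays the role of is_ready
        self.values = None

    def publish(self, values):
        """Called by the build side once all join values are collected."""
        self.values = frozenset(values)
        self._ready.set()

    def wait_ready(self, wait_time_ms):
        """Block for up to wait_time_ms; True if the RF became ready in time."""
        return self._ready.wait(timeout=wait_time_ms / 1000.0)


def scan(rows, rf, wait_time_ms):
    """Wait for the RF before scanning; fall back to an unfiltered scan on timeout."""
    if rf.wait_ready(wait_time_ms):
        return [row for row in rows if row in rf.values]
    return list(rows)


rf = SharedRuntimeFilter()
print(scan([1, 2, 3], rf, wait_time_ms=10))      # [1, 2, 3]: RF not ready, no filtering

builder = threading.Thread(target=rf.publish, args=([2],))
builder.start()
builder.join()
print(scan([1, 2, 3], rf, wait_time_ms=10))      # [2]: RF ready before the scan starts
```

The timeout bounds how long a slow build side can delay the scan, at the price of losing the filter for that query.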

Added session variables

  1. runtime_filter_type: in, min_max, bf, in/bf.
  2. runtime_filter_mode: local, global.
  3. runtime_filter_wait_time_ms: the maximum time the target node waits for the RF; after this time it stops waiting and scans without the RF. (for TiFlash)
  4. runtime_filter_in_max_values: the maximum number of values in an IN-type RF; if exceeded, the RF is not applied. (for TiFlash)

Observability

  1. Explain: shows the RF id, RF producer, RF consumer, and reference column. Used to check the correctness of RF at the planner layer.
MySQL [ml_test]> explain select * from  t1, t2 where t1.k2=t2.k2;
+------------------------------+----------+-----------+---------------+---------------------------------------------------------------------------+
| id                           | estRows  | task      | access object | operator info                                                             |
+------------------------------+----------+-----------+---------------+---------------------------------------------------------------------------+
| HashJoin_8                   | 12487.50 | root      |               | inner join, equal:[eq(ml_test.t1.k2, ml_test.t2.k2)], rf_1: producer t2.k2|
| ├─TableReader_15(Build)      | 9990.00  | root      |               | data:Selection_14                                                        |
| │ └─Selection_14             | 9990.00  | cop[tikv] |               | not(isnull(ml_test.t2.k2))                                              |
| │   └─TableFullScan_13       | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo                                          |
| └─TableReader_12(Probe)      | 9990.00  | root      |               | data:Selection_11                                                        |
|   └─Selection_11             | 9990.00  | cop[tikv] |               | not(isnull(ml_test.t1.k2))                                               |
|     └─TableFullScan_10       | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo , rf_1: consumer t1.k2                    |
+------------------------------+----------+-----------+---------------+---------------------------------------------------------------------------+
  2. Profile: displays execution data for RF performance tuning.
    • RF waiting time
    • RF construction time
    • Changes in data volume before and after RF
    • etc
@elsa0520 elsa0520 added the type/feature-request Categorizes issue or PR as related to a new feature. label Dec 28, 2022

elsa0520 commented Dec 28, 2022

@fixdb Could you please assign this to me and add the label?


fixdb commented Dec 29, 2022

/assign @elsa0520
