Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix HashAgg cannot pushdown to tiflash_compute #40828

Merged
merged 16 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) {
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
require.Contains(t, err.Error(), "Please check tiflash_compute node is available")
require.Contains(t, err.Error(), "tiflash_compute node is unavailable")

config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
Expand Down Expand Up @@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

needCheckTiFlashComputeNode := "false"
failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")

tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
Expand Down
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ go_library(
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
"//store/driver/backoff",
"//table",
"//table/tables",
"//table/temptable",
Expand Down
13 changes: 9 additions & 4 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2015,15 +2015,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
// In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated.
// So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask.
isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash
isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash
canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
if ts.KeepOrder {
return invalidTask, nil
}
if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) {
if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) {
// If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution.
// But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table.
// But in disaggregated tiflash mode, we enable using mpp for static pruning partition table, because cop and batchCop is deprecated.
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
return invalidTask, nil
}
Expand Down Expand Up @@ -2052,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
// So have to return a rootTask, but prop requires mppTask, cannot meet this requirement.
task = invalidTask
} else if prop.TaskTp == property.RootTaskType {
// when got here, canMppConvertToRootForDisaggregatedTiFlash is true.
// When got here, canMppConvertToRootForDisaggregatedTiFlash is true.
// This is for situations like cannot generate mppTask for some operators.
// Such as when the build side of HashJoin is Projection,
// which cannot pushdown to tiflash(because TiFlash doesn't support some expr in Proj)
// So HashJoin cannot pushdown to tiflash. But we still want TableScan to run on tiflash.
task = mppTask
task = task.convertToRootTask(ds.ctx)
if !task.invalid() {
Expand Down
19 changes: 0 additions & 19 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -50,7 +49,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
Expand All @@ -67,7 +65,6 @@ import (
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/size"
"github.com/tikv/client-go/v2/tikv"
)

const (
Expand Down Expand Up @@ -692,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
ds.preferStoreType = 0
return
}
if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) {
// TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available.
errMsg := "No available tiflash_compute node"
warning := ErrInternal.GenWithStack(errMsg)
ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
return
}
for _, path := range ds.possibleAccessPaths {
if path.StoreType == kv.TiFlash {
ds.preferStoreType |= preferTiFlash
Expand All @@ -716,15 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
}
}

func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool {
bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil)
stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil || len(stores) == 0 {
return false
}
return true
}

func resetNotNullFlag(schema *expression.Schema, start, end int) {
for i := start; i < end; i++ {
col := *schema.Columns[i]
Expand Down
73 changes: 73 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) {
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
}
}

func TestHashAggPushdownToTiFlashCompute(t *testing.T) {
var (
input []string
output []struct {
SQL string
Plan []string
Warning []string
}
)
planSuiteData := core.GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists tbl_15;")
tk.MustExec(`create table tbl_15 (col_89 text (473) collate utf8mb4_bin ,
col_90 timestamp default '1976-04-03' ,
col_91 tinyint unsigned not null ,
col_92 tinyint ,
col_93 double not null ,
col_94 datetime not null default '1970-06-08' ,
col_95 datetime default '2028-02-13' ,
col_96 int unsigned not null default 2532480521 ,
col_97 char (168) default '') partition by hash (col_91) partitions 4;`)

tk.MustExec("drop table if exists tbl_16;")
tk.MustExec(`create table tbl_16 (col_98 text (246) not null ,
col_99 decimal (30 ,19) ,
col_100 mediumint unsigned ,
col_101 text (410) collate utf8mb4_bin ,
col_102 date not null ,
col_103 timestamp not null default '2003-08-27' ,
col_104 text (391) not null ,
col_105 date default '2010-10-24' ,
col_106 text (9) not null,primary key (col_100, col_98(5), col_103),
unique key idx_23 (col_100, col_106 (3), col_101 (3))) partition by hash (col_100) partitions 2;`)

config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})

dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
tableName := tblInfo.Name.L
if tableName == "tbl_15" || tableName == "tbl_16" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
tk.MustExec("set @@tidb_partition_prune_mode = 'static';")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
}
}
24 changes: 2 additions & 22 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
var outputComputeNodeErrMsg bool
noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx)
for i := len(paths) - 1; i >= 0; i-- {
// availableEngineStr is for warning message.
if _, ok := availableEngine[paths[i].StoreType]; !ok {
Expand All @@ -1464,20 +1461,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
}
availableEngineStr += paths[i].StoreType.Name()
}
_, exists := isolationReadEngines[paths[i].StoreType]
// Prune this path if:
// 1. path.StoreType doesn't exists in isolationReadEngines or
// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) {
// Ignore check if tiflash_compute node number.
// After we support disaggregated tiflash in test framework, can delete this failpoint.
shouldPruneTiFlashCompute = val.(bool)
})
if shouldPruneTiFlashCompute {
outputComputeNodeErrMsg = true
}
if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute {
if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert to original code(https://github.com/pingcap/tidb/pull/33535/files#diff-eaf58bd971f17f156e1423b286ac4b2a6434e1b434caa42f2ccea25f8e2ed83bL1403), because better to check tiflash_compute nodes when dispatch task instead of explain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if there is no compute node when dispatching tasks, an error will be thrown?

paths = append(paths[:i], paths[i+1:]...)
}
}
Expand All @@ -1486,11 +1470,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
if len(paths) == 0 {
helpMsg := ""
if engineVals == "tiflash" {
if outputComputeNodeErrMsg {
helpMsg = ". Please check tiflash_compute node is available"
} else {
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
}
helpMsg = ". Please check tiflash replica or ensure the query is readonly"
}
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(),
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg))
Expand Down
6 changes: 6 additions & 0 deletions planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
if err != nil {
return nil, err
}
// Update mode of new generated firstRow as other agg funcs.
if len(agg.AggFuncs) != 0 {
firstRow.Mode = agg.AggFuncs[0].Mode
} else {
firstRow.Mode = aggregation.Partial1Mode
}
newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow)
}
tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...)
Expand Down
6 changes: 6 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,12 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task {
func (p *PhysicalUnionAll) attach2Task(tasks ...task) task {
for _, t := range tasks {
if _, ok := t.(*mppTask); ok {
if p.TP() == plancodec.TypePartitionUnion {
// In attach2MppTasks(), will attach PhysicalUnion to mppTask directly.
// But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly.
// For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask.
return invalidTask
}
return p.attach2MppTasks(tasks...)
}
}
Expand Down
8 changes: 8 additions & 0 deletions planner/core/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -1186,5 +1186,13 @@
"select a, count(*) from t group by a -- shouldn't be rewritten",
"select sum(a) from t -- sum shouldn't be rewritten"
]
},
{
"name": "TestHashAggPushdownToTiFlashCompute",
"cases": [
"select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;",
"select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;",
"select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;"
]
}
]
Loading