lightning: make default file router support compression (#38515)
ref #38514
lichunzhu authored Oct 19, 2022
1 parent d037637 commit ff5fd4e
Showing 4 changed files with 114 additions and 17 deletions.
9 changes: 6 additions & 3 deletions br/pkg/lightning/mydump/region.go
@@ -170,7 +170,7 @@ func MakeTableRegions(
go func() {
defer wg.Done()
for info := range fileChan {
-regions, sizes, err := makeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
+regions, sizes, err := MakeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
select {
case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}:
case <-ctx.Done():
@@ -255,7 +255,8 @@ func MakeTableRegions(
return filesRegions, nil
}

-func makeSourceFileRegion(
+// MakeSourceFileRegion creates a new source file region.
+func MakeSourceFileRegion(
ctx context.Context,
meta *MDTableMeta,
fi FileInfo,
@@ -283,7 +284,9 @@ func makeSourceFileRegion(
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might slightly exceed the threshold when it is equal to `max-region-size`, so we can
// avoid splitting a lot of small chunks.
-if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
+// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
+if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone &&
+	dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
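
The guard above is the behavioral core of this change: a strict-format CSV data file is only split into chunks when it is not compressed, because row boundaries cannot be located cheaply inside a compressed stream. The standalone sketch below is illustrative only and not part of this commit; the helper name canSplitCSV and its flattened parameters are invented for the example.

package main

import "fmt"

// Illustrative sketch only, not part of this commit.
type Compression int

const (
	CompressionNone Compression = iota
	CompressionGZ
)

// largeCSVLowerThresholdRation mirrors the 1/10 slack described in the comment above.
const largeCSVLowerThresholdRation = 10

// canSplitCSV restates the guard added to MakeSourceFileRegion: a CSV data file is
// split only when strict-format is enabled, the file is NOT compressed, and its size
// exceeds max-region-size plus the 1/10 slack.
func canSplitCSV(isCSV, strictFormat bool, compression Compression, fileSize, maxRegionSize int64) bool {
	threshold := maxRegionSize + maxRegionSize/largeCSVLowerThresholdRation
	return isCSV && strictFormat && compression == CompressionNone && fileSize > threshold
}

func main() {
	// A 300 MiB plain CSV with a 256 MiB max-region-size is split...
	fmt.Println(canSplitCSV(true, true, CompressionNone, 300<<20, 256<<20)) // true
	// ...while the same file compressed with gzip stays a single chunk.
	fmt.Println(canSplitCSV(true, true, CompressionGZ, 300<<20, 256<<20)) // false
}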
57 changes: 57 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
@@ -164,6 +164,63 @@ func TestAllocateEngineIDs(t *testing.T) {
})
}

func TestMakeSourceFileRegion(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
},
}
filePath := "./csv/split_large_file.csv"
dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: filePath, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := 3
columns := []string{"a", "b", "c"}

ctx := context.Background()
ioWorkers := worker.NewPool(ctx, 4, "io")
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)

// test - no compression
fileInfo.FileMeta.Compression = CompressionNone
regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
offsets := [][]int64{{6, 12}, {12, 18}, {18, 24}, {24, 30}}
assert.Len(t, regions, len(offsets))
for i := range offsets {
assert.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
assert.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
assert.Equal(t, columns, regions[i].Chunk.Columns)
}

// test - gzip compression
fileInfo.FileMeta.Compression = CompressionGZ
regions, _, err = MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
assert.Len(t, regions, 1)
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset)
assert.Len(t, regions[0].Chunk.Columns, 0)
}

func TestSplitLargeFile(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
31 changes: 18 additions & 13 deletions br/pkg/lightning/mydump/router.go
@@ -65,6 +65,10 @@ const (
CompressionZStd
// CompressionXZ is the compression type that uses XZ algorithm.
CompressionXZ
// CompressionLZO is the compression type that uses LZO algorithm.
CompressionLZO
// CompressionSnappy is the compression type that uses Snappy algorithm.
CompressionSnappy
)

func parseSourceType(t string) (SourceType, error) {
@@ -109,14 +113,18 @@ func (s SourceType) String() string {

func parseCompressionType(t string) (Compression, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case "gz":
case "gz", "gzip":
return CompressionGZ, nil
case "lz4":
return CompressionLZ4, nil
case "zstd":
return CompressionZStd, nil
case "xz":
return CompressionXZ, nil
case "lzo":
return CompressionLZO, nil
case "snappy":
return CompressionSnappy, nil
case "":
return CompressionNone, nil
default:
@@ -128,15 +136,15 @@ var expandVariablePattern = regexp.MustCompile(`\$(?:\$|[\pL\p{Nd}_]+|\{[\pL\p{N

var defaultFileRouteRules = []*config.FileRouteRule{
// ignore *-schema-trigger.sql, *-schema-post.sql files
-{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql$`, Type: "ignore"},
-// db schema create file pattern, matches files like '{schema}-schema-create.sql'
-{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$`, Schema: "$1", Table: "", Type: SchemaSchema, Unescape: true},
-// table schema create file pattern, matches files like '{schema}.{table}-schema.sql'
-{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema, Unescape: true},
-// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql'
-{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema, Unescape: true},
-// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}'
-{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Unescape: true},
+{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql(?:\.(\w*?))?$`, Type: "ignore"},
+// db schema create file pattern, matches files like '{schema}-schema-create.sql[.{compress}]'
+{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "", Type: SchemaSchema, Compression: "$2", Unescape: true},
+// table schema create file pattern, matches files like '{schema}.{table}-schema.sql[.{compress}]'
+{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: TableSchema, Compression: "$3", Unescape: true},
+// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql[.{compress}]'
+{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: ViewSchema, Compression: "$3", Unescape: true},
+// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}[.{compress}]'
+{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Compression: "$5", Unescape: true},
}

// FileRouter provides some operations to apply a rule to route file path to target schema/table
@@ -292,9 +300,6 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R
if err != nil {
return err
}
-if compression != CompressionNone {
-return errors.New("Currently we don't support restore compressed source file yet")
-}
result.Compression = compression
return nil
})
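
The routing change itself is carried by the new optional `(?:\.(\w+))?` / `(?:\.(\w*?))?` groups in the default rules above, which capture a trailing compression extension and feed it into the rule's Compression field. The snippet below is illustrative only and not part of this commit: it exercises just the data-file regex to show which capture groups a compressed path produces, while the full FileRouter additionally parses the captured suffix with parseCompressionType and applies unescaping.

package main

import (
	"fmt"
	"regexp"
)

// The data-file pattern from defaultFileRouteRules above; the trailing
// (?:\.(\w+))? group is what now captures an optional compression suffix.
var dataFilePattern = regexp.MustCompile(
	`(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`)

func main() {
	for _, path := range []string{
		"my_schema.my_table.0001.sql.snappy",
		"my_dir/my_schema.my_table.csv.lzo",
		"my_schema.my_table.sql", // no compression suffix: group 5 stays empty
	} {
		m := dataFilePattern.FindStringSubmatch(path)
		// m[1]=schema, m[2]=table, m[3]=key, m[4]=source type, m[5]=compression
		fmt.Printf("%-38s schema=%s table=%s key=%q type=%s compression=%q\n",
			path, m[1], m[2], m[3], m[4], m[5])
	}
}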
34 changes: 33 additions & 1 deletion br/pkg/lightning/mydump/router_test.go
@@ -38,6 +38,34 @@ func TestRouteParser(t *testing.T) {
}
}

func TestDefaultRouter(t *testing.T) {
r, err := NewFileRouter(defaultFileRouteRules, log.L())
assert.NoError(t, err)

inputOutputMap := map[string][]string{
"a/test-schema-create.sql": {"test", "", "", "", SchemaSchema},
"test-schema-create.sql.gz": {"test", "", "", "gz", SchemaSchema},
"c/d/test.t-schema.sql": {"test", "t", "", "", TableSchema},
"test.t-schema.sql.lzo": {"test", "t", "", "lzo", TableSchema},
"/bc/dc/test.v1-schema-view.sql": {"test", "v1", "", "", ViewSchema},
"test.v1-schema-view.sql.snappy": {"test", "v1", "", "snappy", ViewSchema},
"my_schema.my_table.sql": {"my_schema", "my_table", "", "", "sql"},
"/test/123/my_schema.my_table.sql.gz": {"my_schema", "my_table", "", "gz", "sql"},
"my_dir/my_schema.my_table.csv.lzo": {"my_schema", "my_table", "", "lzo", "csv"},
"my_schema.my_table.0001.sql.snappy": {"my_schema", "my_table", "0001", "snappy", "sql"},
}
for path, fields := range inputOutputMap {
res, err := r.Route(path)
assert.NoError(t, err)
compress, e := parseCompressionType(fields[3])
assert.NoError(t, e)
ty, e := parseSourceType(fields[4])
assert.NoError(t, e)
exp := &RouteResult{filter.Table{Schema: fields[0], Name: fields[1]}, fields[2], compress, ty}
assert.Equal(t, exp, res)
}
}

func TestInvalidRouteRule(t *testing.T) {
rule := &config.FileRouteRule{}
rules := []*config.FileRouteRule{rule}
@@ -112,7 +140,6 @@ func TestSingleRouteRule(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, r)
invalidMatchPaths := []string{
"my_schema.my_table.sql.gz",
"my_schema.my_table.sql.rar",
"my_schema.my_table.txt",
}
@@ -121,6 +148,11 @@
assert.Nil(t, res)
assert.Error(t, err)
}

res, err := r.Route("my_schema.my_table.sql.gz")
assert.NoError(t, err)
exp := &RouteResult{filter.Table{Schema: "my_schema", Name: "my_table"}, "", CompressionGZ, SourceTypeSQL}
assert.Equal(t, exp, res)
}

func TestMultiRouteRule(t *testing.T) {
