diff --git a/br/pkg/lightning/mydump/reader.go b/br/pkg/lightning/mydump/reader.go index 2988c3675dfa9..4837b35aceab2 100644 --- a/br/pkg/lightning/mydump/reader.go +++ b/br/pkg/lightning/mydump/reader.go @@ -70,6 +70,13 @@ func decodeCharacterSet(data []byte, characterSet string) ([]byte, error) { // ExportStatement exports the SQL statement in the schema file. func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error) { + if sqlFile.FileMeta.Compression != CompressionNone { + compressType, err := ToStorageCompressType(sqlFile.FileMeta.Compression) + if err != nil { + return nil, errors.Trace(err) + } + store = storage.WithCompression(store, compressType) + } fd, err := store.Open(ctx, sqlFile.FileMeta.Path) if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/lightning/mydump/reader_test.go b/br/pkg/lightning/mydump/reader_test.go index e7506ea869782..1f67f2c31c43a 100644 --- a/br/pkg/lightning/mydump/reader_test.go +++ b/br/pkg/lightning/mydump/reader_test.go @@ -15,6 +15,7 @@ package mydump_test import ( + "compress/gzip" "context" "errors" "os" @@ -173,3 +174,28 @@ func TestExportStatementHandleNonEOFError(t *testing.T) { _, err := ExportStatement(ctx, mockStorage, f, "auto") require.Contains(t, err.Error(), "read error") } + +func TestExportStatementCompressed(t *testing.T) { + dir := t.TempDir() + file, err := os.Create(filepath.Join(dir, "tidb_lightning_test_reader")) + require.NoError(t, err) + defer os.Remove(file.Name()) + + store, err := storage.NewLocalStorage(dir) + require.NoError(t, err) + + gzipFile := gzip.NewWriter(file) + _, err = gzipFile.Write([]byte("CREATE DATABASE whatever;")) + require.NoError(t, err) + err = gzipFile.Close() + require.NoError(t, err) + stat, err := file.Stat() + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + + f := FileInfo{FileMeta: SourceFileMeta{Path: stat.Name(), FileSize: stat.Size(), Compression: CompressionGZ}} + data, err := ExportStatement(context.TODO(), store, f, "auto") + require.NoError(t, err) + require.Equal(t, []byte("CREATE DATABASE whatever;"), data) +} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 8562acc2867b3..ffd9173483896 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -31,9 +31,13 @@ import ( ) const ( - tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024 + tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024 + compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold // the increment ratio of large CSV file size threshold by `region-split-size` largeCSVLowerThresholdRation = 10 + // TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency + // It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files. + TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold ) // TableRegion contains information for a table region during import. @@ -292,19 +296,34 @@ func MakeSourceFileRegion( return regions, subFileSizes, err } + fileSize := fi.FileMeta.FileSize + rowIDMax := fileSize / divisor + // for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax. + // set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files. 
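For readers unfamiliar with the pattern exercised by TestExportStatementCompressed above, here is a minimal, standalone sketch of the same gzip round trip using only the standard library. The gzipBytes helper is mine (it mirrors what the test does with gzip.NewWriter, and what the in-memory compressGz helper later in this patch does); it is not part of Lightning's API.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

// gzipBytes compresses data in memory, the same way the new tests prepare
// their compressed fixtures before handing them to ExportStatement.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	stmt := []byte("CREATE DATABASE whatever;")
	compressed, err := gzipBytes(stmt)
	if err != nil {
		log.Fatal(err)
	}
	// Reading the statement back through a decompressing reader is what
	// ExportStatement effectively does once the external storage has been
	// wrapped with storage.WithCompression.
	r, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", out) // CREATE DATABASE whatever;
}
```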
+ // TODO: update progress bar calculation for compressed files. + if fi.FileMeta.Compression != CompressionNone { + rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution. + fileSize = TableFileSizeINF + } tableRegion := &TableRegion{ DB: meta.DB, Table: meta.Name, FileMeta: fi.FileMeta, Chunk: Chunk{ Offset: 0, - EndOffset: fi.FileMeta.FileSize, + EndOffset: fileSize, PrevRowIDMax: 0, - RowIDMax: fi.FileMeta.FileSize / divisor, + RowIDMax: rowIDMax, }, } - if tableRegion.Size() > tableRegionSizeWarningThreshold { + regionTooBig := false + if fi.FileMeta.Compression == CompressionNone { + regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold + } else { + regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold + } + if regionTooBig { log.FromContext(ctx).Warn( "file is too big to be processed efficiently; we suggest splitting it at 256 MB each", zap.String("file", fi.FileMeta.Path), diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 0830d378f47ff..0c990278e65cd 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -217,7 +217,7 @@ func TestMakeSourceFileRegion(t *testing.T) { assert.NoError(t, err) assert.Len(t, regions, 1) assert.Equal(t, int64(0), regions[0].Chunk.Offset) - assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset) + assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset) assert.Len(t, regions[0].Chunk.Columns, 0) } diff --git a/br/pkg/lightning/mydump/router.go b/br/pkg/lightning/mydump/router.go index bdc2a922f12f7..bf0ccba834fe0 100644 --- a/br/pkg/lightning/mydump/router.go +++ b/br/pkg/lightning/mydump/router.go @@ -134,7 +134,7 @@ func parseCompressionType(t string) (Compression, error) { return CompressionGZ, nil case "lz4": return CompressionLZ4, nil - case "zstd": + case "zstd", "zst": return CompressionZStd, nil case "xz": return CompressionXZ, nil @@ -324,6 +324,9 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R if err != nil { return err } + if result.Type == SourceTypeParquet && compression != CompressionNone { + return errors.Errorf("can't support whole compressed parquet file, should compress parquet files by choosing correct parquet compress writer, path: %s", r.Path) + } result.Compression = compression return nil }) diff --git a/br/pkg/lightning/mydump/router_test.go b/br/pkg/lightning/mydump/router_test.go index 4e3d8a4215a0d..ab97769e30ce8 100644 --- a/br/pkg/lightning/mydump/router_test.go +++ b/br/pkg/lightning/mydump/router_test.go @@ -292,3 +292,21 @@ func TestRouteWithPath(t *testing.T) { require.NoError(t, err) require.Nil(t, res) } + +func TestRouteWithCompressedParquet(t *testing.T) { + fileName := "myschema.my_table.000.parquet.gz" + rule := &config.FileRouteRule{ + Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`, + Schema: "$1", + Table: "$2", + Type: "$4", + Key: "$3", + Compression: "$5", + Unescape: true, + } + r := *rule + router, err := NewFileRouter([]*config.FileRouteRule{&r}, log.L()) + require.NoError(t, err) + _, err = router.Route(fileName) + require.Error(t, err) +} diff --git a/br/pkg/lightning/restore/get_pre_info.go b/br/pkg/lightning/restore/get_pre_info.go index 287d59c6145a4..93927c6956809 100644 --- a/br/pkg/lightning/restore/get_pre_info.go +++ b/br/pkg/lightning/restore/get_pre_info.go @@ -444,15 +444,7 @@ func (p *PreRestoreInfoGetterImpl) 
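The region-sizing heuristic in this chunk is easier to see with concrete numbers. The sketch below copies the constants from region.go and applies the same arithmetic to a hypothetical 256 MiB compressed data file; the divisor value of 1 is an assumption for illustration (the real divisor is supplied by MakeSourceFileRegion's caller), and the program is not part of the patch.

```go
package main

import "fmt"

// Constants copied from br/pkg/lightning/mydump/region.go in this patch.
const (
	tableRegionSizeWarningThreshold           int64 = 1024 * 1024 * 1024
	compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024
	// TableFileSizeINF stands in for the unknown decompressed size (10 TiB).
	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
)

func main() {
	// Hypothetical compressed data file: 256 MiB on disk. Divisor of 1 is
	// an illustrative assumption; the real value comes from the caller.
	fileSize := int64(256 * 1024 * 1024)
	divisor := int64(1)

	// Compressed branch: assume roughly a 1% compression ratio, i.e. the
	// decompressed payload may be ~100x larger, so the row-ID budget is
	// inflated by 100 and EndOffset is set to TableFileSizeINF so the
	// chunk is simply read until EOF.
	rowIDMax := fileSize * 100 / divisor
	endOffset := TableFileSizeINF

	// The "file too big" warning for compressed files fires on the
	// compressed size, using the lower 410 MiB (~0.4x) threshold.
	tooBig := fileSize > compressedTableRegionSizeWarningThreshold

	fmt.Printf("rowIDMax=%d endOffset=%d warn=%v\n", rowIDMax, endOffset, tooBig)
}
```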
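The router change in this chunk rejects whole-file compression for parquet sources. The snippet below shows, with Go's regexp package, how the route pattern used in TestRouteWithCompressedParquet decomposes myschema.my_table.000.parquet.gz; capturing both type=parquet and a non-empty compression group is what lets the router return an error. The pattern and file name are copied from the test; the surrounding program is illustrative only.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Pattern and file name copied from TestRouteWithCompressedParquet.
	pattern := regexp.MustCompile(`(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`)
	fileName := "myschema.my_table.000.parquet.gz"

	m := pattern.FindStringSubmatch(fileName)
	if m == nil {
		fmt.Println("no match")
		return
	}
	// m[1]=schema, m[2]=table, m[3]=key, m[4]=type, m[5]=compression,
	// matching the $1..$5 references in the FileRouteRule.
	fmt.Printf("schema=%q table=%q key=%q type=%q compression=%q\n",
		m[1], m[2], m[3], m[4], m[5])
	// schema="myschema" table="my_table" key="000" type="parquet" compression="gz"
	// With type=parquet and a non-empty compression, the router now
	// returns an error instead of a route result.
}
```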
ReadFirstNRowsByTableName(ctx context.Context // ReadFirstNRowsByFileMeta reads the first N rows of an data file. // It implements the PreRestoreInfoGetter interface. func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) { - var ( - reader storage.ReadSeekCloser - err error - ) - if dataFileMeta.Type == mydump.SourceTypeParquet { - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, dataFileMeta.Path, dataFileMeta.FileSize) - } else { - reader, err = p.srcStorage.Open(ctx, dataFileMeta.Path) - } + reader, err := openReader(ctx, dataFileMeta, p.srcStorage) if err != nil { return nil, nil, errors.Trace(err) } @@ -590,13 +582,7 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable( return resultIndexRatio, isRowOrdered, nil } sampleFile := tableMeta.DataFiles[0].FileMeta - var reader storage.ReadSeekCloser - var err error - if sampleFile.Type == mydump.SourceTypeParquet { - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, sampleFile.Path, sampleFile.FileSize) - } else { - reader, err = p.srcStorage.Open(ctx, sampleFile.Path) - } + reader, err := openReader(ctx, sampleFile, p.srcStorage) if err != nil { return 0.0, false, errors.Trace(err) } diff --git a/br/pkg/lightning/restore/get_pre_info_test.go b/br/pkg/lightning/restore/get_pre_info_test.go index 8ea57d023c679..f66a76901116f 100644 --- a/br/pkg/lightning/restore/get_pre_info_test.go +++ b/br/pkg/lightning/restore/get_pre_info_test.go @@ -14,6 +14,8 @@ package restore import ( + "bytes" + "compress/gzip" "context" "database/sql" "fmt" @@ -24,6 +26,7 @@ import ( mysql_sql_driver "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/restore/mock" ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts" "github.com/pingcap/tidb/errno" @@ -412,6 +415,118 @@ INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');` require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) } +func compressGz(t *testing.T, data []byte) []byte { + t.Helper() + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + _, err := w.Write(data) + require.NoError(t, err) + require.NoError(t, w.Close()) + return buf.Bytes() +} + +func TestGetPreInfoReadCompressedFirstRow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( + testCSVData01 = []byte(`ival,sval +111,"aaa" +222,"bbb" +`) + testSQLData01 = []byte(`INSERT INTO db01.tbl01 (ival, sval) VALUES (333, 'ccc'); +INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');`) + ) + + test1CSVCompressed := compressGz(t, testCSVData01) + test1SQLCompressed := compressGz(t, testSQLData01) + + testDataInfos := []struct { + FileName string + Data []byte + FirstN int + CSVConfig *config.CSVConfig + ExpectFirstRowDatums [][]types.Datum + ExpectColumns []string + }{ + { + FileName: "/db01/tbl01/data.001.csv.gz", + Data: test1CSVCompressed, + FirstN: 1, + ExpectFirstRowDatums: [][]types.Datum{ + { + types.NewStringDatum("111"), + types.NewStringDatum("aaa"), + }, + }, + ExpectColumns: []string{"ival", "sval"}, + }, + { + FileName: "/db01/tbl01/data.001.sql.gz", + Data: test1SQLCompressed, + FirstN: 1, + ExpectFirstRowDatums: [][]types.Datum{ + { + types.NewUintDatum(333), + types.NewStringDatum("ccc"), + }, + }, + ExpectColumns: []string{"ival", "sval"}, + }, + } + + tbl01SchemaBytes 
:= []byte("CREATE TABLE db01.tbl01(id INTEGER PRIMARY KEY AUTO_INCREMENT, ival INTEGER, sval VARCHAR(64));") + tbl01SchemaBytesCompressed := compressGz(t, tbl01SchemaBytes) + + tblMockSourceData := &mock.MockTableSourceData{ + DBName: "db01", + TableName: "tbl01", + SchemaFile: &mock.MockSourceFile{ + FileName: "/db01/tbl01/tbl01.schema.sql.gz", + Data: tbl01SchemaBytesCompressed, + }, + DataFiles: []*mock.MockSourceFile{}, + } + for _, testInfo := range testDataInfos { + tblMockSourceData.DataFiles = append(tblMockSourceData.DataFiles, &mock.MockSourceFile{ + FileName: testInfo.FileName, + Data: testInfo.Data, + }) + } + mockDataMap := map[string]*mock.MockDBSourceData{ + "db01": { + Name: "db01", + Tables: map[string]*mock.MockTableSourceData{ + "tbl01": tblMockSourceData, + }, + }, + } + mockSrc, err := mock.NewMockImportSource(mockDataMap) + require.Nil(t, err) + mockTarget := mock.NewMockTargetInfo() + cfg := config.NewConfig() + cfg.TikvImporter.Backend = config.BackendLocal + ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil) + require.NoError(t, err) + + cfg.Mydumper.CSV.Header = true + tblMeta := mockSrc.GetDBMetaMap()["db01"].Tables[0] + for i, dataFile := range tblMeta.DataFiles { + theDataInfo := testDataInfos[i] + dataFile.FileMeta.Compression = mydump.CompressionGZ + cols, rowDatums, err := ig.ReadFirstNRowsByFileMeta(ctx, dataFile.FileMeta, theDataInfo.FirstN) + require.Nil(t, err) + t.Logf("%v, %v", cols, rowDatums) + require.Equal(t, theDataInfo.ExpectColumns, cols) + require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) + } + + theDataInfo := testDataInfos[0] + cols, rowDatums, err := ig.ReadFirstNRowsByTableName(ctx, "db01", "tbl01", theDataInfo.FirstN) + require.NoError(t, err) + require.Equal(t, theDataInfo.ExpectColumns, cols) + require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) +} + func TestGetPreInfoSampleSource(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -497,6 +612,100 @@ func TestGetPreInfoSampleSource(t *testing.T) { } } +func TestGetPreInfoSampleSourceCompressed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dataFileName := "/db01/tbl01/tbl01.data.001.csv.gz" + schemaFileData := []byte("CREATE TABLE db01.tbl01 (id INTEGER PRIMARY KEY AUTO_INCREMENT, ival INTEGER, sval VARCHAR(64));") + schemaFileDataCompressed := compressGz(t, schemaFileData) + mockDataMap := map[string]*mock.MockDBSourceData{ + "db01": { + Name: "db01", + Tables: map[string]*mock.MockTableSourceData{ + "tbl01": { + DBName: "db01", + TableName: "tbl01", + SchemaFile: &mock.MockSourceFile{ + FileName: "/db01/tbl01/tbl01.schema.sql.gz", + Data: schemaFileDataCompressed, + }, + DataFiles: []*mock.MockSourceFile{ + { + FileName: dataFileName, + Data: []byte(nil), + }, + }, + }, + }, + }, + } + mockSrc, err := mock.NewMockImportSource(mockDataMap) + require.Nil(t, err) + mockTarget := mock.NewMockTargetInfo() + cfg := config.NewConfig() + cfg.TikvImporter.Backend = config.BackendLocal + ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true)) + require.NoError(t, err) + + mdDBMeta := mockSrc.GetAllDBFileMetas()[0] + mdTblMeta := mdDBMeta.Tables[0] + dbInfos, err := ig.GetAllTableStructures(ctx) + require.NoError(t, err) + + data := [][]byte{ + []byte(`id,ival,sval +1,111,"aaa" +2,222,"bbb" +`), + []byte(`sval,ival,id +"aaa",111,1 
+"bbb",222,2 +`), + []byte(`id,ival,sval +2,222,"bbb" +1,111,"aaa" +`), + []byte(`sval,ival,id +"aaa",111,2 +"bbb",222,1 +`), + } + compressedData := make([][]byte, 0, 4) + for _, d := range data { + compressedData = append(compressedData, compressGz(t, d)) + } + + subTests := []struct { + Data []byte + ExpectIsOrdered bool + }{ + { + Data: compressedData[0], + ExpectIsOrdered: true, + }, + { + Data: compressedData[1], + ExpectIsOrdered: true, + }, + { + Data: compressedData[2], + ExpectIsOrdered: false, + }, + { + Data: compressedData[3], + ExpectIsOrdered: false, + }, + } + for _, subTest := range subTests { + require.NoError(t, mockSrc.GetStorage().WriteFile(ctx, dataFileName, subTest.Data)) + sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, defaultImportantVariables) + require.NoError(t, err) + t.Logf("%v, %v", sampledIndexRatio, isRowOrderedFromSample) + require.Greater(t, sampledIndexRatio, 1.0) + require.Equal(t, subTest.ExpectIsOrdered, isRowOrderedFromSample) + } +} + func TestGetPreInfoEstimateSourceSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/br/pkg/lightning/restore/mock/mock.go b/br/pkg/lightning/restore/mock/mock.go index f43e6c022673e..5556e1caf3363 100644 --- a/br/pkg/lightning/restore/mock/mock.go +++ b/br/pkg/lightning/restore/mock/mock.go @@ -77,14 +77,19 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport tblMeta := mydump.NewMDTableMeta("binary") tblMeta.DB = dbName tblMeta.Name = tblName + compression := mydump.CompressionNone + if strings.HasSuffix(tblData.SchemaFile.FileName, ".gz") { + compression = mydump.CompressionGZ + } tblMeta.SchemaFile = mydump.FileInfo{ TableName: filter.Table{ Schema: dbName, Name: tblName, }, FileMeta: mydump.SourceFileMeta{ - Path: tblData.SchemaFile.FileName, - Type: mydump.SourceTypeTableSchema, + Path: tblData.SchemaFile.FileName, + Type: mydump.SourceTypeTableSchema, + Compression: compression, }, } tblMeta.DataFiles = []mydump.FileInfo{} @@ -108,12 +113,17 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport FileSize: int64(fileSize), }, } + fileName := tblDataFile.FileName + if strings.HasSuffix(fileName, ".gz") { + fileName = strings.TrimSuffix(tblDataFile.FileName, ".gz") + fileInfo.FileMeta.Compression = mydump.CompressionGZ + } switch { - case strings.HasSuffix(tblDataFile.FileName, ".csv"): + case strings.HasSuffix(fileName, ".csv"): fileInfo.FileMeta.Type = mydump.SourceTypeCSV - case strings.HasSuffix(tblDataFile.FileName, ".sql"): + case strings.HasSuffix(fileName, ".sql"): fileInfo.FileMeta.Type = mydump.SourceTypeSQL - case strings.HasSuffix(tblDataFile.FileName, ".parquet"): + case strings.HasSuffix(fileName, ".parquet"): fileInfo.FileMeta.Type = mydump.SourceTypeParquet default: return nil, errors.Errorf("unsupported file type: %s", tblDataFile.FileName) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 99b56d05414ce..210435640473f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2199,23 +2199,7 @@ func newChunkRestore( ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) - var ( - reader storage.ReadSeekCloser - compressType storage.CompressType - err error - ) - switch { - case chunk.FileMeta.Type == mydump.SourceTypeParquet: - reader, err = mydump.OpenParquetReader(ctx, store, 
chunk.FileMeta.Path, chunk.FileMeta.FileSize) - case chunk.FileMeta.Compression != mydump.CompressionNone: - compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression) - if err != nil { - break - } - reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path) - default: - reader, err = store.Open(ctx, chunk.FileMeta.Path) - } + reader, err := openReader(ctx, chunk.FileMeta, store) if err != nil { return nil, errors.Trace(err) } @@ -2790,3 +2774,20 @@ func (cr *chunkRestore) restore( } return errors.Trace(firstErr(encodeErr, deliverErr)) } + +func openReader(ctx context.Context, fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) ( + reader storage.ReadSeekCloser, err error) { + switch { + case fileMeta.Type == mydump.SourceTypeParquet: + reader, err = mydump.OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize) + case fileMeta.Compression != mydump.CompressionNone: + compressType, err2 := mydump.ToStorageCompressType(fileMeta.Compression) + if err2 != nil { + return nil, err2 + } + reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path) + default: + reader, err = store.Open(ctx, fileMeta.Path) + } + return +} diff --git a/br/tests/lightning_compress/config.toml b/br/tests/lightning_compress/config.toml new file mode 100644 index 0000000000000..000018c5c41d4 --- /dev/null +++ b/br/tests/lightning_compress/config.toml @@ -0,0 +1,18 @@ +[mydumper.csv] +separator = ',' +delimiter = '"' +header = true +not-null = false +null = '\N' +backslash-escape = true +trim-last-separator = false + +[checkpoint] +enable = true +schema = "tidb_lightning_checkpoint_test" +driver = "mysql" +keep-after-success = true + +[tikv-importer] +send-kv-pairs=10 +region-split-size = 1024 diff --git a/br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz b/br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz new file mode 100644 index 0000000000000..6571d2a15b507 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz new file mode 100644 index 0000000000000..542898561bab1 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.empty_strings.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.empty_strings.000000000.csv.gz new file mode 100644 index 0000000000000..bfa13ed67b006 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.empty_strings.000000000.csv.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz new file mode 100644 index 0000000000000..bed4b7859ac92 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz new file mode 100644 index 0000000000000..37028e36d9de8 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz 
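The new openReader helper in restore.go centralizes the three-way choice that used to be duplicated in newChunkRestore and the pre-restore info getters. Below is a storage-free sketch of the same dispatch using only the standard library: it swaps Lightning's storage and parquet readers for plain files and compress/gzip, so openLocalReader and fileMeta are hypothetical names, but the shape (parquet first, then declared compression, then a plain open) follows the helper above. Note that in the real helper the compression branch uses a separate err2 so the named return value err is not shadowed inside the switch case.

```go
package main

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"os"
)

type compression int

const (
	compressionNone compression = iota
	compressionGZ
)

type fileMeta struct {
	Path        string
	IsParquet   bool
	Compression compression
}

// openLocalReader is a simplified analogue of restore.go's openReader: pick a
// reader based on the file's declared type and compression.
func openLocalReader(meta fileMeta) (io.ReadCloser, error) {
	switch {
	case meta.IsParquet:
		// Lightning opens parquet through a dedicated reader; whole-file
		// compression of parquet is rejected earlier by the file router.
		return nil, errors.New("parquet is handled by a dedicated reader in Lightning")
	case meta.Compression != compressionNone:
		f, err := os.Open(meta.Path)
		if err != nil {
			return nil, err
		}
		zr, err := gzip.NewReader(f)
		if err != nil {
			f.Close()
			return nil, err
		}
		return zr, nil
	default:
		return os.Open(meta.Path)
	}
}

func main() {
	r, err := openLocalReader(fileMeta{Path: "/etc/hostname"})
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer r.Close()
	data, _ := io.ReadAll(r)
	fmt.Printf("%d bytes\n", len(data))
}
```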
b/br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz new file mode 100644 index 0000000000000..328fed9cb3df8 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz new file mode 100644 index 0000000000000..c732af263d576 Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz new file mode 100644 index 0000000000000..1782675bfc7fe Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz differ diff --git a/br/tests/lightning_compress/data.gzip/compress.threads.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.threads.000000000.csv.gz new file mode 100644 index 0000000000000..683eade1cdb9f Binary files /dev/null and b/br/tests/lightning_compress/data.gzip/compress.threads.000000000.csv.gz differ diff --git a/br/tests/lightning_compress/data.snappy/compress-schema-create.sql.snappy b/br/tests/lightning_compress/data.snappy/compress-schema-create.sql.snappy new file mode 100644 index 0000000000000..afa2211c77475 Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress-schema-create.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.empty_strings-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.empty_strings-schema.sql.snappy new file mode 100644 index 0000000000000..cab30d082385a Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.empty_strings-schema.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy new file mode 100644 index 0000000000000..9c81e8f78f234 Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy new file mode 100644 index 0000000000000..9e27befa522a0 Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy new file mode 100644 index 0000000000000..1380b47d9881e Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy new file mode 100644 index 0000000000000..5cc0365d1c65d Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.multi_rows.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.multi_rows.000000000.sql.snappy new file mode 100644 index 0000000000000..7f5bf585e106c Binary files /dev/null and 
b/br/tests/lightning_compress/data.snappy/compress.multi_rows.000000000.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.threads-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.threads-schema.sql.snappy new file mode 100644 index 0000000000000..b1c8b89565bfb Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.threads-schema.sql.snappy differ diff --git a/br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy new file mode 100644 index 0000000000000..dc7c1ee8adc0b Binary files /dev/null and b/br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy differ diff --git a/br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst b/br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst new file mode 100644 index 0000000000000..12bdbd710973e Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.empty_strings-schema.sql.zst b/br/tests/lightning_compress/data.zstd/compress.empty_strings-schema.sql.zst new file mode 100644 index 0000000000000..f9b922954ff3d Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.empty_strings-schema.sql.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst new file mode 100644 index 0000000000000..aa89918bb2cee Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.escapes-schema.sql.zst b/br/tests/lightning_compress/data.zstd/compress.escapes-schema.sql.zst new file mode 100644 index 0000000000000..fa4b4e6b3497d Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.escapes-schema.sql.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst new file mode 100644 index 0000000000000..40994e745bdf3 Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.multi_rows-schema.sql.zst b/br/tests/lightning_compress/data.zstd/compress.multi_rows-schema.sql.zst new file mode 100644 index 0000000000000..d64a9a4a879d3 Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.multi_rows-schema.sql.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst new file mode 100644 index 0000000000000..4db1bea4c69f9 Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.threads-schema.sql.zst b/br/tests/lightning_compress/data.zstd/compress.threads-schema.sql.zst new file mode 100644 index 0000000000000..3a41c8de4816c Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.threads-schema.sql.zst differ diff --git a/br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst new file mode 
100644 index 0000000000000..13eef0ba83011 Binary files /dev/null and b/br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst differ diff --git a/br/tests/lightning_compress/run.sh b/br/tests/lightning_compress/run.sh new file mode 100755 index 0000000000000..bf48b09b2cccd --- /dev/null +++ b/br/tests/lightning_compress/run.sh @@ -0,0 +1,61 @@ +#!/bin/sh + +set -eu + +for BACKEND in tidb local; do + for compress in gzip snappy zstd; do + if [ "$BACKEND" = 'local' ]; then + check_cluster_version 4 0 0 'local backend' || continue + fi + + # Set minDeliverBytes to a small enough number to only write only 1 row each time + # Set the failpoint to kill the lightning instance as soon as one row is written + PKG="github.com/pingcap/tidb/br/pkg/lightning/restore" + export GO_FAILPOINTS="$PKG/SlowDownWriteRows=sleep(1000);$PKG/FailAfterWriteRows=panic;$PKG/SetMinDeliverBytes=return(1)" + + # Start importing the tables. + run_sql 'DROP DATABASE IF EXISTS compress' + run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test' + set +e + run_lightning --backend $BACKEND -d "tests/$TEST_NAME/data.$compress" --enable-checkpoint=1 2> /dev/null + set -e + + # restart lightning from checkpoint, the second line should be written successfully + export GO_FAILPOINTS= + set +e + run_lightning --backend $BACKEND -d "tests/$TEST_NAME/data.$compress" --enable-checkpoint=1 2> /dev/null + set -e + + run_sql 'SELECT count(*), sum(PROCESSLIST_TIME), sum(THREAD_OS_ID), count(PROCESSLIST_STATE) FROM compress.threads' + check_contains 'count(*): 43' + check_contains 'sum(PROCESSLIST_TIME): 322253' + check_contains 'sum(THREAD_OS_ID): 303775702' + check_contains 'count(PROCESSLIST_STATE): 3' + + run_sql 'SELECT count(*) FROM compress.threads WHERE PROCESSLIST_TIME IS NOT NULL' + check_contains 'count(*): 12' + + run_sql 'SELECT count(*) FROM compress.multi_rows WHERE a="aaaaaaaaaa"' + check_contains 'count(*): 100000' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 1' + check_contains 'hex(t): 5C' + check_contains 'j: {"?": []}' + check_contains 'hex(b): FFFFFFFF' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 2' + check_contains 'hex(t): 22' + check_contains 'j: "\n\n\n"' + check_contains 'hex(b): 0D0A0D0A' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 3' + check_contains 'hex(t): 0A' + check_contains 'j: [",,,"]' + check_contains 'hex(b): 5C2C5C2C' + + run_sql 'SELECT id FROM compress.empty_strings WHERE a = """"' + check_contains 'id: 3' + run_sql 'SELECT id FROM compress.empty_strings WHERE b <> ""' + check_not_contains 'id:' + done +done diff --git a/dumpling/tests/e2e/run.sh b/dumpling/tests/e2e/run.sh index f5da32acc33e0..73b580ca594d9 100644 --- a/dumpling/tests/e2e/run.sh +++ b/dumpling/tests/e2e/run.sh @@ -37,4 +37,24 @@ run_lightning $cur/conf/lightning.toml # check mysql and tidb data check_sync_diff $cur/conf/diff_config.toml +# test e2e with compress option again + +# drop database on tidb +export DUMPLING_TEST_PORT=4000 +run_sql "drop database if exists $DB_NAME;" + +export DUMPLING_TEST_PORT=3306 + +# dumping +export DUMPLING_TEST_DATABASE=$DB_NAME +rm -rf $DUMPLING_OUTPUT_DIR +run_dumpling --compress "snappy" + +cat "$cur/conf/lightning.toml" +# use lightning import data to tidb +run_lightning $cur/conf/lightning.toml + +# check mysql and tidb data +check_sync_diff $cur/conf/diff_config.toml + diff --git a/dumpling/tests/e2e_csv/run.sh b/dumpling/tests/e2e_csv/run.sh index d80e321d9294a..9c5afaca469d7 100644 
--- a/dumpling/tests/e2e_csv/run.sh +++ b/dumpling/tests/e2e_csv/run.sh @@ -24,6 +24,7 @@ run_sql_file "$DUMPLING_TEST_DIR/data/e2e_csv.t.sql" run() { echo "*** running subtest case ***" + echo "compress is $compress" echo "escape_backslash is $escape_backslash" echo "csv_delimiter is $csv_delimiter" echo "csv_separator is $csv_separator" @@ -36,7 +37,11 @@ run() { # dumping export DUMPLING_TEST_PORT=3306 export DUMPLING_TEST_DATABASE=$DB_NAME - run_dumpling --filetype="csv" --escape-backslash=$escape_backslash --csv-delimiter="$csv_delimiter" --csv-separator="$csv_separator" + rm -rf "$DUMPLING_OUTPUT_DIR" + if [ $compress = "space" ]; then + compress="" + fi + run_dumpling --filetype="csv" --escape-backslash=$escape_backslash --csv-delimiter="$csv_delimiter" --csv-separator="$csv_separator" --compress="$compress" # construct lightning configuration mkdir -p $DUMPLING_TEST_DIR/conf @@ -67,18 +72,22 @@ run() { escape_backslash_arr="true false" csv_delimiter_arr="\" '" csv_separator_arr=', a aa |*|' +compress_arr='space gzip snappy zstd' -for escape_backslash in $escape_backslash_arr +for compress in $compress_arr do - for csv_separator in $csv_separator_arr + for escape_backslash in $escape_backslash_arr do - for csv_delimiter in $csv_delimiter_arr + for csv_separator in $csv_separator_arr do - run + for csv_delimiter in $csv_delimiter_arr + do + run + done + if [ "$escape_backslash" = "true" ]; then + csv_delimiter="" + run + fi done - if [ "$escape_backslash" = "true" ]; then - csv_delimiter="" - run - fi done done