From 5a20f0095ae3e391e79e1aacb6d0db66079e3888 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 20 Dec 2024 20:35:05 +0000 Subject: [PATCH] mycdc: cleanup tests for MySQL CDC --- internal/impl/mysql/input_mysql_stream.go | 13 +- internal/impl/mysql/integration_test.go | 169 +++++++++++----------- 2 files changed, 97 insertions(+), 85 deletions(-) diff --git a/internal/impl/mysql/input_mysql_stream.go b/internal/impl/mysql/input_mysql_stream.go index 0de203f017..a8c7463ec1 100644 --- a/internal/impl/mysql/input_mysql_stream.go +++ b/internal/impl/mysql/input_mysql_stream.go @@ -419,8 +419,17 @@ func prepSnapshotScannerAndMappers(cols []*sql.ColumnType) (values []any, mapper return s.Time, nil } case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT", "YEAR": - val = new(sql.Null[int]) - mapper = snapshotValueMapper[int] + val = new(sql.NullInt64) + mapper = func(v any) (any, error) { + s, ok := v.(*sql.NullInt64) + if !ok { + return nil, fmt.Errorf("expected %T got %T", int64(0), v) + } + if !s.Valid { + return nil, nil + } + return int(s.Int64), nil + } case "DECIMAL", "NUMERIC": val = new(sql.NullString) mapper = stringMapping(func(s string) (any, error) { diff --git a/internal/impl/mysql/integration_test.go b/internal/impl/mysql/integration_test.go index 5dca1a1870..cc30254b0b 100644 --- a/internal/impl/mysql/integration_test.go +++ b/internal/impl/mysql/integration_test.go @@ -40,6 +40,7 @@ func (db *testDB) Exec(query string, args ...any) { } func setupTestWithMySQLVersion(t *testing.T, version string) (string, *testDB) { + t.Parallel() integration.CheckSkip(t) pool, err := dockertest.NewPool("") require.NoError(t, err) @@ -105,14 +106,15 @@ func TestIntegrationMySQLCDC(t *testing.T) { integration.CheckSkip(t) var mysqlTestVersions = []string{"8.0", "9.0", "9.1"} for _, version := range mysqlTestVersions { - dsn, db := setupTestWithMySQLVersion(t, version) - // Create table - db.Exec(` + t.Run(version, func(t *testing.T) { + dsn, db := setupTestWithMySQLVersion(t, version) + // Create table + db.Exec(` CREATE TABLE IF NOT EXISTS foo ( a INT PRIMARY KEY ) `) - template := fmt.Sprintf(` + template := fmt.Sprintf(` mysql_cdc: dsn: %s stream_snapshot: false @@ -121,84 +123,85 @@ mysql_cdc: - foo `, dsn) - cacheConf := fmt.Sprintf(` + cacheConf := fmt.Sprintf(` label: foocache file: directory: %s`, t.TempDir()) - streamOutBuilder := service.NewStreamBuilder() - require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) - require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) - require.NoError(t, streamOutBuilder.AddInputYAML(template)) - - var outBatches []string - var outBatchMut sync.Mutex - require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { - msgBytes, err := mb[0].AsBytes() + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) + require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + + var outBatches []string + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + msgBytes, err := mb[0].AsBytes() + require.NoError(t, err) + outBatchMut.Lock() + outBatches = append(outBatches, string(msgBytes)) + outBatchMut.Unlock() + return nil + })) + + streamOut, err := streamOutBuilder.Build() require.NoError(t, err) - outBatchMut.Lock() - outBatches = append(outBatches, string(msgBytes)) - outBatchMut.Unlock() - return nil - })) - - streamOut, err := streamOutBuilder.Build() - require.NoError(t, err) - go func() { - err = streamOut.Run(context.Background()) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + time.Sleep(time.Second * 5) + for i := 0; i < 1000; i++ { + // Insert 10000 rows + db.Exec("INSERT INTO foo VALUES (?)", i) + } + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1000 + }, time.Minute*5, time.Millisecond*100) + + require.NoError(t, streamOut.StopWithin(time.Second*10)) + + streamOutBuilder = service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) + require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + + outBatches = nil + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + msgBytes, err := mb[0].AsBytes() + require.NoError(t, err) + outBatchMut.Lock() + outBatches = append(outBatches, string(msgBytes)) + outBatchMut.Unlock() + return nil + })) + + streamOut, err = streamOutBuilder.Build() require.NoError(t, err) - }() - - time.Sleep(time.Second * 5) - for i := 0; i < 1000; i++ { - // Insert 10000 rows - db.Exec("INSERT INTO foo VALUES (?)", i) - } - assert.Eventually(t, func() bool { - outBatchMut.Lock() - defer outBatchMut.Unlock() - return len(outBatches) == 1000 - }, time.Minute*5, time.Millisecond*100) + time.Sleep(time.Second) + for i := 1001; i < 2001; i++ { + db.Exec("INSERT INTO foo VALUES (?)", i) + } - require.NoError(t, streamOut.StopWithin(time.Second*10)) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() - streamOutBuilder = service.NewStreamBuilder() - require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`)) - require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf)) - require.NoError(t, streamOutBuilder.AddInputYAML(template)) + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1000 + }, time.Minute*5, time.Millisecond*100) - outBatches = nil - require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { - msgBytes, err := mb[0].AsBytes() - require.NoError(t, err) - outBatchMut.Lock() - outBatches = append(outBatches, string(msgBytes)) - outBatchMut.Unlock() - return nil - })) - - streamOut, err = streamOutBuilder.Build() - require.NoError(t, err) - - time.Sleep(time.Second) - for i := 1001; i < 2001; i++ { - db.Exec("INSERT INTO foo VALUES (?)", i) - } - - go func() { - err = streamOut.Run(context.Background()) - require.NoError(t, err) - }() - - assert.Eventually(t, func() bool { - outBatchMut.Lock() - defer outBatchMut.Unlock() - return len(outBatches) == 1000 - }, time.Minute*5, time.Millisecond*100) - - require.NoError(t, streamOut.StopWithin(time.Second*10)) + require.NoError(t, streamOut.StopWithin(time.Second*10)) + }) } } @@ -273,12 +276,13 @@ func TestIntegrationMySQLCDCWithCompositePrimaryKeys(t *testing.T) { dsn, db := setupTestWithMySQLVersion(t, "8.0") // Create table db.Exec(` - CREATE TABLE IF NOT EXISTS foo ( - a INT, - b INT, - v JSON, - size ENUM('x-small', 'small', 'medium', 'large', 'x-large'), - PRIMARY KEY (a, b) + CREATE TABLE IF NOT EXISTS ` + "`Foo`" + ` ( + ` + "`A`" + ` INT, + ` + "`B`" + ` INT, + PRIMARY KEY ( + ` + "`A`" + `, + ` + "`B`" + ` + ) ) `) // Create control table to ensure we don't stream it @@ -286,15 +290,14 @@ func TestIntegrationMySQLCDCWithCompositePrimaryKeys(t *testing.T) { CREATE TABLE IF NOT EXISTS foo_non_streamed ( a INT, b INT, - v JSON, PRIMARY KEY (a, b) ) `) // Insert 1000 rows for initial snapshot streaming for i := 0; i < 1000; i++ { - db.Exec("INSERT INTO foo VALUES (?, ?, ?, ?)", i, i, `{"json":"data"}`, `large`) - db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?, ?)", i, i, `{"json":"data"}`) + db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i) + db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i) } template := fmt.Sprintf(` @@ -304,7 +307,7 @@ mysql_cdc: snapshot_max_batch_size: 500 checkpoint_cache: foocache tables: - - foo + - Foo `, dsn) cacheConf := fmt.Sprintf(` @@ -339,8 +342,8 @@ file: time.Sleep(time.Second * 5) for i := 1000; i < 2000; i++ { // Insert 10000 rows - db.Exec("INSERT INTO foo VALUES (?, ?, ?, ?)", i, i, `{"json":"data"}`, `x-small`) - db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?, ?)", i, i, `{"json":"data"}`) + db.Exec("INSERT INTO `Foo` VALUES (?, ?)", i, i) + db.Exec("INSERT INTO foo_non_streamed VALUES (?, ?)", i, i) } assert.Eventually(t, func() bool {