diff --git a/sdks/go/test/integration/internal/containers/containers.go b/sdks/go/test/integration/internal/containers/containers.go index 5987897fd412..d897c59fc52c 100644 --- a/sdks/go/test/integration/internal/containers/containers.go +++ b/sdks/go/test/integration/internal/containers/containers.go @@ -19,23 +19,45 @@ package containers import ( "context" "testing" + "time" "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "gopkg.in/retry.v1" ) type ContainerOptionFn func(*testcontainers.ContainerRequest) +func WithEnv(env map[string]string) ContainerOptionFn { + return func(option *testcontainers.ContainerRequest) { + option.Env = env + } +} + +func WithHostname(hostname string) ContainerOptionFn { + return func(option *testcontainers.ContainerRequest) { + option.Hostname = hostname + } +} + func WithPorts(ports []string) ContainerOptionFn { return func(option *testcontainers.ContainerRequest) { option.ExposedPorts = ports } } +func WithWaitStrategy(waitStrategy wait.Strategy) ContainerOptionFn { + return func(option *testcontainers.ContainerRequest) { + option.WaitingFor = waitStrategy + } +} + func NewContainer( ctx context.Context, t *testing.T, image string, + maxRetries int, opts ...ContainerOptionFn, ) testcontainers.Container { t.Helper() @@ -51,9 +73,26 @@ func NewContainer( Started: true, } - container, err := testcontainers.GenericContainer(ctx, genericRequest) - if err != nil { - t.Fatalf("error creating container: %v", err) + strategy := retry.LimitCount( + maxRetries, + retry.Exponential{ + Initial: time.Second, + Factor: 2, + }, + ) + + var container testcontainers.Container + var err error + + for attempt := retry.Start(strategy, nil); attempt.Next(); { + container, err = testcontainers.GenericContainer(ctx, genericRequest) + if err == nil { + break + } + + if attempt.Count() == maxRetries { + t.Fatalf("failed to start container with %v retries: %v", maxRetries, err) + } } t.Cleanup(func() { diff --git a/sdks/go/test/integration/io/mongodbio/helper_test.go b/sdks/go/test/integration/io/mongodbio/helper_test.go index 751f9e56787e..0551be62225a 100644 --- a/sdks/go/test/integration/io/mongodbio/helper_test.go +++ b/sdks/go/test/integration/io/mongodbio/helper_test.go @@ -29,7 +29,8 @@ import ( const ( mongoImage = "mongo:6.0.3" - mongoPort = "27017" + mongoPort = "27017/tcp" + maxRetries = 5 ) func setUpTestContainer(ctx context.Context, t *testing.T) string { @@ -39,7 +40,8 @@ func setUpTestContainer(ctx context.Context, t *testing.T) string { ctx, t, mongoImage, - containers.WithPorts([]string{mongoPort + "/tcp"}), + maxRetries, + containers.WithPorts([]string{mongoPort}), ) return containers.Port(ctx, t, container, mongoPort) diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go index d347e4436f2e..24c2b513b2b2 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go @@ -29,9 +29,14 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/test/integration" - "github.com/docker/go-connections/nat" + "github.com/apache/beam/sdks/v2/go/test/integration/internal/containers" _ "github.com/lib/pq" - "github.com/testcontainers/testcontainers-go" +) + +const ( + debeziumImage = "debezium/example-postgres:latest" + debeziumPort = "5432/tcp" + maxRetries = 5 ) var expansionAddr string // Populate with expansion address labelled "debeziumio". @@ -42,35 +47,25 @@ func checkFlags(t *testing.T) { } } -func setupTestContainer(t *testing.T, dbname, username, password string) string { +func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string { t.Helper() - var env = map[string]string{ + env := map[string]string{ "POSTGRES_PASSWORD": password, "POSTGRES_USER": username, "POSTGRES_DB": dbname, } - var port = "5432/tcp" - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "debezium/example-postgres:latest", - ExposedPorts: []string{port}, - Env: env, - }, - Started: true, - } - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, req) - if err != nil { - t.Fatalf("failed to start container: %v", err) - } + container := containers.NewContainer( + ctx, + t, + debeziumImage, + maxRetries, + containers.WithEnv(env), + containers.WithPorts([]string{debeziumPort}), + ) - mappedPort, err := container.MappedPort(ctx, nat.Port(port)) - if err != nil { - t.Fatalf("failed to get container external port: %v", err) - } - return mappedPort.Port() + return containers.Port(ctx, t, container, debeziumPort) } // TestDebeziumIO_BasicRead tests basic read transform from Debezium. @@ -78,10 +73,11 @@ func TestDebeziumIO_BasicRead(t *testing.T) { integration.CheckFilters(t) checkFlags(t) + ctx := context.Background() dbname := "inventory" username := "debezium" password := "dbz" - port := setupTestContainer(t, dbname, username, password) + port := setupTestContainer(ctx, t, dbname, username, password) host := "localhost" connectionProperties := []string{ "database.dbname=inventory", diff --git a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go index 27021308a781..0eddc3e788d2 100644 --- a/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go +++ b/sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go @@ -30,16 +30,20 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/test/integration" + "github.com/apache/beam/sdks/v2/go/test/integration/internal/containers" "github.com/docker/go-connections/nat" _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" - "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" - "gopkg.in/retry.v1" +) + +const ( + postgresImage = "postgres" + postgresPort = "5432/tcp" + maxRetries = 5 ) var expansionAddr string // Populate with expansion address labelled "schemaio". -const maxRetryCount = 5 func checkFlags(t *testing.T) { if expansionAddr == "" { @@ -47,55 +51,35 @@ func checkFlags(t *testing.T) { } } -func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, password string) (testcontainers.Container, int) { +func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string { t.Helper() - var env = map[string]string{ + env := map[string]string{ "POSTGRES_PASSWORD": password, "POSTGRES_USER": username, "POSTGRES_DB": dbname, } + hostname := "localhost" - var port = "5432/tcp" dbURL := func(host string, port nat.Port) string { return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", username, password, host, port.Port(), dbname) } - - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "postgres", - ExposedPorts: []string{port}, - Env: env, - Hostname: "localhost", - WaitingFor: wait.ForSQL(nat.Port(port), "postgres", dbURL).Timeout(time.Second * 5), - }, - Started: true, - } - - strategy := retry.LimitCount(maxRetryCount, - retry.Exponential{ - Initial: time.Second, - Factor: 2, - }, + waitStrategy := wait.ForSQL(postgresPort, "postgres", dbURL).WithStartupTimeout(time.Second * 5) + + container := containers.NewContainer( + ctx, + t, + postgresImage, + maxRetries, + containers.WithPorts([]string{postgresPort}), + containers.WithEnv(env), + containers.WithHostname(hostname), + containers.WithWaitStrategy(waitStrategy), ) - var container testcontainers.Container - var err error - for r := retry.Start(strategy, nil); r.Next(); { - container, err = testcontainers.GenericContainer(ctx, req) - if err == nil { - break - } - if r.Count() == maxRetryCount { - t.Fatalf("failed to start container with %v retries: %v", maxRetryCount, err) - } - } - mappedPort, err := container.MappedPort(ctx, nat.Port(port)) - if err != nil { - t.Fatalf("failed to get container external port: %s", err) - } + mappedPort := containers.Port(ctx, t, container, postgresPort) - url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort.Port(), dbname) + url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort, dbname) db, err := sql.Open("postgres", url) if err != nil { t.Fatalf("failed to establish database connection: %s", err) @@ -106,7 +90,8 @@ func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, pas if err != nil { t.Fatalf("can't create table, check command and access level") } - return container, mappedPort.Int() + + return mappedPort } // TestJDBCIO_BasicReadWrite tests basic read and write transform from JDBC. @@ -119,11 +104,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) { username := "newuser" password := "password" - cont, port := setupTestContainer(t, ctx, dbname, username, password) - defer cont.Terminate(ctx) + port := setupTestContainer(ctx, t, dbname, username, password) tableName := "roles" host := "localhost" - jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) + jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname) write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password) ptest.RunAndValidate(t, write) @@ -141,11 +125,10 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) { username := "newuser" password := "password" ctx := context.Background() - cont, port := setupTestContainer(t, ctx, dbname, username, password) - defer cont.Terminate(ctx) + port := setupTestContainer(ctx, t, dbname, username, password) tableName := "roles" host := "localhost" - jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname) + jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname) write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password) ptest.RunAndValidate(t, write)