diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java index a21a18daea8..1d56c6e911d 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java @@ -1078,8 +1078,8 @@ public ConnectionEventList listConnectionEvents(final ConnectionEventsRequestBod final int pageSize = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getPageSize() != null) ? connectionEventsRequestBody.getPagination().getPageSize() : DEFAULT_PAGE_SIZE; - final int rowOffset = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getPageSize() != null) - ? connectionEventsRequestBody.getPagination().getPageSize() + final int rowOffset = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getRowOffset() != null) + ? connectionEventsRequestBody.getPagination().getRowOffset() : DEFAULT_ROW_OFFSET; // 2. get list of events final List events = connectionTimelineEventService.listEvents( diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java index 3c888281afc..5ddadf899d8 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java @@ -1773,15 +1773,13 @@ void testCancelJob() throws IOException { when(eventRunner.startNewCancellation(connectionId)) .thenReturn(manualOperationResult); - - doReturn(new JobInfoRead() + final JobInfoRead jobInfo = new JobInfoRead() .job(new JobRead() .id(jobId) .createdAt(123L) - .updatedAt(321L))) - .when(jobConverter).getJobInfoRead(any()); + .updatedAt(321L)); + doReturn(jobInfo).when(jobConverter).getJobInfoRead(any()); - // todo(@keyi): test different type (sync, clear, refresh) when(job.getConfigType()).thenReturn(ConfigType.SYNC); schedulerHandler.cancelJob(new JobIdRequestBody().id(jobId)); diff --git a/airbyte-commons-server/src/test/kotlin/io/airbyte/commons/server/handlers/StreamRefreshesHandlerTest.kt b/airbyte-commons-server/src/test/kotlin/io/airbyte/commons/server/handlers/StreamRefreshesHandlerTest.kt index 68e52b0f1f1..59d59d8e713 100644 --- a/airbyte-commons-server/src/test/kotlin/io/airbyte/commons/server/handlers/StreamRefreshesHandlerTest.kt +++ b/airbyte-commons-server/src/test/kotlin/io/airbyte/commons/server/handlers/StreamRefreshesHandlerTest.kt @@ -3,17 +3,24 @@ package io.airbyte.commons.server.handlers import io.airbyte.api.model.generated.ActorDefinitionVersionRead import io.airbyte.api.model.generated.ConnectionStream import io.airbyte.api.model.generated.DestinationIdRequestBody +import io.airbyte.api.model.generated.JobConfigType +import io.airbyte.api.model.generated.JobInfoRead +import io.airbyte.api.model.generated.JobRead +import io.airbyte.api.model.generated.JobRefreshConfig +import io.airbyte.api.model.generated.JobStatus import io.airbyte.api.model.generated.RefreshMode import io.airbyte.commons.server.converters.JobConverter import io.airbyte.commons.server.handlers.StreamRefreshesHandler.Companion.connectionStreamsToStreamDescriptors import io.airbyte.commons.server.scheduler.EventRunner import io.airbyte.commons.server.support.CurrentUserService +import io.airbyte.config.JobConfig import io.airbyte.config.StandardSync import io.airbyte.config.persistence.StreamRefreshesRepository import io.airbyte.config.persistence.domain.StreamRefresh import io.airbyte.data.services.ConnectionService import io.airbyte.data.services.ConnectionTimelineEventService import io.airbyte.persistence.job.JobPersistence +import io.airbyte.persistence.job.models.Job import io.airbyte.protocol.models.StreamDescriptor import io.mockk.clearAllMocks import io.mockk.every @@ -85,9 +92,19 @@ internal class StreamRefreshesHandlerTest { every { streamRefreshesRepository.saveAll(any>()) } returns listOf() every { eventRunner.startNewManualSync(connectionId) } returns null - // todo(@keyi): mock a real job and test saving on connection timeline event - every { jobPersistence.getJob(any()) } returns null - + every { jobPersistence.getJob(any()) } returns + Job( + 0L, JobConfig.ConfigType.REFRESH, "scope_id", + null, listOf(), io.airbyte.persistence.job.models.JobStatus.SUCCEEDED, 0L, 0L, 0L, + ) + every { jobConverter.getJobInfoRead(any()) } returns + JobInfoRead().job( + JobRead() + .id(0L) + .configType(JobConfigType.REFRESH) + .createdAt(0L) + .refreshConfig(JobRefreshConfig().streamsToRefresh(listOf())), + ) val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, connectionStream) assertTrue(result) @@ -105,8 +122,19 @@ internal class StreamRefreshesHandlerTest { every { streamRefreshesRepository.saveAll(any>()) } returns listOf() every { eventRunner.startNewManualSync(connectionId) } returns null every { connectionService.getAllStreamsForConnection(connectionId) } returns streamDescriptors - // todo(@keyi): mock a real job and test saving on connection timeline event - every { jobPersistence.getJob(any()) } returns null + every { jobPersistence.getJob(any()) } returns + Job( + 0L, JobConfig.ConfigType.REFRESH, "scope_id", + null, listOf(), io.airbyte.persistence.job.models.JobStatus.SUCCEEDED, 0L, 0L, 0L, + ) + every { jobConverter.getJobInfoRead(any()) } returns + JobInfoRead().job( + JobRead() + .id(0L) + .configType(JobConfigType.REFRESH) + .createdAt(0L) + .refreshConfig(JobRefreshConfig().streamsToRefresh(listOf())), + ) val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, listOf()) diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt index 068037723bb..c2ae2f00c65 100644 --- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt +++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/ConnectionEvent.kt @@ -1,5 +1,6 @@ package io.airbyte.data.services.shared +import com.fasterxml.jackson.annotation.JsonIgnore import io.micronaut.data.annotation.TypeDef import io.micronaut.data.model.DataType @@ -31,5 +32,6 @@ interface ConnectionEvent { UNKNOWN, } + @JsonIgnore fun getEventType(): Type } diff --git a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt index e5d7cfd65f5..d4b0b49f318 100644 --- a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt +++ b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt @@ -184,5 +184,45 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT ) assert(res.size == 2) } + + @Test + fun `should list events with all combined restrictions`() { + val allEvents = // sorted by createdAtStart DESC: [event4, e3, e2, e1] + connectionTimelineEventRepository.findByConnectionIdWithFilters( + connectionId = connectionId, + eventTypes = null, + createdAtStart = null, + createdAtEnd = null, + pageSize = 200, + rowOffset = 0, + ) + val res = + connectionTimelineEventRepository.findByConnectionIdWithFilters( + connectionId = connectionId, + eventTypes = + listOf( + // e1 + ConnectionEvent.Type.SYNC_STARTED, + // e2 + ConnectionEvent.Type.SYNC_CANCELLED, + // e3 + ConnectionEvent.Type.REFRESH_STARTED, + // e4 + ConnectionEvent.Type.REFRESH_SUCCEEDED, + // no event + ConnectionEvent.Type.CLEAR_STARTED, + ), + // e1.createdAt + createdAtStart = allEvents[3].createdAt, + // e3.createdAt + createdAtEnd = allEvents[1].createdAt, + // now we should list total 3 events: [e3, e2, e1] + pageSize = 200, + // now we should list total 2 events: [e2, e1] + rowOffset = 1, + ) + assert(res.size == 2) + assert(res[0].id == event2.id) + } } }