fix: pagination issue in listing connection timeline events (#13148)
keyihuang committed Jul 18, 2024
1 parent 9fc5b86 commit 5dd3643
Showing 5 changed files with 80 additions and 12 deletions.
@@ -1078,8 +1078,8 @@ public ConnectionEventList listConnectionEvents(final ConnectionEventsRequestBod
final int pageSize = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getPageSize() != null)
? connectionEventsRequestBody.getPagination().getPageSize()
: DEFAULT_PAGE_SIZE;
final int rowOffset = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getPageSize() != null)
? connectionEventsRequestBody.getPagination().getPageSize()
final int rowOffset = (connectionEventsRequestBody.getPagination() != null && connectionEventsRequestBody.getPagination().getRowOffset() != null)
? connectionEventsRequestBody.getPagination().getRowOffset()
: DEFAULT_ROW_OFFSET;
// 2. get list of events
final List<ConnectionTimelineEvent> events = connectionTimelineEventService.listEvents(
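The hunk above is the fix itself: the row offset was previously resolved from getPageSize(), so the requested offset was ignored and the resolved offset always equalled the page size. Below is a minimal Kotlin sketch of the corrected resolution (the handler itself is Java; Pagination and the default constants here are illustrative stand-ins, not the generated Airbyte model):

// Hypothetical stand-in for the request's pagination model.
data class Pagination(val pageSize: Int? = null, val rowOffset: Int? = null)

// Placeholder defaults; the handler's actual constants may differ.
const val DEFAULT_PAGE_SIZE = 50
const val DEFAULT_ROW_OFFSET = 0

// Resolve page size and row offset independently, falling back to defaults
// when pagination (or either field) is absent.
fun resolvePaging(pagination: Pagination?): Pair<Int, Int> {
    val pageSize = pagination?.pageSize ?: DEFAULT_PAGE_SIZE
    val rowOffset = pagination?.rowOffset ?: DEFAULT_ROW_OFFSET
    return pageSize to rowOffset
}

fun main() {
    println(resolvePaging(Pagination(pageSize = 20, rowOffset = 40))) // (20, 40)
    println(resolvePaging(null)) // (50, 0): defaults when pagination is omitted
}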
@@ -1773,15 +1773,13 @@ void testCancelJob() throws IOException {

when(eventRunner.startNewCancellation(connectionId))
.thenReturn(manualOperationResult);

doReturn(new JobInfoRead()
final JobInfoRead jobInfo = new JobInfoRead()
.job(new JobRead()
.id(jobId)
.createdAt(123L)
.updatedAt(321L)))
.when(jobConverter).getJobInfoRead(any());
.updatedAt(321L));
doReturn(jobInfo).when(jobConverter).getJobInfoRead(any());

// todo(@keyi): test different types (sync, clear, refresh)
when(job.getConfigType()).thenReturn(ConfigType.SYNC);

schedulerHandler.cancelJob(new JobIdRequestBody().id(jobId));
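The Java test above now builds the stubbed JobInfoRead as a local variable before handing it to Mockito's doReturn(...).when(...), and stubs the job's config type, presumably because cancelling a job now records a connection timeline event that reads it. For reference, a hedged MockK sketch of the same stubbing pattern (Kotlin for consistency with the other tests in this commit; the model shapes below are simplified stand-ins, not the generated API classes):

import io.mockk.every
import io.mockk.mockk

// Simplified stand-ins for the generated API models used by the Java test.
data class JobRead(val id: Long, val createdAt: Long, val updatedAt: Long)
data class JobInfoRead(val job: JobRead)

// Stand-in for io.airbyte.commons.server.converters.JobConverter.
interface JobConverter {
    fun getJobInfoRead(job: Any): JobInfoRead
}

fun main() {
    val jobConverter = mockk<JobConverter>()

    // Build the canned response once, then stub the converter to return it,
    // mirroring doReturn(jobInfo).when(jobConverter).getJobInfoRead(any()).
    val jobInfo = JobInfoRead(JobRead(id = 123L, createdAt = 123L, updatedAt = 321L))
    every { jobConverter.getJobInfoRead(any()) } returns jobInfo

    println(jobConverter.getJobInfoRead(Any()).job.id) // 123
}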
@@ -3,17 +3,24 @@ package io.airbyte.commons.server.handlers
import io.airbyte.api.model.generated.ActorDefinitionVersionRead
import io.airbyte.api.model.generated.ConnectionStream
import io.airbyte.api.model.generated.DestinationIdRequestBody
import io.airbyte.api.model.generated.JobConfigType
import io.airbyte.api.model.generated.JobInfoRead
import io.airbyte.api.model.generated.JobRead
import io.airbyte.api.model.generated.JobRefreshConfig
import io.airbyte.api.model.generated.JobStatus
import io.airbyte.api.model.generated.RefreshMode
import io.airbyte.commons.server.converters.JobConverter
import io.airbyte.commons.server.handlers.StreamRefreshesHandler.Companion.connectionStreamsToStreamDescriptors
import io.airbyte.commons.server.scheduler.EventRunner
import io.airbyte.commons.server.support.CurrentUserService
import io.airbyte.config.JobConfig
import io.airbyte.config.StandardSync
import io.airbyte.config.persistence.StreamRefreshesRepository
import io.airbyte.config.persistence.domain.StreamRefresh
import io.airbyte.data.services.ConnectionService
import io.airbyte.data.services.ConnectionTimelineEventService
import io.airbyte.persistence.job.JobPersistence
import io.airbyte.persistence.job.models.Job
import io.airbyte.protocol.models.StreamDescriptor
import io.mockk.clearAllMocks
import io.mockk.every
@@ -85,9 +92,19 @@ internal class StreamRefreshesHandlerTest {

every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
// todo(@keyi): mock a real job and test saving on connection timeline event
every { jobPersistence.getJob(any()) } returns null

every { jobPersistence.getJob(any()) } returns
Job(
0L, JobConfig.ConfigType.REFRESH, "scope_id",
null, listOf(), io.airbyte.persistence.job.models.JobStatus.SUCCEEDED, 0L, 0L, 0L,
)
every { jobConverter.getJobInfoRead(any()) } returns
JobInfoRead().job(
JobRead()
.id(0L)
.configType(JobConfigType.REFRESH)
.createdAt(0L)
.refreshConfig(JobRefreshConfig().streamsToRefresh(listOf())),
)
val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, connectionStream)

assertTrue(result)
@@ -105,8 +122,19 @@
every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
every { connectionService.getAllStreamsForConnection(connectionId) } returns streamDescriptors
// todo(@keyi): mock a real job and test saving on connection timeline event
every { jobPersistence.getJob(any()) } returns null
every { jobPersistence.getJob(any()) } returns
Job(
0L, JobConfig.ConfigType.REFRESH, "scope_id",
null, listOf(), io.airbyte.persistence.job.models.JobStatus.SUCCEEDED, 0L, 0L, 0L,
)
every { jobConverter.getJobInfoRead(any()) } returns
JobInfoRead().job(
JobRead()
.id(0L)
.configType(JobConfigType.REFRESH)
.createdAt(0L)
.refreshConfig(JobRefreshConfig().streamsToRefresh(listOf())),
)

val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, listOf())

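Both tests above used to stub jobPersistence.getJob(any()) to return null; they now return a fully constructed Job plus a canned JobInfoRead so the branch that records a connection timeline event actually executes. A self-contained sketch of that testing pattern, with deliberately hypothetical names (none of these types or methods are the real Airbyte APIs):

import io.mockk.every
import io.mockk.mockk
import io.mockk.verify

// Hypothetical, simplified shapes standing in for the persistence and
// timeline-event dependencies.
data class FakeJob(val id: Long, val configType: String, val status: String)

interface FakeJobPersistence {
    fun getJob(id: Long): FakeJob?
}

interface FakeTimelineEventService {
    fun record(jobId: Long, eventType: String)
}

class FakeRefreshHandler(
    private val persistence: FakeJobPersistence,
    private val timeline: FakeTimelineEventService,
) {
    fun startRefresh(jobId: Long) {
        // Only a non-null job reaches the timeline-event path, which is why the
        // tests swap the old `returns null` stub for a fully constructed job.
        val job = persistence.getJob(jobId) ?: return
        timeline.record(job.id, "REFRESH_STARTED")
    }
}

fun main() {
    val persistence = mockk<FakeJobPersistence>()
    val timeline = mockk<FakeTimelineEventService>(relaxed = true)

    every { persistence.getJob(any()) } returns FakeJob(0L, "REFRESH", "SUCCEEDED")

    FakeRefreshHandler(persistence, timeline).startRefresh(0L)

    // The side effect under test: a timeline event was recorded exactly once.
    verify(exactly = 1) { timeline.record(0L, "REFRESH_STARTED") }
    println("timeline event recorded")
}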
@@ -1,5 +1,6 @@
package io.airbyte.data.services.shared

import com.fasterxml.jackson.annotation.JsonIgnore
import io.micronaut.data.annotation.TypeDef
import io.micronaut.data.model.DataType

@@ -31,5 +32,6 @@ interface ConnectionEvent {
UNKNOWN,
}

@JsonIgnore
fun getEventType(): Type
}
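The @JsonIgnore added above keeps the event type out of each event's serialized summary; Jackson picks the annotation up from the interface method when serializing implementations, so individual event classes don't need to repeat it. A rough sketch of the effect, assuming Jackson is on the classpath and using a hypothetical event class:

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical event mirroring the ConnectionEvent contract: the event type is
// metadata about the record, not part of the JSON summary payload.
interface Event {
    enum class Type { SYNC_STARTED, REFRESH_STARTED }

    @JsonIgnore
    fun getEventType(): Type
}

class SyncStartedEvent(val jobId: Long, val startTimeEpochSeconds: Long) : Event {
    override fun getEventType(): Event.Type = Event.Type.SYNC_STARTED
}

fun main() {
    // With @JsonIgnore on the interface method, getEventType() is skipped and
    // only the payload fields are written,
    // e.g. {"jobId":42,"startTimeEpochSeconds":0} with no "eventType" field.
    println(ObjectMapper().writeValueAsString(SyncStartedEvent(jobId = 42L, startTimeEpochSeconds = 0L)))
}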
@@ -184,5 +184,45 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT
)
assert(res.size == 2)
}

@Test
fun `should list events with all combined restrictions`() {
val allEvents = // sorted by createdAt DESC: [e4, e3, e2, e1]
connectionTimelineEventRepository.findByConnectionIdWithFilters(
connectionId = connectionId,
eventTypes = null,
createdAtStart = null,
createdAtEnd = null,
pageSize = 200,
rowOffset = 0,
)
val res =
connectionTimelineEventRepository.findByConnectionIdWithFilters(
connectionId = connectionId,
eventTypes =
listOf(
// e1
ConnectionEvent.Type.SYNC_STARTED,
// e2
ConnectionEvent.Type.SYNC_CANCELLED,
// e3
ConnectionEvent.Type.REFRESH_STARTED,
// e4
ConnectionEvent.Type.REFRESH_SUCCEEDED,
// no event
ConnectionEvent.Type.CLEAR_STARTED,
),
// e1.createdAt
createdAtStart = allEvents[3].createdAt,
// e3.createdAt
createdAtEnd = allEvents[1].createdAt,
// now we should list total 3 events: [e3, e2, e1]
pageSize = 200,
// now we should list total 2 events: [e2, e1]
rowOffset = 1,
)
assert(res.size == 2)
assert(res[0].id == event2.id)
}
}
}
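The new test exercises the event-type filter, the inclusive createdAt window, and rowOffset/pageSize paging together, i.e. the paging behavior the handler fix above relies on. A plain-Kotlin, in-memory sketch of the semantics it asserts (the real repository applies these as SQL filters; names here are illustrative):

import java.time.OffsetDateTime

// Hypothetical in-memory model of a timeline event.
data class TimelineEvent(val id: String, val eventType: String, val createdAt: OffsetDateTime)

// Filter by type and by an inclusive [createdAtStart, createdAtEnd] window,
// sort newest first, then apply rowOffset and pageSize (SQL OFFSET/LIMIT).
fun listEvents(
    events: List<TimelineEvent>,
    eventTypes: Set<String>?,
    createdAtStart: OffsetDateTime?,
    createdAtEnd: OffsetDateTime?,
    pageSize: Int,
    rowOffset: Int,
): List<TimelineEvent> =
    events
        .filter { eventTypes == null || it.eventType in eventTypes }
        .filter { createdAtStart == null || !it.createdAt.isBefore(createdAtStart) }
        .filter { createdAtEnd == null || !it.createdAt.isAfter(createdAtEnd) }
        .sortedByDescending { it.createdAt }
        .drop(rowOffset)
        .take(pageSize)

fun main() {
    val t0 = OffsetDateTime.parse("2024-07-01T00:00:00Z")
    val events = listOf(
        TimelineEvent("e1", "SYNC_STARTED", t0),
        TimelineEvent("e2", "SYNC_CANCELLED", t0.plusMinutes(1)),
        TimelineEvent("e3", "REFRESH_STARTED", t0.plusMinutes(2)),
        TimelineEvent("e4", "REFRESH_SUCCEEDED", t0.plusMinutes(3)),
    )
    // Window [e1.createdAt, e3.createdAt] keeps [e3, e2, e1]; rowOffset = 1 drops e3.
    val res = listEvents(
        events,
        eventTypes = setOf("SYNC_STARTED", "SYNC_CANCELLED", "REFRESH_STARTED", "REFRESH_SUCCEEDED", "CLEAR_STARTED"),
        createdAtStart = t0,
        createdAtEnd = t0.plusMinutes(2),
        pageSize = 200,
        rowOffset = 1,
    )
    println(res.map { it.id }) // [e2, e1]
}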
