Skip to content

Commit

Permalink
feat: support refresh and clear/reset events in connection_time (…
Browse files Browse the repository at this point in the history
…#13091)
  • Loading branch information
keyihuang committed Jul 16, 2024
1 parent 0409ce1 commit b4760e1
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 120 deletions.
1 change: 1 addition & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9105,6 +9105,7 @@ components:
- CONNECTION_DISABLED
- SCHEMA_UPDATE
- CONNECTOR_UPDATE
- UNKNOWN
UserReadInConnectionEvent:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.api.model.generated.BooleanRead;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.JobFailureRequest;
import io.airbyte.api.model.generated.JobStatusEnum;
import io.airbyte.api.model.generated.JobSuccessWithAttemptNumberRequest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.JobStatus;
Expand All @@ -26,8 +27,8 @@
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.data.services.ConnectionTimelineEventService;
import io.airbyte.data.services.shared.SyncFailedEvent;
import io.airbyte.data.services.shared.SyncSucceededEvent;
import io.airbyte.data.services.shared.FailedEvent;
import io.airbyte.data.services.shared.FinalStatusEvent;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.JobNotifier;
Expand Down Expand Up @@ -92,8 +93,8 @@ public InternalOperationResult jobFailure(final JobFailureRequest input) {
}
if (job.getConfigType().equals(JobConfig.ConfigType.SYNC)) {
jobNotifier.failJob(job, attemptStats);
storeSyncFailure(job, input.getConnectionId(), attemptStats);
}
logJobFailureEventInConnectionTimeline(job, input.getConnectionId(), attemptStats);

jobCreationAndStatusUpdateHelper.emitJobToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, job, input);

Expand Down Expand Up @@ -179,8 +180,8 @@ public InternalOperationResult jobSuccessWithAttemptNumber(final JobSuccessWithA
}
if (job.getConfigType().equals(JobConfig.ConfigType.SYNC)) {
jobNotifier.successJob(job, attemptStats);
storeSyncSuccess(job, input.getConnectionId(), attemptStats);
}
logJobSuccessEventInConnectionTimeline(job, input.getConnectionId(), attemptStats);
jobCreationAndStatusUpdateHelper.emitJobToReleaseStagesMetric(OssMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, job, input);
jobCreationAndStatusUpdateHelper.trackCompletion(job, JobStatus.SUCCEEDED);

Expand All @@ -192,19 +193,26 @@ public InternalOperationResult jobSuccessWithAttemptNumber(final JobSuccessWithA
}
}

private void storeSyncSuccess(final Job job, final UUID connectionId, final List<JobPersistence.AttemptStats> attemptStats) {
private void logJobSuccessEventInConnectionTimeline(final Job job, final UUID connectionId, final List<JobPersistence.AttemptStats> attemptStats) {
final long jobId = job.getId();
try {
final LoadedStats stats = buildLoadedStats(job, attemptStats);
final SyncSucceededEvent event = new SyncSucceededEvent(jobId, job.getCreatedAtInSecond(),
job.getUpdatedAtInSecond(), stats.bytes, stats.records, job.getAttemptsCount());
final FinalStatusEvent event = new FinalStatusEvent(
jobId,
job.getCreatedAtInSecond(),
job.getUpdatedAtInSecond(),
stats.bytes,
stats.records,
job.getAttemptsCount(),
job.getConfigType().name(),
JobStatus.SUCCEEDED.name());
connectionEventService.writeEvent(connectionId, event, null);
} catch (final Exception e) {
log.warn("Failed to persist timeline event for job: {}", jobId, e);
}
}

private void storeSyncFailure(final Job job, final UUID connectionId, final List<JobPersistence.AttemptStats> attemptStats) {
private void logJobFailureEventInConnectionTimeline(final Job job, final UUID connectionId, final List<JobPersistence.AttemptStats> attemptStats) {
final long jobId = job.getId();
try {
final LoadedStats stats = buildLoadedStats(job, attemptStats);
Expand All @@ -213,8 +221,24 @@ private void storeSyncFailure(final Job job, final UUID connectionId, final List
final Optional<FailureReason> firstFailureReasonOfLastAttempt =
lastAttemptFailureSummary.flatMap(summary -> summary.getFailures().stream().findFirst());

final SyncFailedEvent event = new SyncFailedEvent(jobId, job.getCreatedAtInSecond(),
job.getUpdatedAtInSecond(), stats.bytes, stats.records, job.getAttemptsCount(), firstFailureReasonOfLastAttempt);
final String jobStatus;
if (stats.bytes > 0) {
// Sync is incomplete (partial succeeded).
jobStatus = JobStatusEnum.INCOMPLETE.name();
} else {
// Sync is failed.
jobStatus = JobStatusEnum.FAILED.name();
}
final FailedEvent event = new FailedEvent(
jobId,
job.getCreatedAtInSecond(),
job.getUpdatedAtInSecond(),
stats.bytes,
stats.records,
job.getAttemptsCount(),
job.getConfigType().name(),
jobStatus,
firstFailureReasonOfLastAttempt);
connectionEventService.writeEvent(connectionId, event, null);
} catch (final Exception e) {
log.warn("Failed to persist timeline event for job: {}", jobId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.NotificationSettings;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -87,8 +86,8 @@
import io.airbyte.data.services.ConnectionTimelineEventService;
import io.airbyte.data.services.SecretPersistenceConfigService;
import io.airbyte.data.services.WorkspaceService;
import io.airbyte.data.services.shared.SyncCancelledEvent;
import io.airbyte.data.services.shared.SyncStartedEvent;
import io.airbyte.data.services.shared.FinalStatusEvent;
import io.airbyte.data.services.shared.ManuallyStartedEvent;
import io.airbyte.featureflag.DiscoverPostprocessInTemporal;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Organization;
Expand All @@ -105,13 +104,15 @@
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.factory.SyncJobFactory;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
import io.airbyte.persistence.job.tracker.JobTracker;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Singleton;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -157,7 +158,7 @@ public class SchedulerHandler {
private final ConnectorDefinitionSpecificationHandler connectorDefinitionSpecificationHandler;
private final WorkspaceService workspaceService;
private final SecretPersistenceConfigService secretPersistenceConfigService;
private final ConnectionTimelineEventService connectionEventService;
private final ConnectionTimelineEventService connectionTimelineEventService;
private final CurrentUserService currentUserService;
private final StreamRefreshesHandler streamRefreshesHandler;
private final NotificationHelper notificationHelper;
Expand Down Expand Up @@ -209,7 +210,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.connectorDefinitionSpecificationHandler = connectorDefinitionSpecificationHandler;
this.workspaceService = workspaceService;
this.secretPersistenceConfigService = secretPersistenceConfigService;
this.connectionEventService = connectionEventService;
this.connectionTimelineEventService = connectionEventService;
this.currentUserService = currentUserService;
this.jobCreationAndStatusUpdateHelper = new JobCreationAndStatusUpdateHelper(
jobPersistence,
Expand Down Expand Up @@ -791,6 +792,22 @@ private UUID getCurrentUserIdIfExist() {
}
}

private long getRecordsLoaded(final List<io.airbyte.api.model.generated.@Valid StreamStats> streamAggregatedStats) {
long totalRecordsLoaded = 0;
for (final io.airbyte.api.model.generated.@Valid StreamStats streamStats : streamAggregatedStats) {
totalRecordsLoaded += streamStats.getRecordsCommitted();
}
return totalRecordsLoaded;
}

private long getBytesLoaded(final List<io.airbyte.api.model.generated.@Valid StreamStats> streamAggregatedStats) {
long totalBytesLoaded = 0;
for (final io.airbyte.api.model.generated.@Valid StreamStats streamStats : streamAggregatedStats) {
totalBytesLoaded += streamStats.getBytesCommitted();
}
return totalBytesLoaded;
}

private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOException {
final Job job = jobPersistence.getJob(jobId);

Expand All @@ -799,13 +816,19 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept
throw new IllegalStateException(cancellationResult.getFailingReason().get());
}
final JobInfoRead jobInfo = readJobFromResult(cancellationResult);
if (job.getConfigType() != null && job.getConfigType().equals(JobConfig.ConfigType.SYNC)) {
// Store connection timeline event (cancel a sync).
final SyncCancelledEvent event = new SyncCancelledEvent(
jobInfo.getJob().getId(),
jobInfo.getJob().getCreatedAt());
connectionEventService.writeEvent(UUID.fromString(job.getScope()), event, getCurrentUserIdIfExist());
}

// Store connection timeline event (job cancellation).
final @Valid List<io.airbyte.api.model.generated.@Valid StreamStats> streamAggregatedStats = jobInfo.getJob().getStreamAggregatedStats();
final FinalStatusEvent event = new FinalStatusEvent(
jobInfo.getJob().getId(),
jobInfo.getJob().getCreatedAt(),
jobInfo.getJob().getUpdatedAt(),
getBytesLoaded(streamAggregatedStats),
getRecordsLoaded(streamAggregatedStats),
job.getAttemptsCount(),
job.getConfigType().name(), JobStatus.CANCELLED.name());
connectionTimelineEventService.writeEvent(UUID.fromString(job.getScope()), event, getCurrentUserIdIfExist());

// query same job ID again to get updated job info after cancellation
return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}
Expand All @@ -819,14 +842,19 @@ private JobInfoRead submitManualSyncToWorker(final UUID connectionId)
}
final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);
final JobInfoRead jobInfo = readJobFromResult(manualSyncResult);
if (jobInfo != null && jobInfo.getJob() != null && jobInfo.getJob().getConfigType().equals(JobConfigType.SYNC)) {
// Store connection timeline event (start a manual sync).
final SyncStartedEvent event = new SyncStartedEvent(
logManuallyStartedEventInConnectionTimeline(connectionId, jobInfo, null);
return jobInfo;
}

private void logManuallyStartedEventInConnectionTimeline(final UUID connectionId, final JobInfoRead jobInfo, final List<StreamDescriptor> streams) {
if (jobInfo != null && jobInfo.getJob() != null && jobInfo.getJob().getConfigType() != null) {
final ManuallyStartedEvent event = new ManuallyStartedEvent(
jobInfo.getJob().getId(),
jobInfo.getJob().getCreatedAt());
connectionEventService.writeEvent(connectionId, event, getCurrentUserIdIfExist());
jobInfo.getJob().getCreatedAt(),
jobInfo.getJob().getConfigType().name(),
streams);
connectionTimelineEventService.writeEvent(connectionId, event, getCurrentUserIdIfExist());
}
return jobInfo;
}

private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, IllegalStateException, ConfigNotFoundException {
Expand All @@ -840,7 +868,9 @@ private JobInfoRead submitResetConnectionToWorker(final UUID connectionId,
connectionId,
streamsToReset);

return readJobFromResult(resetConnectionResult);
final JobInfoRead jobInfo = readJobFromResult(resetConnectionResult);
logManuallyStartedEventInConnectionTimeline(connectionId, jobInfo, streamsToReset);
return jobInfo;
}

private JobInfoRead submitResetConnectionStreamsToWorker(final UUID connectionId, final List<ConnectionStream> streams)
Expand All @@ -851,7 +881,7 @@ private JobInfoRead submitResetConnectionStreamsToWorker(final UUID connectionId
return submitResetConnectionToWorker(connectionId, actualStreamsToReset);
}

private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
public JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
if (manualOperationResult.getFailingReason().isPresent()) {
if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) {
throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package io.airbyte.commons.server.handlers
import io.airbyte.api.model.generated.ConnectionStream
import io.airbyte.api.model.generated.DestinationIdRequestBody
import io.airbyte.api.model.generated.RefreshMode
import io.airbyte.commons.server.converters.JobConverter
import io.airbyte.commons.server.scheduler.EventRunner
import io.airbyte.commons.server.support.CurrentUserService
import io.airbyte.config.JobConfig.ConfigType
import io.airbyte.config.RefreshStream
import io.airbyte.config.persistence.StreamRefreshesRepository
import io.airbyte.config.persistence.domain.StreamRefresh
import io.airbyte.config.persistence.saveStreamsToRefresh
import io.airbyte.data.services.ConnectionService
import io.airbyte.data.services.ConnectionTimelineEventService
import io.airbyte.data.services.shared.ManuallyStartedEvent
import io.airbyte.persistence.job.JobPersistence
import io.airbyte.protocol.models.StreamDescriptor
import jakarta.inject.Singleton
import java.util.UUID
Expand All @@ -19,6 +25,10 @@ class StreamRefreshesHandler(
private val streamRefreshesRepository: StreamRefreshesRepository,
private val eventRunner: EventRunner,
private val actorDefinitionVersionHandler: ActorDefinitionVersionHandler,
private val currentUserService: CurrentUserService,
private val jobPersistence: JobPersistence,
private val jobConverter: JobConverter,
private val connectionTimelineEventService: ConnectionTimelineEventService,
) {
fun deleteRefreshesForConnection(connectionId: UUID) {
streamRefreshesRepository.deleteByConnectionId(connectionId)
Expand Down Expand Up @@ -49,7 +59,27 @@ class StreamRefreshesHandler(

createRefreshesForStreams(connectionId, refreshMode, streamDescriptors)

eventRunner.startNewManualSync(connectionId)
// Store connection timeline event (start a refresh).
val manualSyncResult = eventRunner.startNewManualSync(connectionId)
val job = manualSyncResult?.jobId?.let { jobPersistence.getJob(it.get()) }
val jobInfo = job?.let { jobConverter.getJobInfoRead(job) }
if (jobInfo?.job != null && jobInfo.job.configType != null) {
val userId = currentUserService.currentUser?.userId
val refreshStartedEvent =
ManuallyStartedEvent(
jobId = jobInfo.job.id,
startTimeEpochSeconds = jobInfo.job.createdAt,
jobType = ConfigType.REFRESH.name,
streams =
jobInfo.job.refreshConfig.streamsToRefresh.map {
streamDescriptor ->
StreamDescriptor()
.withName(streamDescriptor.name)
.withNamespace(streamDescriptor.namespace)
},
)
connectionTimelineEventService.writeEvent(connectionId, refreshStartedEvent, userId)
}

return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1774,8 +1774,15 @@ void testCancelJob() throws IOException {
when(eventRunner.startNewCancellation(connectionId))
.thenReturn(manualOperationResult);

doReturn(new JobInfoRead())
.when(jobConverter).getJobInfoRead(any());
doReturn(new JobInfoRead()
.job(new JobRead()
.id(jobId)
.createdAt(123L)
.updatedAt(321L)))
.when(jobConverter).getJobInfoRead(any());

// todo(@keyi): test different type (sync, clear, refresh)
when(job.getConfigType()).thenReturn(ConfigType.SYNC);

schedulerHandler.cancelJob(new JobIdRequestBody().id(jobId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import io.airbyte.api.model.generated.ActorDefinitionVersionRead
import io.airbyte.api.model.generated.ConnectionStream
import io.airbyte.api.model.generated.DestinationIdRequestBody
import io.airbyte.api.model.generated.RefreshMode
import io.airbyte.commons.server.converters.JobConverter
import io.airbyte.commons.server.handlers.StreamRefreshesHandler.Companion.connectionStreamsToStreamDescriptors
import io.airbyte.commons.server.scheduler.EventRunner
import io.airbyte.commons.server.support.CurrentUserService
import io.airbyte.config.StandardSync
import io.airbyte.config.persistence.StreamRefreshesRepository
import io.airbyte.config.persistence.domain.StreamRefresh
import io.airbyte.data.services.ConnectionService
import io.airbyte.data.services.ConnectionTimelineEventService
import io.airbyte.persistence.job.JobPersistence
import io.airbyte.protocol.models.StreamDescriptor
import io.mockk.clearAllMocks
import io.mockk.every
Expand All @@ -28,13 +32,21 @@ internal class StreamRefreshesHandlerTest {
private val streamRefreshesRepository: StreamRefreshesRepository = mockk()
private val eventRunner: EventRunner = mockk()
private val actorDefinitionVersionHandler: ActorDefinitionVersionHandler = mockk()
private val currentUserService: CurrentUserService = mockk()
private val jobPersistence: JobPersistence = mockk()
private val jobConverter: JobConverter = mockk()
private val connectionTimelineEventService: ConnectionTimelineEventService = mockk()

private val streamRefreshesHandler =
StreamRefreshesHandler(
connectionService,
streamRefreshesRepository,
eventRunner,
actorDefinitionVersionHandler,
currentUserService,
jobPersistence,
jobConverter,
connectionTimelineEventService,
)

private val connectionId = UUID.randomUUID()
Expand Down Expand Up @@ -73,6 +85,8 @@ internal class StreamRefreshesHandlerTest {

every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
// todo(@keyi): mock a real job and test saving on connection timeline event
every { jobPersistence.getJob(any()) } returns null

val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, connectionStream)

Expand All @@ -91,6 +105,8 @@ internal class StreamRefreshesHandlerTest {
every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
every { connectionService.getAllStreamsForConnection(connectionId) } returns streamDescriptors
// todo(@keyi): mock a real job and test saving on connection timeline event
every { jobPersistence.getJob(any()) } returns null

val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, listOf())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ interface ConnectionEvent {
CONNECTION_DISABLED,
SCHEMA_UPDATE,
CONNECTOR_UPDATE,
UNKNOWN,
}

fun getEventType(): Type
Expand Down
Loading

0 comments on commit b4760e1

Please sign in to comment.