Skip to content

Commit

Permalink
Graceful handling of bad events
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 6, 2024
1 parent 2d7c074 commit 5b3e24d
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,30 +240,31 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
case repomgr.EvtKindCreateRecord:
if op.Cid == nil {
log.Error("update record op missing cid")
break
continue
}

rcid, recB, err := rr.GetRecordBytes(ctx, op.Path)
if err != nil {
log.Error("failed to get record bytes", "error", err)
break
continue
}

recCid := rcid.String()
if recCid != op.Cid.String() {
log.Error("record cid mismatch", "expected", *op.Cid, "actual", rcid)
break
continue
}

rec, err := data.UnmarshalCBOR(*recB)
if err != nil {
return fmt.Errorf("failed to unmarshal record: %w", err)
log.Error("failed to unmarshal record", "error", err)
continue
}

recJSON, err := json.Marshal(rec)
if err != nil {
log.Error("failed to marshal record to json", "error", err)
break
continue
}

e.Commit = &models.Commit{
Expand All @@ -277,30 +278,31 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
case repomgr.EvtKindUpdateRecord:
if op.Cid == nil {
log.Error("update record op missing cid")
break
continue
}

rcid, recB, err := rr.GetRecordBytes(ctx, op.Path)
if err != nil {
log.Error("failed to get record bytes", "error", err)
break
continue
}

recCid := rcid.String()
if recCid != op.Cid.String() {
log.Error("record cid mismatch", "expected", *op.Cid, "actual", rcid)
break
continue
}

rec, err := data.UnmarshalCBOR(*recB)
if err != nil {
return fmt.Errorf("failed to unmarshal record: %w", err)
log.Error("failed to unmarshal record", "error", err)
continue
}

recJSON, err := json.Marshal(rec)
if err != nil {
log.Error("failed to marshal record to json", "error", err)
break
continue
}

e.Commit = &models.Commit{
Expand Down

0 comments on commit 5b3e24d

Please sign in to comment.