Skip to content

Commit

Permalink
send over the system chunk events
Browse files Browse the repository at this point in the history
  • Loading branch information
bjartek committed Jun 15, 2023
1 parent 5714b7c commit ab0a8af
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 23 deletions.
27 changes: 18 additions & 9 deletions mocks/OverflowBetaClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type OverflowBetaClient interface {
OverflowClient
GetTransactionById(ctx context.Context, id flow.Identifier) (*flow.Transaction, error)
GetOverflowTransactionById(ctx context.Context, id flow.Identifier) (*OverflowTransaction, error)
GetTransactions(ctx context.Context, id flow.Identifier) ([]OverflowTransaction, error)
GetTransactions(ctx context.Context, id flow.Identifier) ([]OverflowTransaction, OverflowEvents, error)
StreamTransactions(ctx context.Context, poll time.Duration, height uint64, logger *zap.Logger, channel chan<- BlockResult) error
}

Expand Down
26 changes: 13 additions & 13 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"github.com/onflow/flow-go-sdk"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/sanity-io/litter"
"go.uber.org/zap"
)

type FilterFunction func(OverflowTransaction) bool

type BlockResult struct {
Transactions []OverflowTransaction
Block flow.Block
Error error
Logger *zap.Logger
Transactions []OverflowTransaction
SystemChunkEvents OverflowEvents
Block flow.Block
Error error
Logger *zap.Logger
}

type Argument struct {
Expand Down Expand Up @@ -157,7 +157,7 @@ func (o *OverflowState) GetTransactionById(ctx context.Context, id flow.Identifi
}

// this is get from block, needs to return system chunk information
func (o *OverflowState) GetTransactions(ctx context.Context, id flow.Identifier) ([]OverflowTransaction, error) {
func (o *OverflowState) GetTransactions(ctx context.Context, id flow.Identifier) ([]OverflowTransaction, OverflowEvents, error) {

//sometimes this will become too complex.

Expand All @@ -171,19 +171,19 @@ func (o *OverflowState) GetTransactions(ctx context.Context, id flow.Identifier)
time.Sleep(time.Millisecond * 200)
tx, txR, err = o.Flowkit.GetTransactionsByBlockID(ctx, id)
if err != nil {
return nil, errors.Wrap(err, "getting transaction results")
return nil, nil, errors.Wrap(err, "getting transaction results")
}
}

var systemChunkEvents OverflowEvents
result := lo.FlatMap(txR, func(rp *flow.TransactionResult, i int) []OverflowTransaction {
r := *rp

t := *tx[i]
if r.TransactionID.String() == "f31815934bff124e332b3c8be5e1c7a949532707251a9f2f81def8cc9f3d1458" {
litter.Dump(r)
litter.Dump(t)
systemChunkEvents, _ = parseEvents(r.Events)
return []OverflowTransaction{}
}
t := *tx[i]

//for some reason we get epoch heartbeat
if len(t.EnvelopeSignatures) == 0 {
Expand All @@ -197,7 +197,7 @@ func (o *OverflowState) GetTransactions(ctx context.Context, id flow.Identifier)
return []OverflowTransaction{*ot}
})

return result, nil
return result, systemChunkEvents, nil

}

Expand Down Expand Up @@ -248,13 +248,13 @@ func (o *OverflowState) StreamTransactions(ctx context.Context, poll time.Durati
} else {
block = latestKnownBlock
}
tx, err := o.GetTransactions(ctx, block.ID)
tx, systemChunkEvents, err := o.GetTransactions(ctx, block.ID)
if err != nil {
logg.Debug("getting transaction", zap.Error(err))
if strings.Contains(err.Error(), "could not retrieve collection: key not found") {
continue
}
channel <- BlockResult{Block: *block, Error: errors.Wrap(err, "getting transactions"), Logger: logg}
channel <- BlockResult{Block: *block, SystemChunkEvents: systemChunkEvents, Error: errors.Wrap(err, "getting transactions"), Logger: logg}
height = nextBlockToProcess
continue
}
Expand Down

0 comments on commit ab0a8af

Please sign in to comment.