Skip to content

Commit

Permalink
added a way to store events in something that is not files (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjartek authored Feb 9, 2023
1 parent 7fc1ae0 commit f28b188
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
58 changes: 55 additions & 3 deletions event_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,25 @@ import (
// A function to customize the transaction builder
type OverflowEventFetcherOption func(*OverflowEventFetcherBuilder)

type ProgressReaderWriter interface {
/// can return 0 if we do not have any progress thus far
ReadProgress() (int64, error)
WriteProgress(progress int64) error
}

type InMemoryProgressKeeper struct {
Progress int64
}

func (self *InMemoryProgressKeeper) ReadProgress() (int64, error) {
return self.Progress, nil
}

func (self *InMemoryProgressKeeper) WriteProgress(progress int64) error {
self.Progress = progress
return nil
}

// OverflowEventFetcherBuilder builder to hold info about eventhook context.
type OverflowEventFetcherBuilder struct {
OverflowState *OverflowState
Expand All @@ -21,6 +40,7 @@ type OverflowEventFetcherBuilder struct {
EndAtCurrentHeight bool
EndIndex uint64
ProgressFile string
ProgressRW ProgressReaderWriter
NumberOfWorkers int
EventBatchSize uint64
ReturnWriterFunction bool
Expand Down Expand Up @@ -96,6 +116,13 @@ func (o *OverflowState) FetchEventsWithResult(opts ...OverflowEventFetcherOption
}
e.FromIndex = oldHeight
}
} else if e.ProgressRW != nil {
oldHeight, err := e.ProgressRW.ReadProgress()
if err != nil {
res.Error = fmt.Errorf("could not parse progress file as block height %v", err)
return res
}
e.FromIndex = oldHeight
}

endIndex := e.EndIndex
Expand Down Expand Up @@ -148,10 +175,25 @@ func (o *OverflowState) FetchEventsWithResult(opts ...OverflowEventFetcherOption
}
}

progressWriter := func() error {
return writeProgressToFile(e.ProgressFile, endIndex+1)
}
if e.ProgressFile != "" {

progressWriter := func() error {
return writeProgressToFile(e.ProgressFile, int64(endIndex+1))
}
if e.ReturnWriterFunction {
res.ProgressWriteFunction = progressWriter
} else {
err := progressWriter()
if err != nil {
res.Error = fmt.Errorf("could not write progress to file %v", err)
return res
}
}
} else if e.ProgressRW != nil {
progressWriter := func() error {
return e.ProgressRW.WriteProgress(int64(endIndex + 1))
}

if e.ReturnWriterFunction {
res.ProgressWriteFunction = progressWriter
} else {
Expand Down Expand Up @@ -263,6 +305,16 @@ func WithTrackProgressIn(fileName string) OverflowEventFetcherOption {
}
}

// track what block we have read since last run in a file
func WithTrackProgress(progressReaderWriter ProgressReaderWriter) OverflowEventFetcherOption {
return func(e *OverflowEventFetcherBuilder) {
e.ProgressRW = progressReaderWriter
e.EndIndex = 0
e.FromIndex = 0
e.EndAtCurrentHeight = true
}
}

// track what block we have read since last run in a file
func WithReturnProgressWriter() OverflowEventFetcherOption {
return func(e *OverflowEventFetcherBuilder) {
Expand Down
26 changes: 26 additions & 0 deletions event_fetcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ func TestIntegrationEventFetcher(t *testing.T) {
assert.Equal(t, float64(10), marshalTo.BlockEventData.Amount)
})

t.Run("Fetch last write progress in memory that exists and marshal events", func(t *testing.T) {

imr := &InMemoryProgressKeeper{Progress: 1}

ev, err := startOverflowAndMintTokens(t).FetchEvents(
WithEvent("A.0ae53cb6e3f42a79.FlowToken.TokensMinted"),
WithTrackProgress(imr),
)
assert.NoError(t, err)
assert.Equal(t, 3, len(ev))
event := ev[0]

graffleEvent := event.ToGraffleEvent()

var eventMarshal map[string]interface{}
assert.NoError(t, event.MarshalAs(&eventMarshal))
assert.NotEmpty(t, eventMarshal)

autogold.Equal(t, graffleEvent.BlockEventData, autogold.Name("graffle-event"))
var marshalTo MarketEvent
assert.NoError(t, graffleEvent.MarshalAs(&marshalTo))
assert.Equal(t, float64(10), marshalTo.BlockEventData.Amount)

assert.Equal(t, int64(9), imr.Progress)
})

t.Run("Return progress writer ", func(t *testing.T) {
progressFile := "progress"

Expand Down
2 changes: 1 addition & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func exists(path string) (bool, error) {
return true, err
}

func writeProgressToFile(fileName string, blockHeight uint64) error {
func writeProgressToFile(fileName string, blockHeight int64) error {

err := os.WriteFile(fileName, []byte(fmt.Sprintf("%d", blockHeight)), 0644)

Expand Down

0 comments on commit f28b188

Please sign in to comment.