Skip to content
This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2072 from bandprotocol/db_handle_resolve_request
Browse files Browse the repository at this point in the history
Handle resolve request for emitter/flusher.
  • Loading branch information
Benzbeeb authored Jul 1, 2020
2 parents 410e0d4 + 6bf8b26 commit 3da9263
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Chain

- (impv) [\#2072](https://github.com/bandprotocol/bandchain/pull/2072) Handle resolve request for emitter/flusher.
- (feat) [\#2111](https://github.com/bandprotocol/bandchain/pull/2111) Introduce the notion of active validators who are performing oracle tasks.
- (bug) [\#2110](https://github.com/bandprotocol/bandchain/pull/2074) Set `bandoracled` max capacity of event subscription channel.
- (impv) [\#2106](https://github.com/bandprotocol/bandchain/pull/2106) Implement emitter handle MsgCreateDataSource/OracleScript.
Expand Down
6 changes: 5 additions & 1 deletion chain/emitter/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ func (app *App) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
"balance": app.BankKeeper.GetCoins(app.DeliverContext, acc).String(),
})
}
// TODO: Handle end block events

for _, event := range res.Events {
app.handleBeginBlockEndBlockEvent(event)
}

app.Write("COMMIT", JsDict{"height": req.Height})
return res
}
Expand Down
25 changes: 21 additions & 4 deletions chain/emitter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package emitter

import (
sdk "github.com/cosmos/cosmos-sdk/types"
abci "github.com/tendermint/tendermint/abci/types"

"github.com/bandprotocol/bandchain/chain/x/oracle"
)

// handleMsg handles the given message by publishing relevant events and populates accounts
// that need balance update in 'app.accs'. Also fills in extra info for this message.
func (app *App) handleMsg(txHash []byte, msg sdk.Msg, log sdk.ABCIMessageLog, extra JsDict) {
func parseEvents(events sdk.StringEvents) EvMap {
evMap := make(EvMap)
for _, event := range log.Events {
for _, event := range events {
for _, kv := range event.Attributes {
key := event.Type + "." + kv.Key
evMap[key] = append(evMap[key], kv.Value)
}
}
return evMap
}

// handleMsg handles the given message by publishing relevant events and populates accounts
// that need balance update in 'app.accs'. Also fills in extra info for this message.
func (app *App) handleMsg(txHash []byte, msg sdk.Msg, log sdk.ABCIMessageLog, extra JsDict) {
evMap := parseEvents(log.Events)
switch msg := msg.(type) {
case oracle.MsgRequestData:
app.handleMsgRequestData(txHash, msg, evMap, extra)
Expand All @@ -27,3 +33,14 @@ func (app *App) handleMsg(txHash []byte, msg sdk.Msg, log sdk.ABCIMessageLog, ex
app.handleMsgCreateOracleScript(txHash, msg, evMap, extra)
}
}

func (app *App) handleBeginBlockEndBlockEvent(event abci.Event) {
events := sdk.StringifyEvents([]abci.Event{event})
evMap := parseEvents(events)
switch event.Type {
case oracle.EventTypeRequestExecute:
app.handleEventRequestExecute(evMap)
default:
break
}
}
14 changes: 14 additions & 0 deletions chain/emitter/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (app *App) handleMsgRequestData(
"min_count": msg.MinCount,
"sender": msg.Sender.String(),
"client_id": msg.ClientID,
"resolve_status": types.ResolveStatus_Open,
})
es := evMap[types.EventTypeRawRequest+"."+types.AttributeKeyExternalID]
ds := evMap[types.EventTypeRawRequest+"."+types.AttributeKeyDataSourceID]
Expand Down Expand Up @@ -99,3 +100,16 @@ func (app *App) handleMsgCreateOracleScript(
})
extra["id"] = id
}

// handleEventRequestExecute implements emitter handler for EventRequestExecute.
func (app *App) handleEventRequestExecute(evMap EvMap) {
id := types.RequestID(atoi(evMap[types.EventTypeRequestExecute+"."+types.AttributeKeyRequestID][0]))
result := app.OracleKeeper.MustGetResult(app.DeliverContext, id)
app.Write("UPDATE_REQUEST", JsDict{
"id": id,
"request_time": result.ResponsePacketData.RequestTime,
"resolve_time": result.ResponsePacketData.ResolveTime,
"resolve_status": result.ResponsePacketData.ResolveStatus,
"result": result.ResponsePacketData.Result,
})
}
2 changes: 2 additions & 0 deletions chain/x/oracle/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var (
NewQuerier = keeper.NewQuerier
ModuleCdc = types.ModuleCdc
RegisterCodec = types.RegisterCodec

EventTypeRequestExecute = types.EventTypeRequestExecute
)

type (
Expand Down
20 changes: 20 additions & 0 deletions flusher/flusher/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import base64 as b64
from datetime import datetime
import sqlalchemy as sa
import enum


class ResolveStatus(enum.Enum):
Open = 0
Success = 1
Failure = 2
Expired = 3


class CustomResolveStatus(sa.types.TypeDecorator):

impl = sa.Enum(ResolveStatus)

def process_bind_param(self, value, dialect):
return ResolveStatus(value)


class CustomDateTime(sa.types.TypeDecorator):
Expand Down Expand Up @@ -110,6 +126,10 @@ def Column(*args, **kwargs):
Column("min_count", sa.Integer),
Column("sender", sa.String),
Column("client_id", sa.String),
Column("request_time", sa.Integer, nullable=True),
Column("resolve_status", CustomResolveStatus),
Column("resolve_time", sa.Integer, nullable=True),
Column("result", CustomBase64, nullable=True),
)

raw_requests = sa.Table(
Expand Down
6 changes: 6 additions & 0 deletions flusher/flusher/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ def handle_new_oracle_script(self, msg):
def handle_new_request(self, msg):
self.conn.execute(requests.insert(), msg)

def handle_update_request(self, msg):
condition = True
for col in requests.primary_key.columns.values():
condition = (col == msg[col.name]) & condition
self.conn.execute(requests.update().where(condition).values(**msg))

def handle_new_raw_request(self, msg):
self.conn.execute(raw_requests.insert(), msg)

Expand Down

0 comments on commit 3da9263

Please sign in to comment.