Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
cwgoes committed Jun 23, 2018
1 parent f62d665 commit 226de3b
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 202 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## TBD

BREAKING CHANGES:
- [abci]
- \#272 ResponseCheckTx and ResponseDeliverTx now include an error string
- \#273 ResponseCheckTx, ResponseDeliverTx, ResponseBeginBlock, and ResponseEndBlock
now include a list of events instead of a list of tags. Each event is itself
a list of tags, allowing for inclusion of multiple distinct events in each response.

BUG FIXES:

- [rpc] limited number of HTTP/WebSocket connections
Expand Down
15 changes: 1 addition & 14 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (app *KVStoreApplication) DeliverTx(tx []byte) types.ResponseDeliverTx {
{[]byte("app.creator"), []byte("jae")},
{[]byte("app.key"), key},
}
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Tags: tags}
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: []types.Event{types.Event{Tags: tags}}}
}

func (app *KVStoreApplication) CheckTx(tx []byte) types.ResponseCheckTx {
Expand Down
8 changes: 4 additions & 4 deletions abci/types/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func TestMarshalJSON(t *testing.T) {
Code: 1,
Data: []byte("hello"),
GasWanted: 43,
Tags: []cmn.KVPair{
Events: []Event{Event{[]cmn.KVPair{
{[]byte("pho"), []byte("bo")},
},
}}},
}
b, err = json.Marshal(&r1)
assert.Nil(t, err)
Expand Down Expand Up @@ -82,9 +82,9 @@ func TestWriteReadMessage2(t *testing.T) {
Data: []byte(phrase),
Log: phrase,
GasWanted: 10,
Tags: []cmn.KVPair{
Events: []Event{Event{[]cmn.KVPair{
cmn.KVPair{[]byte("abc"), []byte("def")},
},
}}},
// Fee: cmn.KI64Pair{
},
// TODO: add the rest
Expand Down
317 changes: 177 additions & 140 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

38 changes: 22 additions & 16 deletions abci/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,37 @@ message ResponseQuery {
}

message ResponseBeginBlock {
repeated common.KVPair tags = 1 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 1 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
}

message ResponseCheckTx {
uint32 code = 1;
bytes data = 2;
string log = 3; // nondeterministic
string info = 4; // nondeterministic
int64 gas_wanted = 5;
int64 gas_used = 6;
repeated common.KVPair tags = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
common.KI64Pair fee = 8 [(gogoproto.nullable)=false];
string error = 2;
bytes data = 3;
string log = 4; // nondeterministic
string info = 5; // nondeterministic
int64 gas_wanted = 6;
int64 gas_used = 7;
repeated Event events = 8 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
common.KI64Pair fee = 9 [(gogoproto.nullable)=false];
}

message ResponseDeliverTx {
uint32 code = 1;
bytes data = 2;
string log = 3; // nondeterministic
string info = 4; // nondeterministic
int64 gas_wanted = 5;
int64 gas_used = 6;
repeated common.KVPair tags = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
common.KI64Pair fee = 8 [(gogoproto.nullable)=false];
string error = 2;
bytes data = 3;
string log = 4; // nondeterministic
string info = 5; // nondeterministic
int64 gas_wanted = 6;
int64 gas_used = 7;
repeated Event events = 8 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
common.KI64Pair fee = 9 [(gogoproto.nullable)=false];
}

message ResponseEndBlock {
repeated Validator validator_updates = 1 [(gogoproto.nullable)=false];
ConsensusParams consensus_param_updates = 2;
repeated common.KVPair tags = 3 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 3 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
}

message ResponseCommit {
Expand Down Expand Up @@ -264,6 +266,10 @@ message Evidence {
int64 total_voting_power = 5;
}

message Event {
repeated common.KVPair tags = 1 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
}

//----------------------------------------
// Service Definition

Expand Down
20 changes: 12 additions & 8 deletions state/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
hash := result.Tx.Hash()

// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
storeBatch.Set(keyForTag(tag, result), hash)
for index, evt := range result.Result.Events {
for _, tag := range evt.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
storeBatch.Set(keyForTag(tag, result, index), hash)
}
}
}

Expand All @@ -108,9 +110,11 @@ func (txi *TxIndex) Index(result *types.TxResult) error {
hash := result.Tx.Hash()

// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
b.Set(keyForTag(tag, result), hash)
for index, evt := range result.Result.Events {
for _, tag := range evt.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
b.Set(keyForTag(tag, result, index), hash)
}
}
}

Expand Down Expand Up @@ -417,8 +421,8 @@ func extractValueFromKey(key []byte) string {
return parts[1]
}

func keyForTag(tag cmn.KVPair, result *types.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d", tag.Key, tag.Value, result.Height, result.Index))
func keyForTag(tag cmn.KVPair, result *types.TxResult, index int) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d/%d", tag.Key, tag.Value, result.Height, result.Index, index))
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
41 changes: 22 additions & 19 deletions types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,36 +85,39 @@ func (b *EventBus) PublishEventVote(event EventDataVote) error {
return b.Publish(EventVote, event)
}

// PublishEventTx publishes tx event with tags from Result. Note it will add
// PublishEventTx publishes tx events with tags from Result. Note it will add
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
// will be overwritten.
func (b *EventBus) PublishEventTx(event EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

tags := make(map[string]string)

// validate and fill tags from tx result
for _, tag := range event.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx)
continue
for _, evt := range event.Result.Events {
tags := make(map[string]string)

// validate and fill tags from tx result
for _, tag := range evt.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
}
tags[string(tag.Key)] = string(tag.Value)
}

// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventTx
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventTx

logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())

logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", event.Height)
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", event.Height)

b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags))
}

b.pubsub.PublishWithTags(ctx, event, tmpubsub.NewTagMap(tags))
return nil
}

Expand Down

0 comments on commit 226de3b

Please sign in to comment.