Skip to content

Commit

Permalink
Kaleido - logging, 128Kb Tx, Azure RAFT
Browse files Browse the repository at this point in the history
* custom debug logs for consensus, increase transaction size limit to
128kb
* do not create wal dir in quorum code (for it to work in azure)
  • Loading branch information
Vinod Damle authored and Vinod Damle committed Dec 17, 2020
1 parent af75251 commit c6ed3ed
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 12 deletions.
4 changes: 3 additions & 1 deletion consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package core

import (
"reflect"

"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
Expand All @@ -33,6 +33,7 @@ func (c *core) sendCommitForOldBlock(view *istanbul.View, digest common.Hash) {
View: view,
Digest: digest,
}
c.logger.Debug(fmt.Sprintf("=====> Sending commmit for old block, address: %v, hash: %v, block round: %v, block sequence: %v", c.Address(), digest, view.Round, view.Sequence))
c.broadcastCommit(sub)
}

Expand Down Expand Up @@ -74,6 +75,7 @@ func (c *core) handleCommit(msg *message, src istanbul.Validator) error {
// by committing the proposal without PREPARE messages.
if c.current.Commits.Size() >= c.QuorumSize() && c.state.Cmp(StateCommitted) < 0 {
// Still need to call LockHash here since state can skip Prepared state and jump directly to the Committed state.
c.logger.Debug(fmt.Sprintf("=====> Committing to a proposal and locking hash due to 2f+1 commits msgs, address: %v, hash: %v, state: %v", c.Address(), commit.Digest, c.state.String()))
c.current.LockHash()
c.commit()
}
Expand Down
7 changes: 6 additions & 1 deletion consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"math/big"
"sync"
"time"

"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -259,6 +259,8 @@ func (c *core) startNewRound(round *big.Int) {
c.sendPreprepare(r)
} else if c.current.pendingRequest != nil {
c.sendPreprepare(c.current.pendingRequest)
} else {
logger.Info("=====> No locked proposal and pending request")
}
}
c.newRoundChangeTimer()
Expand Down Expand Up @@ -287,11 +289,14 @@ func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.Valid
// Lock only if both roundChange is true and it is locked
if roundChange && c.current != nil {
if c.current.IsHashLocked() {
c.logger.Debug(fmt.Sprintf("=====> Setting new round state with locked hash: %v, and pending request", c.current.GetLockedHash()))
c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
} else {
c.logger.Debug(fmt.Sprintf("=====> Setting new round state with pending request"))
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal)
}
} else {
c.logger.Debug(fmt.Sprintf("=====> Setting new round state with no pending request"))
c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal)
}
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func (c *core) handleTimeoutMsg() {
if !c.waitingForRoundChange {
maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1)
if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 {
c.logger.Info(fmt.Sprintf("=====>catching up to max round, calling sendRoundChange. max round - %v, current round - %v, seq - %v", maxRound, c.current.Round(), c.current.Sequence()))
c.sendRoundChange(maxRound)
return
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ package core

import (
"reflect"

"fmt"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

func (c *core) sendPrepare() {
logger := c.logger.New("state", c.state)

sub := c.current.Subject()
encodedSubject, err := Encode(sub)
if err != nil {
logger.Error("Failed to encode", "subject", sub)
return
}
logger.Debug(fmt.Sprintf("=====>Broadcast Prepare, address: %v", c.Address()))
c.broadcast(&message{
Code: msgPrepare,
Msg: encodedSubject,
Expand Down Expand Up @@ -61,6 +61,7 @@ func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
// and we are in earlier state before Prepared state.
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() >= c.QuorumSize()) &&
c.state.Cmp(StatePrepared) < 0 {
c.logger.Debug(fmt.Sprintf("=====>Prepared, due to 2f+1 prepare or commit msgs, address: %v", c.Address()))
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
Expand Down
6 changes: 5 additions & 1 deletion consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package core

import (
"time"

"fmt"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
)
Expand All @@ -36,6 +36,7 @@ func (c *core) sendPreprepare(request *istanbul.Request) {
logger.Error("Failed to encode", "view", curView)
return
}
logger.Debug(fmt.Sprintf("=====> Broadcast preprepare, address: %v", c.Address()))
c.broadcast(&message{
Code: msgPreprepare,
Msg: preprepare,
Expand Down Expand Up @@ -103,17 +104,20 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
if c.current.IsHashLocked() {
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
logger.Debug(fmt.Sprintf("=====> Prepared, preprepare proposal == locked hash, address: %v, hash: %v", c.Address(), c.current.Proposal()))
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
logger.Info(fmt.Sprintf("====>Pre-prepare: proposed hash not matching current locked hash. round - %v, seq - %v", c.current.Round(), c.current.Sequence()))
c.sendNextRoundChange()
}
} else {
// Either
// 1. the locked proposal and the received proposal match
// 2. we have no locked proposal
logger.Debug(fmt.Sprintf("=====> Prepared, no locked proposal, address: %v", c.Address()))
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/core/roundchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"fmt"
"math/big"
"sync"

Expand Down Expand Up @@ -90,6 +91,7 @@ func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
return err
}

logger.Debug(fmt.Sprintf("=====> Handling round change msg, current round: %v, current sequence: %v, total number of round change msg for the round %v, sequence %v, is %v, waiting for round change? %v, current F() is %v", cv.Round, cv.Sequence, roundView.Round, roundView.Sequence, num, c.waitingForRoundChange, c.valSet.F()))
// Once we received f+1 ROUND CHANGE messages, those messages form a weak certificate.
// If our round number is smaller than the certificate's round number, we would
// try to catch up the round number.
Expand Down
1 change: 0 additions & 1 deletion core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bo
st.state.AddBalance(st.evm.Coinbase, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.gasPrice))
return nil, 0, false, nil
}

ret, leftoverGas, vmerr = evm.Call(sender, to, data, st.gas, st.value)
}
if vmerr != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ var DefaultTxPoolConfig = TxPoolConfig{
Lifetime: 3 * time.Hour,

// Quorum
TransactionSizeLimit: 64,
TransactionSizeLimit: 128,
MaxCodeSize: 24,
}

Expand Down
17 changes: 12 additions & 5 deletions raft/wal.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package raft

import (
"os"

"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
Expand All @@ -11,9 +9,18 @@ import (

func (pm *ProtocolManager) openWAL(maybeRaftSnapshot *raftpb.Snapshot) *wal.WAL {
if !wal.Exist(pm.waldir) {
if err := os.Mkdir(pm.waldir, 0750); err != nil {
fatalf("cannot create waldir: %s", err)
}
// On a CIFS filesytem we need to use the 'wal_windows' implementation in the
// version of Raft included in Quorum. This performs a rename of raft-wal.tmp
// to raft-wal as art of wal.Create()
// As of Go 1.8, in UNIX/Docker you cannot rename a directory to replace a directory
// see - https://golang.org/doc/go1.8
//
// As such, we do not create the waldir in the Quorum layer, but rather leave
// it to the wal code itself to create it.
//
// if err := os.Mkdir(pm.waldir, 0750); err != nil {
// fatalf("cannot create waldir: %s", err)
// }

wal, err := wal.Create(pm.waldir, nil)
if err != nil {
Expand Down

0 comments on commit c6ed3ed

Please sign in to comment.