Skip to content

Commit

Permalink
BlockService redirects when does not have the round (algorand#2002)
Browse files Browse the repository at this point in the history
Block service will redirect the http block request to another http peer if it does not have the round.
  • Loading branch information
algonautshant authored Mar 26, 2021
1 parent a5cf664 commit b27b1cc
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 79 deletions.
32 changes: 19 additions & 13 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ func MakeResumedCatchpointCatchupService(ctx context.Context, node CatchpointCat
net: net,
ledger: l,
config: cfg,
blocksDownloadPeerSelector: makePeerSelector(
net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
}),
}
service.lastBlockHeader, err = l.BlockHdr(l.Latest())
if err != nil {
Expand All @@ -116,7 +110,7 @@ func MakeResumedCatchpointCatchupService(ctx context.Context, node CatchpointCat
if err != nil {
return nil, err
}

service.initDownloadPeerSelector()
return service, nil
}

Expand All @@ -138,17 +132,12 @@ func MakeNewCatchpointCatchupService(catchpoint string, node CatchpointCatchupNo
net: net,
ledger: l,
config: cfg,
blocksDownloadPeerSelector: makePeerSelector(
net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
}),
}
service.lastBlockHeader, err = l.BlockHdr(l.Latest())
if err != nil {
return nil, err
}
service.initDownloadPeerSelector()
return service, nil
}

Expand Down Expand Up @@ -713,3 +702,20 @@ func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(aquiredBlocks
cs.stats.AcquiredBlocks = uint64(int64(cs.stats.AcquiredBlocks) + aquiredBlocksDelta)
cs.stats.VerifiedBlocks = uint64(int64(cs.stats.VerifiedBlocks) + verifiedBlocksDelta)
}

func (cs *CatchpointCatchupService) initDownloadPeerSelector() {
if cs.config.EnableCatchupFromArchiveServers {
cs.blocksDownloadPeerSelector = makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
})
} else {
cs.blocksDownloadPeerSelector = makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
})
}
}
10 changes: 5 additions & 5 deletions catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {

// Create a network and block service
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(config.GetDefaultLocal(), remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.TestingLog(b), config.GetDefaultLocal(), remote, net, "test genesisID")
nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
Expand All @@ -60,11 +60,11 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {

for i := 0; i < b.N; i++ {
inMem := true
local, err := data.LoadLedger(logging.Base(), b.Name()+"empty"+strconv.Itoa(i), inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
local, err := data.LoadLedger(logging.TestingLog(b), b.Name()+"empty"+strconv.Itoa(i), inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
require.NoError(b, err)

// Make Service
syncer := MakeService(logging.Base(), defaultConfig, net, local, new(mockedAuthenticator), nil)
syncer := MakeService(logging.TestingLog(b), defaultConfig, net, local, new(mockedAuthenticator), nil)
b.StartTimer()
syncer.Start()
for w := 0; w < 1000; w++ {
Expand Down Expand Up @@ -146,10 +146,10 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
emptyLedger, err = data.LoadLedger(logging.Base(), t.Name()+"empty", inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), t.Name()+"empty", inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
require.NoError(t, err)

ledger, err = datatest.FabricateLedger(logging.Base(), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
require.NoError(t, err)
require.Equal(t, ledger.LastRound(), emptyLedger.LastRound()+basics.Round(numBlocks))
return ledger, emptyLedger, release, genesisBalances
Expand Down
74 changes: 52 additions & 22 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
peer, getPeerErr := peerSelector.GetNextPeer()
if getPeerErr != nil {
s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from")
s.net.RequestConnectOutgoing(true, s.ctx.Done())
s.net.RequestConnectOutgoing(true, s.ctx.Done())
continue
}

Expand Down Expand Up @@ -657,34 +657,64 @@ func (s *Service) handleUnsupportedRound(nextUnsupportedRound basics.Round) {

func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
var peerClasses []peerClass
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialFourthPriority, peerClass: network.PeersConnectedIn},
if s.cfg.EnableCatchupFromArchiveServers {
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialFourthPriority, peerClass: network.PeersConnectedIn},
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
}
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialFourthPriority, peerClass: network.PeersPhonebookArchivers},
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookArchivers},
}
}
}
} else {
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialFourthPriority, peerClass: network.PeersPhonebookArchivers},
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersConnectedIn},
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
}
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookArchivers},
if s.cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
{initialRank: peerRankInitialThirdPriority, peerClass: network.PeersPhonebookRelays},
}
} else {
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
}
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestServiceFetchBlocksSameRange(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestPeriodicSync(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestAbruptWrites(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestServiceFetchBlocksMalformed(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -541,7 +541,7 @@ func helperTestOnSwitchToUnSupportedProtocol(

// Create a network and block service
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(config, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), config, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down Expand Up @@ -729,7 +729,7 @@ func TestCatchupUnmatchedCertificate(t *testing.T) {
// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, remote, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down
7 changes: 2 additions & 5 deletions catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"errors"
"fmt"
"net/http"
"path"
"strconv"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -80,7 +78,7 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro
} else {
return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: UniversalFetcher only supports HTTPPeer and UnicastPeer")
}
downloadDuration = time.Now().Sub(blockDownloadStartTime)
downloadDuration = time.Now().Sub(blockDownloadStartTime)
if err != nil {
return nil, nil, time.Duration(0), err
}
Expand Down Expand Up @@ -211,7 +209,7 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
return nil, err
}

parsedURL.Path = hf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/block/"+strconv.FormatUint(uint64(r), 36)))
parsedURL.Path = rpcs.FormatBlockQuery(uint64(r), parsedURL.Path, hf.net)
blockURL := parsedURL.String()
hf.log.Debugf("block GET %#v peer %#v %T", blockURL, hf.peer, hf.peer)
request, err := http.NewRequest("GET", blockURL, nil)
Expand Down Expand Up @@ -272,4 +270,3 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
func (hf *HTTPFetcher) address() string {
return hf.rootURL
}

7 changes: 3 additions & 4 deletions catchup/universalFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
func TestUGetBlockWs(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableCatchupFromArchiveServers = true

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
Expand All @@ -48,7 +47,7 @@ func TestUGetBlockWs(t *testing.T) {
net := &httpTestPeerSource{}

up := makeTestUnicastPeer(net, t)
ls := rpcs.MakeBlockService(blockServiceConfig, ledger, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")
ls.Start()

fetcher := makeUniversalBlockFetcher(logging.TestingLog(t), net, cfg)
Expand Down Expand Up @@ -76,7 +75,6 @@ func TestUGetBlockWs(t *testing.T) {
func TestUGetBlockHttp(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableCatchupFromArchiveServers = true

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
Expand All @@ -86,9 +84,10 @@ func TestUGetBlockHttp(t *testing.T) {

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, ledger, net, "test genesisID")
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,17 @@ type Local struct {
// connections that are originating from the local machine. Setting this to "true", allow to create large
// local-machine networks that won't trip the incoming connection limit observed by relays.
DisableLocalhostConnectionRateLimit bool `version[16]:"true"`

// BlockServiceCustomFallbackEndpoints is a comma delimited list of endpoints which the block service uses to
// redirect the http requests to in case it does not have the round. If it is not specified, will check
// EnableBlockServiceFallbackToArchiver.
BlockServiceCustomFallbackEndpoints string `version[16]:""`

// EnableBlockServiceFallbackToArchiver controls whether the block service redirects the http requests to
// an archiver or return StatusNotFound (404) when in does not have the requested round, and
// BlockServiceCustomFallbackEndpoints is empty.
// The archiver is randomly selected, if none is available, will return StatusNotFound (404).
EnableBlockServiceFallbackToArchiver bool `version[16]:"true"`
}

// Filenames of config files within the configdir (e.g. ~/.algorand)
Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var defaultLocal = Local{
AnnounceParticipationKey: true,
Archival: false,
BaseLoggerDebugLevel: 4,
BlockServiceCustomFallbackEndpoints: "",
BroadcastConnectionsLimit: -1,
CadaverSizeTarget: 1073741824,
CatchpointFileHistoryLength: 365,
Expand All @@ -47,6 +48,7 @@ var defaultLocal = Local{
EnableAgreementTimeMetrics: false,
EnableAssembleStats: false,
EnableBlockService: false,
EnableBlockServiceFallbackToArchiver: true,
EnableCatchupFromArchiveServers: false,
EnableDeveloperAPI: false,
EnableGossipBlockService: true,
Expand Down
2 changes: 2 additions & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"AnnounceParticipationKey": true,
"Archival": false,
"BaseLoggerDebugLevel": 4,
"BlockServiceCustomFallbackEndpoints": "",
"BroadcastConnectionsLimit": -1,
"CadaverSizeTarget": 1073741824,
"CatchpointFileHistoryLength": 365,
Expand All @@ -26,6 +27,7 @@
"EnableAgreementTimeMetrics": false,
"EnableAssembleStats": false,
"EnableBlockService": false,
"EnableBlockServiceFallbackToArchiver": true,
"EnableCatchupFromArchiveServers": false,
"EnableDeveloperAPI": false,
"EnableGossipBlockService": true,
Expand Down
2 changes: 1 addition & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ func (wn *WebsocketNetwork) getDNSAddrs(dnsBootstrap string) (relaysAddresses []
}
relaysAddresses = nil
}
if wn.config.EnableCatchupFromArchiveServers {
if wn.config.EnableCatchupFromArchiveServers || wn.config.EnableBlockServiceFallbackToArchiver {
archiverAddresses, err = tools_network.ReadFromSRV("archive", "tcp", dnsBootstrap, wn.config.FallbackDNSResolverAddress, wn.config.DNSSecuritySRVEnforced())
if err != nil {
// only log this warning on testnet or devnet
Expand Down
Loading

0 comments on commit b27b1cc

Please sign in to comment.