Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockService redirects when does not have the round #2002

Merged
7 changes: 2 additions & 5 deletions catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"errors"
"fmt"
"net/http"
"path"
"strconv"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -80,7 +78,7 @@ func (uf *universalBlockFetcher) fetchBlock(ctx context.Context, round basics.Ro
} else {
return nil, nil, time.Duration(0), fmt.Errorf("fetchBlock: UniversalFetcher only supports HTTPPeer and UnicastPeer")
}
downloadDuration = time.Now().Sub(blockDownloadStartTime)
downloadDuration = time.Now().Sub(blockDownloadStartTime)
if err != nil {
return nil, nil, time.Duration(0), err
}
Expand Down Expand Up @@ -211,7 +209,7 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
return nil, err
}

parsedURL.Path = hf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/block/"+strconv.FormatUint(uint64(r), 36)))
parsedURL.Path = rpcs.FormatBlockQuery(uint64(r), parsedURL.Path, hf.net)
blockURL := parsedURL.String()
hf.log.Debugf("block GET %#v peer %#v %T", blockURL, hf.peer, hf.peer)
request, err := http.NewRequest("GET", blockURL, nil)
Expand Down Expand Up @@ -272,4 +270,3 @@ func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data
func (hf *HTTPFetcher) address() string {
return hf.rootURL
}

3 changes: 1 addition & 2 deletions catchup/universalFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
func TestUGetBlockWs(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableCatchupFromArchiveServers = true

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
Expand Down Expand Up @@ -76,7 +75,6 @@ func TestUGetBlockWs(t *testing.T) {
func TestUGetBlockHttp(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableCatchupFromArchiveServers = true

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
Expand All @@ -86,6 +84,7 @@ func TestUGetBlockHttp(t *testing.T) {

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(blockServiceConfig, ledger, net, "test genesisID")
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,17 @@ type Local struct {
// connections that are originating from the local machine. Setting this to "true", allow to create large
// local-machine networks that won't trip the incoming connection limit observed by relays.
DisableLocalhostConnectionRateLimit bool `version[16]:"true"`

// BlockServiceCustomFallbackEndpoints is a comma delimited list of endpoints which the block service uses to
// redirect the http requests to in case it does not have the round. If it is not specified, will check
// EnableBlockServiceFallbackToArchiver.
BlockServiceCustomFallbackEndpoints string `version[16]:""`

// EnableBlockServiceFallbackToArchiver controls whether the block service redirects the http requests to
// an archiver or return StatusNotFound (404) when in does not have the requested round, and
// BlockServiceCustomFallbackEndpoints is empty.
// The archiver is randomly selected, if none is available, will return StatusNotFound (404).
EnableBlockServiceFallbackToArchiver bool `version[16]:"true"`
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
}

// Filenames of config files within the configdir (e.g. ~/.algorand)
Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var defaultLocal = Local{
AnnounceParticipationKey: true,
Archival: false,
BaseLoggerDebugLevel: 4,
BlockServiceCustomFallbackEndpoints: "",
BroadcastConnectionsLimit: -1,
CadaverSizeTarget: 1073741824,
CatchpointFileHistoryLength: 365,
Expand All @@ -47,6 +48,7 @@ var defaultLocal = Local{
EnableAgreementTimeMetrics: false,
EnableAssembleStats: false,
EnableBlockService: false,
EnableBlockServiceFallbackToArchiver: true,
EnableCatchupFromArchiveServers: false,
EnableDeveloperAPI: false,
EnableGossipBlockService: true,
Expand Down
2 changes: 2 additions & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"AnnounceParticipationKey": true,
"Archival": false,
"BaseLoggerDebugLevel": 4,
"BlockServiceCustomFallbackEndpoints": "",
"BroadcastConnectionsLimit": -1,
"CadaverSizeTarget": 1073741824,
"CatchpointFileHistoryLength": 365,
Expand All @@ -26,6 +27,7 @@
"EnableAgreementTimeMetrics": false,
"EnableAssembleStats": false,
"EnableBlockService": false,
"EnableBlockServiceFallbackToArchiver": true,
"EnableCatchupFromArchiveServers": false,
"EnableDeveloperAPI": false,
"EnableGossipBlockService": true,
Expand Down
92 changes: 90 additions & 2 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"context"
"encoding/binary"
"net/http"
"path"
"strconv"
"strings"

"github.com/gorilla/mux"

"github.com/algorand/go-codec/codec"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
Expand Down Expand Up @@ -66,6 +69,8 @@ type BlockService struct {
net network.GossipNode
enableService bool
enableServiceOverGossip bool
fallbackEndpoints fallbackEndpoints
enableArchiverFallback bool
}

// EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate
Expand All @@ -84,6 +89,11 @@ type PreEncodedBlockCert struct {
Certificate codec.Raw `codec:"cert"`
}

type fallbackEndpoints struct {
endpoints []string
lastUsed int
}

// MakeBlockService creates a BlockService around the provider Ledger and registers it for HTTP callback on the block serving path
func MakeBlockService(config config.Local, ledger *data.Ledger, net network.GossipNode, genesisID string) *BlockService {
service := &BlockService{
Expand All @@ -93,6 +103,8 @@ func MakeBlockService(config config.Local, ledger *data.Ledger, net network.Goss
net: net,
enableService: config.EnableBlockService,
enableServiceOverGossip: config.EnableGossipBlockService,
fallbackEndpoints: makeFallbackEndpoints(config.BlockServiceCustomFallbackEndpoints),
enableArchiverFallback: config.EnableBlockServiceFallbackToArchiver,
}
if service.enableService {
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
Expand Down Expand Up @@ -189,8 +201,11 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
switch err.(type) {
case ledgercore.ErrNoEntry:
// entry cound not be found.
response.Header().Set("Cache-Control", blockResponseMissingBlockCacheControl)
response.WriteHeader(http.StatusNotFound)
ok := bs.redirectRequest(round, response, request)
if !ok {
response.Header().Set("Cache-Control", blockResponseMissingBlockCacheControl)
response.WriteHeader(http.StatusNotFound)
}
return
default:
// unexpected error.
Expand Down Expand Up @@ -285,6 +300,58 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc
return
}

// redirectRequest redirects the request to the next round robin fallback endpoing if available, otherwise,
// if EnableBlockServiceFallbackToArchiver is enabled, redirects to a random archiver.
func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWriter, request *http.Request) (ok bool) {

algonautshant marked this conversation as resolved.
Show resolved Hide resolved
peerAddress := bs.getNextCustomFallbackEndpoint()
if peerAddress == "" && bs.enableArchiverFallback {
peerAddress = bs.getRandomArchiver()
}
if peerAddress == "" {
return false
}

parsedURL, err := network.ParseHostOrURL(peerAddress)
if err != nil {
logging.Base().Debugf("redirectRequest: %s", err.Error())
return false
}
parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net)
http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect)
logging.Base().Debugf("redirectRequest: redirected block request to %s", parsedURL.String())
return true
}

// getNextCustomFallbackEndpoint returns the next custorm fallback endpoint in RR ordering
func (bs *BlockService) getNextCustomFallbackEndpoint() (endpointAddress string) {
if len(bs.fallbackEndpoints.endpoints) == 0 {
return
}
endpointAddress = bs.fallbackEndpoints.endpoints[bs.fallbackEndpoints.lastUsed]
bs.fallbackEndpoints.lastUsed = (bs.fallbackEndpoints.lastUsed + 1) % len(bs.fallbackEndpoints.endpoints)
return
}

// getRandomArchiver returns a random archiver address
func (bs *BlockService) getRandomArchiver() (endpointAddress string) {
peers := bs.net.GetPeers(network.PeersPhonebookArchivers)
httpPeers := make([]network.HTTPPeer, 0, len(peers))

for _, peer := range peers {
httpPeer, validHTTPPeer := peer.(network.HTTPPeer)
if validHTTPPeer {
httpPeers = append(httpPeers, httpPeer)
}
}
if len(httpPeers) == 0 {
return
}
randIndex := crypto.RandUint64() % uint64(len(httpPeers))
endpointAddress = httpPeers[randIndex].GetAddress()
return
}

func topicBlockBytes(dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics {
blk, cert, err := dataLedger.EncodedBlockCert(round)
if err != nil {
Expand Down Expand Up @@ -326,3 +393,24 @@ func RawBlockBytes(l *data.Ledger, round basics.Round) ([]byte, error) {
Certificate: cert,
}), nil
}

// FormatBlockQuery formats a block request query for the given network and round number
func FormatBlockQuery(round uint64, parsedURL string, net network.GossipNode) string {
return net.SubstituteGenesisID(path.Join(parsedURL, "/v1/{genesisID}/block/"+strconv.FormatUint(uint64(round), 36)))
}

func makeFallbackEndpoints(customFallbackEndpoints string) (fe fallbackEndpoints) {
if customFallbackEndpoints == "" {
return
}
endpoints := strings.Split(customFallbackEndpoints, ",")
for _, ep := range endpoints {
parsed, err := network.ParseHostOrURL(ep)
if err != nil {
logging.Base().Warnf("makeFallbackEndpoints: error parsing %s %s", ep, err.Error())
continue
}
fe.endpoints = append(fe.endpoints, parsed.String())
}
return
}
Loading