diff --git a/changelog/21460.txt b/changelog/21460.txt new file mode 100644 index 000000000000..a43a4dac17d8 --- /dev/null +++ b/changelog/21460.txt @@ -0,0 +1,3 @@ +```release-note:feature +storage/raft: Add experimental support for raft-wal, a new backend engine for integrated storage. +``` diff --git a/go.mod b/go.mod index 06e2583d25ec..bcf3a8cf008b 100644 --- a/go.mod +++ b/go.mod @@ -119,14 +119,15 @@ require ( github.com/hashicorp/golang-lru v1.0.2 github.com/hashicorp/hcl v1.0.1-vault-5 github.com/hashicorp/hcl/v2 v2.16.2 - github.com/hashicorp/hcp-link v0.1.0 - github.com/hashicorp/hcp-scada-provider v0.2.1 + github.com/hashicorp/hcp-link v0.2.1 + github.com/hashicorp/hcp-scada-provider v0.2.2 github.com/hashicorp/hcp-sdk-go v0.75.0 github.com/hashicorp/nomad/api v0.0.0-20230519153805-2275a83cbfdf github.com/hashicorp/raft v1.6.0 github.com/hashicorp/raft-autopilot v0.2.0 github.com/hashicorp/raft-boltdb/v2 v2.3.0 github.com/hashicorp/raft-snapshot v1.0.4 + github.com/hashicorp/raft-wal v0.4.0 github.com/hashicorp/vault-hcp-lib v0.0.0-20231208101417-1123df6d540b github.com/hashicorp/vault-plugin-auth-alicloud v0.16.1 github.com/hashicorp/vault-plugin-auth-azure v0.16.2 @@ -293,6 +294,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 // indirect github.com/aws/smithy-go v1.13.5 // indirect github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect + github.com/benbjohnson/immutable v0.4.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/speakeasy v0.1.0 // indirect github.com/boltdb/bolt v1.3.1 // indirect @@ -312,10 +314,12 @@ require ( github.com/containerd/containerd v1.7.12 // indirect github.com/containerd/continuity v0.4.2 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/coreos/go-oidc/v3 v3.5.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf // indirect github.com/couchbase/gocb/v2 v2.6.3 // indirect github.com/couchbase/gocbcore/v10 v10.2.3 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect @@ -472,6 +476,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/segmentio/fasthash v1.0.3 // indirect github.com/sergi/go-diff v1.1.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/go.sum b/go.sum index c447d5648041..8c7e21243a42 100644 --- a/go.sum +++ b/go.sum @@ -1216,6 +1216,8 @@ github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/immutable v0.4.0 h1:CTqXbEerYso8YzVPxmWxh2gnoRQbbB9X1quUC8+vGZA= +github.com/benbjohnson/immutable v0.4.0/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM= github.com/beorn7/perks 
v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -1462,6 +1464,8 @@ github.com/containers/ocicrypt v1.1.6/go.mod h1:WgjxPWdTJMqYMjf3M6cuIFFA1/MpyyhI github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= +github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-iptables v0.4.5/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= github.com/coreos/go-iptables v0.5.0/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= @@ -1487,6 +1491,8 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf h1:GOPo6vn/vTN+3IwZBvXX0y5doJfSC7My0cdzelyOCsQ= +github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/couchbase/gocb/v2 v2.6.3 h1:5RsMo+RRfK0mVxHLAfpBz3/tHlgXZb1WBNItLk9Ab+c= github.com/couchbase/gocb/v2 v2.6.3/go.mod h1:yF5F6BHTZ/ZowhEuZbySbXrlI4rHd1TIhm5azOaMbJU= github.com/couchbase/gocbcore/v10 v10.2.3 h1:PEkRSNSkKjUBXx82Ucr094+anoiCG5GleOOQZOHo6D4= @@ -2217,10 +2223,10 @@ github.com/hashicorp/hcl v1.0.1-vault-5 h1:kI3hhbbyzr4dldA8UdTb7ZlVVlI2DACdCfz31 github.com/hashicorp/hcl v1.0.1-vault-5/go.mod h1:XYhtn6ijBSAj6n4YqAaf7RBPS4I06AItNorpy+MoQNM= github.com/hashicorp/hcl/v2 v2.16.2 h1:mpkHZh/Tv+xet3sy3F9Ld4FyI2tUpWe9x3XtPx9f1a0= github.com/hashicorp/hcl/v2 v2.16.2/go.mod h1:JRmR89jycNkrrqnMmvPDMd56n1rQJ2Q6KocSLCMCXng= -github.com/hashicorp/hcp-link v0.1.0 h1:F6F1cpADc+o5EBI5CbJn5RX4qdFSLpuA4fN69eeE5lQ= -github.com/hashicorp/hcp-link v0.1.0/go.mod h1:BWVDuJDHrKJtWc5qI07bX5xlLjSgWq6kYLQUeG1g5dM= -github.com/hashicorp/hcp-scada-provider v0.2.1 h1:yr+Uxini7SWTZ2t49d3Xi+6+X/rbsSFx8gq6WVcC91c= -github.com/hashicorp/hcp-scada-provider v0.2.1/go.mod h1:Q0WpS2RyhBKOPD4X/8oW7AJe7jA2HXB09EwDzwRTao0= +github.com/hashicorp/hcp-link v0.2.1 h1:8w4YVJxRb2C7oXN+hCPSyDbBeo7RQsIYTR6nQXJt6f8= +github.com/hashicorp/hcp-link v0.2.1/go.mod h1:6otT7bD+nBW1cyzgz8Z4BPziZfwxTtAEkYUrF/MOT8o= +github.com/hashicorp/hcp-scada-provider v0.2.2 h1:S4Kz+Vc02XOz/5Sm9Gug6ivfyfgchM6qv48cgz0uRls= +github.com/hashicorp/hcp-scada-provider v0.2.2/go.mod h1:Q0WpS2RyhBKOPD4X/8oW7AJe7jA2HXB09EwDzwRTao0= github.com/hashicorp/hcp-sdk-go v0.75.0 h1:5SLvNpcTeZnG7YnwWIaZlqCottFCGKldEIQnaYjOIq8= github.com/hashicorp/hcp-sdk-go v0.75.0/go.mod h1:5GwdT+HGhEQsh4n1yK+RADnQkfOo6vHgr2BpYUt2t9U= github.com/hashicorp/jsonapi 
v0.0.0-20210826224640-ee7dae0fb22d h1:9ARUJJ1VVynB176G1HCwleORqCaXm/Vx0uUi0dL26I0= @@ -2252,6 +2258,8 @@ github.com/hashicorp/raft-boltdb/v2 v2.3.0 h1:fPpQR1iGEVYjZ2OELvUHX600VAK5qmdnDE github.com/hashicorp/raft-boltdb/v2 v2.3.0/go.mod h1:YHukhB04ChJsLHLJEUD6vjFyLX2L3dsX3wPBZcX4tmc= github.com/hashicorp/raft-snapshot v1.0.4 h1:EuDuayAJPdiDmVk1ygTDnG2zDzrs0/6/yBuma1IYSow= github.com/hashicorp/raft-snapshot v1.0.4/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic= +github.com/hashicorp/raft-wal v0.4.0 h1:oHCQLPa3gBTrfuBVHaDg2b/TVXpU0RIyeH/mU9ovk3Y= +github.com/hashicorp/raft-wal v0.4.0/go.mod h1:A6vP5o8hGOs1LHfC1Okh9xPwWDcmb6Vvuz/QyqUXlOE= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY= github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= @@ -2990,6 +2998,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sethvargo/go-limiter v0.7.1 h1:wWNhTj0pxjyJ7wuJHpRJpYwJn+bUnjYfw2a85eu5w9U= diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 3323f987df8b..19329b67d5f7 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -6,6 +6,7 @@ package raft import ( "bytes" "context" + "encoding/binary" "encoding/hex" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "github.com/hashicorp/go-raftchunking" "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/raft" + "github.com/hashicorp/raft-wal/verifier" "github.com/hashicorp/vault/sdk/helper/jsonutil" "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/plugin/pb" @@ -36,6 +38,7 @@ const ( putOp restoreCallbackOp getOp + verifierCheckpointOp chunkingPrefix = "raftchunking/" databaseFilename = "vault.db" @@ -58,6 +61,12 @@ var ( _ raft.BatchingFSM = (*FSM)(nil) ) +var logVerifierMagicBytes [8]byte + +func init() { + binary.LittleEndian.PutUint64(logVerifierMagicBytes[:], verifier.ExtensionMagicPrefix) +} + type restoreCallback func(context.Context) error type FSMEntry struct { @@ -76,6 +85,69 @@ type FSMApplyResponse struct { EntrySlice []*FSMEntry } +type logVerificationChunkingShim struct { + chunker *raftchunking.ChunkingBatchingFSM +} + +// Apply implements raft.BatchingFSM. +func (s *logVerificationChunkingShim) Apply(l *raft.Log) interface{} { + return s.ApplyBatch([]*raft.Log{l})[0] +} + +// ApplyBatch implements raft.BatchingFSM +func (s *logVerificationChunkingShim) ApplyBatch(logs []*raft.Log) []interface{} { + // This is a hack because raftchunking doesn't play nicely with lower-level + // usage of Extensions field like we need for LogStore verification. 
+ + // When we write a verifier log, we write a single byte that consists of the verifierCheckpointOp, + // and then we encode the verifier.ExtensionMagicPrefix into the raft log + // Extensions field. Both of those together should ensure that verifier + // raft logs can never be mistaken for chunked protobufs. See the docs on + // verifier.ExtensionMagicPrefix for the reasoning behind the specific value + // that was chosen, and how it ensures this property. + + // So here, we need to check for the exact conditions that we encoded when we wrote the + // verifier log out. If they match, we're going to insert a dummy raft log. We do this because 1) we + // don't want the chunking FSM to blow up on our verifier op that it won't understand and + // 2) we need to preserve the length of the incoming slice of raft logs because raft expects + // the length of the return value to match 1:1 to the length of the input operations. + newBatch := make([]*raft.Log, 0, len(logs)) + + for _, l := range logs { + if s.isVerifierLog(l) { + // Replace checkpoint with an empty op, but keep the index and term so + // downstream FSMs don't get confused about having a 0 index suddenly. + newBatch = append(newBatch, &raft.Log{ + Index: l.Index, + Term: l.Term, + AppendedAt: l.AppendedAt, + }) + } else { + newBatch = append(newBatch, l) + } + } + + return s.chunker.ApplyBatch(newBatch) +} + +// Snapshot implements raft.BatchingFSM +func (s *logVerificationChunkingShim) Snapshot() (raft.FSMSnapshot, error) { + return s.chunker.Snapshot() +} + +// Restore implements raft.BatchingFSM +func (s *logVerificationChunkingShim) Restore(snapshot io.ReadCloser) error { + return s.chunker.Restore(snapshot) +} + +func (s *logVerificationChunkingShim) RestoreState(state *raftchunking.State) error { + return s.chunker.RestoreState(state) +} + +func (s *logVerificationChunkingShim) isVerifierLog(l *raft.Log) bool { + return isRaftLogVerifyCheckpoint(l) +} + // FSM is Vault's primary state storage. It writes updates to a bolt db file // that lives on local disk. FSM implements raft.FSM and physical.Backend // interfaces. 
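For reference, here is a minimal sketch of the three log shapes that the shim above and `isRaftLogVerifyCheckpoint` (added in raft.go further down in this diff) have to tell apart. The `demoCheckpointDetection` helper and the chunk-metadata placeholder are hypothetical; `verifierCheckpointOp`, `logVerifierMagicBytes`, and `isRaftLogVerifyCheckpoint` are the identifiers introduced by this change.

```go
// Illustrative sketch only; it assumes it lives in the physical/raft package
// alongside the identifiers added by this change.
package raft

import (
	"fmt"

	"github.com/hashicorp/raft"
)

func demoCheckpointDetection() {
	// 1. A checkpoint as written on the leader: a single op byte, no Extensions yet.
	leaderCheckpoint := &raft.Log{Data: []byte{byte(verifierCheckpointOp)}}

	// 2. The same checkpoint once replicated: the verifier has stamped its magic
	//    prefix into Extensions. Only the prefix matters for detection.
	followerCheckpoint := &raft.Log{
		Data:       []byte{byte(verifierCheckpointOp)},
		Extensions: logVerifierMagicBytes[:],
	}

	// 3. The final byte of a chunked protobuf that happens to equal the op byte:
	//    its Extensions carry raftchunking metadata, which by construction never
	//    starts with the magic prefix, so it must not be treated as a checkpoint.
	chunkTail := &raft.Log{
		Data:       []byte{byte(verifierCheckpointOp)},
		Extensions: []byte("chunking metadata placeholder"), // hypothetical contents
	}

	fmt.Println(isRaftLogVerifyCheckpoint(leaderCheckpoint))   // true
	fmt.Println(isRaftLogVerifyCheckpoint(followerCheckpoint)) // true
	fmt.Println(isRaftLogVerifyCheckpoint(chunkTail))          // false
}
```

In ApplyBatch, entries matching the first two shapes are swapped for empty no-op logs (index and term preserved) so the chunking FSM never sees an op type it doesn't understand.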
@@ -103,7 +175,7 @@ type FSM struct { // retoreCb is called after we've restored a snapshot restoreCb restoreCallback - chunker *raftchunking.ChunkingBatchingFSM + chunker *logVerificationChunkingShim localID string desiredSuffrage string @@ -134,10 +206,12 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) { localID: localID, } - f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{ - f: f, - ctx: context.Background(), - }) + f.chunker = &logVerificationChunkingShim{ + chunker: raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{ + f: f, + ctx: context.Background(), + }), + } dbPath := filepath.Join(path, databaseFilename) f.l.Lock() @@ -608,11 +682,16 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} { switch l.Type { case raft.LogCommand: command := &LogData{} - err := proto.Unmarshal(l.Data, command) - if err != nil { - f.logger.Error("error proto unmarshaling log data", "error", err) - panic("error proto unmarshaling log data") + + // explicitly check for zero length Data, which will be the case for verifier no-ops + if len(l.Data) > 0 { + err := proto.Unmarshal(l.Data, command) + if err != nil { + f.logger.Error("error proto unmarshaling log data", "error", err, "data", l.Data) + panic("error proto unmarshaling log data") + } } + commands = append(commands, command) case raft.LogConfiguration: configuration := raft.DecodeConfiguration(l.Data) @@ -659,6 +738,7 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} { entrySlice := make([]*FSMEntry, 0) switch command := commandRaw.(type) { case *LogData: + // empty logs will have a zero length slice of Operations, so this loop will be a no-op for _, op := range command.Operations { var err error switch op.OpType { diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 86b6b3958fec..55d6fd3c96f2 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -4,12 +4,12 @@ package raft import ( + "bytes" "context" "crypto/tls" "errors" "fmt" "io" - "io/ioutil" "math/rand" "net/url" "os" @@ -32,6 +32,9 @@ import ( autopilot "github.com/hashicorp/raft-autopilot" raftboltdb "github.com/hashicorp/raft-boltdb/v2" snapshot "github.com/hashicorp/raft-snapshot" + raftwal "github.com/hashicorp/raft-wal" + walmetrics "github.com/hashicorp/raft-wal/metrics" + "github.com/hashicorp/raft-wal/verifier" "github.com/hashicorp/vault/helper/metricsutil" "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/jsonutil" @@ -89,10 +92,13 @@ var ( // This is used to reduce disk I/O for the recently committed entries. raftLogCacheSize = 512 - raftState = "raft/" - peersFileName = "peers.json" - restoreOpDelayDuration = 5 * time.Second - defaultMaxEntrySize = uint64(2 * raftchunking.ChunkSize) + raftState = "raft/" + raftWalDir = "wal/" + peersFileName = "peers.json" + restoreOpDelayDuration = 5 * time.Second + defaultMaxEntrySize = uint64(2 * raftchunking.ChunkSize) + defaultRaftLogVerificationInterval = 60 * time.Second + minimumRaftLogVerificationInterval = 10 * time.Second GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in raft backend") ) @@ -142,6 +148,18 @@ type RaftBackend struct { // startup. bootstrapConfig *raft.Configuration + // closers is a list of managed resource (such as stores above or wrapper + // layers around them). That should have Close called on them when the backend + // is closed. 
We need to take care that each distinct object is closed only + // once which might involve knowing how wrappers to stores work. For example + // raft wal verifier wraps LogStore and is an io.Closer but it also closes the + // underlying LogStore so if we add it here we shouldn't also add the actual + // LogStore or StableStore if it's the same underlying instance. We could use + // a map[io.Closer]bool to prevent double registrations, but that doesn't + // solve the problem of "knowing" whether or not calling Close on some wrapper + // also calls "Close" on it's underlying. + closers []io.Closer + // dataDir is the location on the local filesystem that raft and FSM data // will be stored. dataDir string @@ -218,6 +236,11 @@ type RaftBackend struct { effectiveSDKVersion string failGetInTxn *uint32 + + // raftLogVerifierEnabled and raftLogVerificationInterval control enabling the raft log verifier and how often + // it writes checkpoints. + raftLogVerifierEnabled bool + raftLogVerificationInterval time.Duration } // LeaderJoinInfo contains information required by a node to join itself as a @@ -275,6 +298,25 @@ type LeaderJoinInfo struct { TLSConfig *tls.Config `json:"-"` } +type RaftBackendConfig struct { + Path string + NodeId string + ApplyDelay time.Duration + RaftWal bool + RaftLogVerifierEnabled bool + RaftLogVerificationInterval time.Duration + SnapshotDelay time.Duration + MaxEntrySize uint64 + MaxBatchEntries int + MaxBatchSize int + AutopilotReconcileInterval time.Duration + AutopilotUpdateInterval time.Duration + AutopilotUpgradeVersion string + AutopilotRedundancyZone string + RaftNonVoter bool + RetryJoin string +} + // JoinConfig returns a list of information about possible leader nodes that // this node can join as a follower func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) { @@ -396,219 +438,159 @@ func batchLimitsFromEnv(logger log.Logger) (int, int) { // NewRaftBackend constructs a RaftBackend using the given directory func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { - path := os.Getenv(EnvVaultRaftPath) - if path == "" { - pathFromConfig, ok := conf["path"] - if !ok { - return nil, fmt.Errorf("'path' must be set") - } - path = pathFromConfig - } - - var localID string - { - // Determine the local node ID from the environment. - if raftNodeID := os.Getenv(EnvVaultRaftNodeID); raftNodeID != "" { - localID = raftNodeID - } - - // If not set in the environment check the configuration file. - if len(localID) == 0 { - localID = conf["node_id"] - } - - // If not set in the config check the "node-id" file. - if len(localID) == 0 { - localIDRaw, err := ioutil.ReadFile(filepath.Join(path, "node-id")) - switch { - case err == nil: - if len(localIDRaw) > 0 { - localID = string(localIDRaw) - } - case os.IsNotExist(err): - default: - return nil, err - } - } - - // If all of the above fails generate a UUID and persist it to the - // "node-id" file. - if len(localID) == 0 { - id, err := uuid.GenerateUUID() - if err != nil { - return nil, err - } - - if err := ioutil.WriteFile(filepath.Join(path, "node-id"), []byte(id), 0o600); err != nil { - return nil, err - } - - localID = id - } + // parse the incoming map into a proper config struct + backendConfig, err := parseRaftBackendConfig(conf, logger) + if err != nil { + return nil, fmt.Errorf("error parsing config: %w", err) } // Create the FSM. 
- fsm, err := NewFSM(path, localID, logger.Named("fsm")) + fsm, err := NewFSM(backendConfig.Path, backendConfig.NodeId, logger.Named("fsm")) if err != nil { return nil, fmt.Errorf("failed to create fsm: %v", err) } - if delayRaw, ok := conf["apply_delay"]; ok { - delay, err := parseutil.ParseDurationSecond(delayRaw) - if err != nil { - return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err) - } + if backendConfig.ApplyDelay > 0 { fsm.applyCallback = func() { - time.Sleep(delay) + time.Sleep(backendConfig.ApplyDelay) } } // Build an all in-memory setup for dev mode, otherwise prepare a full // disk-based setup. - var log raft.LogStore - var stable raft.StableStore - var snap raft.SnapshotStore + var logStore raft.LogStore + var stableStore raft.StableStore + var snapStore raft.SnapshotStore + var closers []io.Closer var devMode bool if devMode { store := raft.NewInmemStore() - stable = store - log = store - snap = raft.NewInmemSnapshotStore() + stableStore = store + logStore = store + snapStore = raft.NewInmemSnapshotStore() } else { // Create the base raft path. - path := filepath.Join(path, raftState) - if err := EnsurePath(path, true); err != nil { + raftBasePath := filepath.Join(backendConfig.Path, raftState) + if err := EnsurePath(raftBasePath, true); err != nil { return nil, err } + dbPath := filepath.Join(raftBasePath, "raft.db") - // Create the backend raft store for logs and stable storage. - dbPath := filepath.Join(path, "raft.db") - opts := etcdboltOptions(dbPath) - raftOptions := raftboltdb.Options{ - Path: dbPath, - BoltOptions: opts, - MsgpackUseNewTimeFormat: true, - } - store, err := raftboltdb.New(raftOptions) + // If the existing raft db exists from a previous use of BoltDB, warn about this and continue to use BoltDB + raftDbExists, err := fileExists(dbPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to check if raft.db already exists: %w", err) } - stable = store - - // Wrap the store in a LogCache to improve performance. - cacheStore, err := raft.NewLogCache(raftLogCacheSize, store) - if err != nil { - return nil, err + if backendConfig.RaftWal && raftDbExists { + logger.Warn("raft is configured to use raft-wal for storage but existing raft.db detected. raft-wal config will be ignored.") + backendConfig.RaftWal = false } - log = cacheStore - // Create the snapshot store. - snapshots, err := NewBoltSnapshotStore(path, logger.Named("snapshot"), fsm) - if err != nil { - return nil, err - } - snap = snapshots - } + if backendConfig.RaftWal { + raftWalPath := filepath.Join(raftBasePath, raftWalDir) + if err := EnsurePath(raftWalPath, true); err != nil { + return nil, err + } - if delayRaw, ok := conf["snapshot_delay"]; ok { - delay, err := parseutil.ParseDurationSecond(delayRaw) - if err != nil { - return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err) - } - snap = newSnapshotStoreDelay(snap, delay, logger) - } + mc := walmetrics.NewGoMetricsCollector([]string{"raft", "wal"}, nil, nil) + wal, err := raftwal.Open(raftWalPath, raftwal.WithMetricsCollector(mc)) + if err != nil { + return nil, fmt.Errorf("fail to open write-ahead-log: %w", err) + } + // We need to Close the store but don't register it in closers yet because + // if we are going to wrap it with a verifier we need to close through + // that instead. 
- maxEntrySize := defaultMaxEntrySize - if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 { - i, err := strconv.Atoi(maxEntrySizeCfg) - if err != nil { - return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err) - } + stableStore = wal + logStore = wal + } else { + // use the traditional BoltDB setup + opts := etcdboltOptions(dbPath) + raftOptions := raftboltdb.Options{ + Path: dbPath, + BoltOptions: opts, + MsgpackUseNewTimeFormat: true, + } - maxEntrySize = uint64(i) - } + store, err := raftboltdb.New(raftOptions) + if err != nil { + return nil, err + } + // We need to Close the store but don't register it in closers yet because + // if we are going to wrap it with a verifier we need to close through + // that instead. - var reconcileInterval time.Duration - if interval := conf["autopilot_reconcile_interval"]; interval != "" { - interval, err := parseutil.ParseDurationSecond(interval) - if err != nil { - return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err) + stableStore = store + logStore = store } - reconcileInterval = interval - } - var updateInterval time.Duration - if interval := conf["autopilot_update_interval"]; interval != "" { - interval, err := parseutil.ParseDurationSecond(interval) + // Create the snapshot store. + snapshots, err := NewBoltSnapshotStore(raftBasePath, logger.Named("snapshot"), fsm) if err != nil { - return nil, fmt.Errorf("autopilot_update_interval does not parse as a duration: %w", err) + return nil, err } - updateInterval = interval + snapStore = snapshots } - effectiveReconcileInterval := autopilot.DefaultReconcileInterval - effectiveUpdateInterval := autopilot.DefaultUpdateInterval - - if reconcileInterval != 0 { - effectiveReconcileInterval = reconcileInterval - } - if updateInterval != 0 { - effectiveUpdateInterval = updateInterval + // Hook up the verifier if it's enabled + if backendConfig.RaftLogVerifierEnabled { + mc := walmetrics.NewGoMetricsCollector([]string{"raft", "logstore", "verifier"}, nil, nil) + reportFn := makeLogVerifyReportFn(logger.Named("raft.logstore.verifier")) + v := verifier.NewLogStore(logStore, isLogVerifyCheckpoint, reportFn, mc) + logStore = v } - if effectiveReconcileInterval < effectiveUpdateInterval { - return nil, fmt.Errorf("autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)", effectiveReconcileInterval, effectiveUpdateInterval) + // Register the logStore as a closer whether or not it's wrapped in a verifier + // (which is a closer). We do this before the LogCache since that is _not_ an + // io.Closer. + if closer, ok := logStore.(io.Closer); ok { + closers = append(closers, closer) } + // Note that we DON'T register the stableStore as a closer because right now + // we always use the same underlying object as the logStore and we don't want + // to call close on it twice. If we ever support different stable store and + // log store then this logic will get even more complex! We don't register + // snapStore because none of our snapshot stores are io.Closers. 
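For context, a stripped-down sketch of how these raft-wal pieces compose on their own, using only calls that appear in this file (raftwal.Open, walmetrics.NewGoMetricsCollector, verifier.NewLogStore, and the isLogVerifyCheckpoint helper defined further down). The exampleVerifiedWAL name and the dir argument are placeholders, not part of the change.

```go
// Sketch only; mirrors the construction done in NewRaftBackend above.
package raft

import (
	"fmt"

	"github.com/hashicorp/raft"
	raftwal "github.com/hashicorp/raft-wal"
	walmetrics "github.com/hashicorp/raft-wal/metrics"
	"github.com/hashicorp/raft-wal/verifier"
)

func exampleVerifiedWAL(dir string) (raft.LogStore, raft.StableStore, error) {
	// Open the write-ahead log; the same object backs both the LogStore and
	// StableStore roles, just as in NewRaftBackend.
	wal, err := raftwal.Open(dir, raftwal.WithMetricsCollector(
		walmetrics.NewGoMetricsCollector([]string{"raft", "wal"}, nil, nil)))
	if err != nil {
		return nil, nil, err
	}

	// Wrap the log store with the verifier; it checksums ranges of logs between
	// checkpoints and reports results through the callback.
	report := func(r verifier.VerificationReport) {
		fmt.Printf("verified range %d-%d err=%v\n",
			int64(r.Range.Start), int64(r.Range.End), r.Err)
	}
	logs := verifier.NewLogStore(wal, isLogVerifyCheckpoint, report,
		walmetrics.NewGoMetricsCollector([]string{"raft", "logstore", "verifier"}, nil, nil))

	return logs, wal, nil
}
```

In NewRaftBackend the same wrapping is conditional: the verifier layer is only added when raft_log_verifier_enabled is set, and it applies equally when BoltDB remains the backing store.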
- var upgradeVersion string - if uv, ok := conf["autopilot_upgrade_version"]; ok && uv != "" { - upgradeVersion = uv - _, err := goversion.NewVersion(upgradeVersion) - if err != nil { - return nil, fmt.Errorf("autopilot_upgrade_version does not parse as a semantic version: %w", err) - } - } + // Close the FSM + closers = append(closers, fsm) - var nonVoter bool - if v := os.Getenv(EnvVaultRaftNonVoter); v != "" { - // Consistent with handling of other raft boolean env vars - // VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC - nonVoter = true - } else if v, ok := conf[raftNonVoterConfigKey]; ok { - nonVoter, err = strconv.ParseBool(v) - if err != nil { - return nil, fmt.Errorf("failed to parse %s config value %q as a boolean: %w", raftNonVoterConfigKey, v, err) - } + // Wrap the store in a LogCache to improve performance. + cacheStore, err := raft.NewLogCache(raftLogCacheSize, logStore) + if err != nil { + return nil, err } + logStore = cacheStore - if nonVoter && conf["retry_join"] == "" { - return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey) + if backendConfig.SnapshotDelay > 0 { + snapStore = newSnapshotStoreDelay(snapStore, backendConfig.SnapshotDelay, logger) } - maxBatchEntries, maxBatchSize := batchLimitsFromEnv(logger) - return &RaftBackend{ - logger: logger, - fsm: fsm, - raftInitCh: make(chan struct{}), - conf: conf, - logStore: log, - stableStore: stable, - snapStore: snap, - dataDir: path, - localID: localID, - permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), - maxEntrySize: maxEntrySize, - maxBatchEntries: maxBatchEntries, - maxBatchSize: maxBatchSize, - followerHeartbeatTicker: time.NewTicker(time.Second), - autopilotReconcileInterval: reconcileInterval, - autopilotUpdateInterval: updateInterval, - redundancyZone: conf["autopilot_redundancy_zone"], - nonVoter: nonVoter, - upgradeVersion: upgradeVersion, - failGetInTxn: new(uint32), + logger: logger, + fsm: fsm, + raftInitCh: make(chan struct{}), + conf: conf, + logStore: logStore, + stableStore: stableStore, + snapStore: snapStore, + closers: closers, + dataDir: backendConfig.Path, + localID: backendConfig.NodeId, + permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), + maxEntrySize: backendConfig.MaxEntrySize, + maxBatchEntries: backendConfig.MaxBatchEntries, + maxBatchSize: backendConfig.MaxBatchSize, + followerHeartbeatTicker: time.NewTicker(time.Second), + autopilotReconcileInterval: backendConfig.AutopilotReconcileInterval, + autopilotUpdateInterval: backendConfig.AutopilotUpdateInterval, + redundancyZone: backendConfig.AutopilotRedundancyZone, + nonVoter: backendConfig.RaftNonVoter, + upgradeVersion: backendConfig.AutopilotUpgradeVersion, + failGetInTxn: new(uint32), + raftLogVerifierEnabled: backendConfig.RaftLogVerifierEnabled, + raftLogVerificationInterval: backendConfig.RaftLogVerificationInterval, }, nil } @@ -649,14 +631,11 @@ func (b *RaftBackend) Close() error { b.l.Lock() defer b.l.Unlock() - if err := b.fsm.Close(); err != nil { - return err - } - - if err := b.stableStore.(*raftboltdb.BoltStore).Close(); err != nil { - return err + for _, cl := range b.closers { + if err := cl.Close(); err != nil { + return err + } } - return nil } @@ -699,6 +678,20 @@ func (b *RaftBackend) EffectiveVersion() string { return version.GetVersion().Version } +func (b *RaftBackend) verificationInterval() time.Duration { + b.l.RLock() + defer b.l.RUnlock() + + return b.raftLogVerificationInterval +} + 
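A minimal end-to-end usage sketch of the new options, mirroring what the tests below do via GetRaftWithConfig. The path and node_id values are placeholders; the config keys are the ones parsed by parseRaftBackendConfig, and StartRaftWalVerifier (defined below) is only meant to run once the node is the raft leader.

```go
// Hypothetical wiring example; exampleRaftWalBackend and the path/node_id
// values are placeholders and not part of this change.
package raft

import (
	"context"

	"github.com/hashicorp/go-hclog"
)

func exampleRaftWalBackend(ctx context.Context) (*RaftBackend, error) {
	conf := map[string]string{
		"path":                           "/var/lib/vault/raft", // placeholder
		"node_id":                        "node1",               // placeholder
		"raft_wal":                       "true",
		"raft_log_verifier_enabled":      "true",
		"raft_log_verification_interval": "90s", // minimum 10s, default 60s
	}

	backend, err := NewRaftBackend(conf, hclog.NewNullLogger())
	if err != nil {
		return nil, err
	}
	b := backend.(*RaftBackend)

	// In Vault this is only invoked after the cluster is set up and this node
	// is the raft leader; the background ticker then applies a verifier
	// checkpoint every raft_log_verification_interval.
	b.StartRaftWalVerifier(ctx)
	return b, nil
}
```

With raft_wal set to false, or when an existing raft.db is detected on disk, the same configuration falls back to the BoltDB log store, and the verifier (if enabled) wraps that instead.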
+func (b *RaftBackend) verifierEnabled() bool { + b.l.RLock() + defer b.l.RUnlock() + + return b.raftLogVerifierEnabled +} + // DisableUpgradeMigration returns the state of the DisableUpgradeMigration config flag and whether it was set or not func (b *RaftBackend) DisableUpgradeMigration() (bool, bool) { b.l.RLock() @@ -711,16 +704,132 @@ func (b *RaftBackend) DisableUpgradeMigration() (bool, bool) { return b.autopilotConfig.DisableUpgradeMigration, true } +// StartRaftWalVerifier runs a raft log store verifier in the background, if configured to do so. +// This periodically writes out special raft logs to verify that the log store is not corrupting data. +// This is only safe to run on the raft leader. +func (b *RaftBackend) StartRaftWalVerifier(ctx context.Context) { + if !b.verifierEnabled() { + return + } + + go func() { + ticker := time.NewTicker(b.verificationInterval()) + defer ticker.Stop() + + logger := b.logger.Named("raft-wal-verifier") + + for { + select { + case <-ticker.C: + err := b.applyVerifierCheckpoint() + if err != nil { + logger.Error("error applying verification checkpoint", "error", err) + } + logger.Debug("sent verification checkpoint") + case <-ctx.Done(): + return + } + } + }() +} + +func (b *RaftBackend) applyVerifierCheckpoint() error { + data := make([]byte, 1) + data[0] = byte(verifierCheckpointOp) + + b.permitPool.Acquire() + b.l.RLock() + + var err error + applyFuture := b.raft.Apply(data, 0) + if e := applyFuture.Error(); e != nil { + err = e + } + + b.l.RUnlock() + b.permitPool.Release() + + return err +} + +// isLogVerifyCheckpoint is the verifier.IsCheckpointFn that can decode our raft logs for +// their type. +func isLogVerifyCheckpoint(l *raft.Log) (bool, error) { + return isRaftLogVerifyCheckpoint(l), nil +} + +func makeLogVerifyReportFn(logger log.Logger) verifier.ReportFn { + return func(r verifier.VerificationReport) { + if r.SkippedRange != nil { + logger.Warn("verification skipped range, consider decreasing validation interval if this is frequent", + "rangeStart", int64(r.SkippedRange.Start), + "rangeEnd", int64(r.SkippedRange.End), + ) + } + + l2 := logger.With( + "rangeStart", int64(r.Range.Start), + "rangeEnd", int64(r.Range.End), + "leaderChecksum", fmt.Sprintf("%08x", r.ExpectedSum), + "elapsed", r.Elapsed, + ) + + if r.Err == nil { + l2.Info("verification checksum OK", + "readChecksum", fmt.Sprintf("%08x", r.ReadSum), + ) + return + } + + if errors.Is(r.Err, verifier.ErrRangeMismatch) { + l2.Warn("verification checksum skipped as we don't have all logs in range") + return + } + + var csErr verifier.ErrChecksumMismatch + if errors.As(r.Err, &csErr) { + if r.WrittenSum > 0 && r.WrittenSum != r.ExpectedSum { + // The failure occurred before the follower wrote to the log so it + // must be corrupted in flight from the leader! 
+ l2.Error("verification checksum FAILED: in-flight corruption", + "followerWriteChecksum", fmt.Sprintf("%08x", r.WrittenSum), + "readChecksum", fmt.Sprintf("%08x", r.ReadSum), + ) + } else { + l2.Error("verification checksum FAILED: storage corruption", + "followerWriteChecksum", fmt.Sprintf("%08x", r.WrittenSum), + "readChecksum", fmt.Sprintf("%08x", r.ReadSum), + ) + } + return + } + + // Some other unknown error occurred + l2.Error(r.Err.Error()) + } +} + func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) { var stats map[string]string + var logStoreStats *etcdbolt.Stats + b.l.RLock() - logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats() - fsmStats := b.fsm.Stats() + if boltStore, ok := b.stableStore.(*raftboltdb.BoltStore); ok { + bss := boltStore.Stats() + logStoreStats = &bss + } + if b.raft != nil { stats = b.raft.Stats() } + + fsmStats := b.fsm.Stats() b.l.RUnlock() - b.collectEtcdBoltMetricsWithStats(logstoreStats, sink, "logstore") + + if logStoreStats != nil { + b.collectEtcdBoltMetricsWithStats(*logStoreStats, sink, "logstore") + } + b.collectMetricsWithStats(fsmStats, sink, "fsm") labels := []metrics.Label{ { @@ -728,6 +837,7 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) { Value: b.localID, }, } + if stats != nil { for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} { n, err := strconv.ParseUint(stats[key], 10, 64) @@ -1996,6 +2106,188 @@ func (l *RaftLock) Value() (bool, string, error) { return true, value, nil } +func fileExists(name string) (bool, error) { + _, err := os.Stat(name) + if err == nil { + // File exists! + return true, nil + } + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + // We hit some other error trying to stat the file which leaves us in an + // unknown state so we can't proceed. 
+ return false, err +} + +func parseRaftBackendConfig(conf map[string]string, logger log.Logger) (*RaftBackendConfig, error) { + c := &RaftBackendConfig{} + + c.Path = conf["path"] + envPath := os.Getenv(EnvVaultRaftPath) + if envPath != "" { + c.Path = envPath + } + + if c.Path == "" { + return nil, fmt.Errorf("'path' must be set") + } + + c.NodeId = conf["node_id"] + envNodeId := os.Getenv(EnvVaultRaftNodeID) + if envNodeId != "" { + c.NodeId = envNodeId + } + + if c.NodeId == "" { + localIDRaw, err := os.ReadFile(filepath.Join(c.Path, "node-id")) + if err == nil && len(localIDRaw) > 0 { + c.NodeId = string(localIDRaw) + } + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + } + + if c.NodeId == "" { + id, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + + if err = os.WriteFile(filepath.Join(c.Path, "node-id"), []byte(id), 0o600); err != nil { + return nil, err + } + + c.NodeId = id + } + + if delayRaw, ok := conf["apply_delay"]; ok { + delay, err := parseutil.ParseDurationSecond(delayRaw) + if err != nil { + return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err) + } + + c.ApplyDelay = delay + } + + if walRaw, ok := conf["raft_wal"]; ok { + useRaftWal, err := strconv.ParseBool(walRaw) + if err != nil { + return nil, fmt.Errorf("raft_wal does not parse as a boolean: %w", err) + } + + c.RaftWal = useRaftWal + } + + if rlveRaw, ok := conf["raft_log_verifier_enabled"]; ok { + rlve, err := strconv.ParseBool(rlveRaw) + if err != nil { + return nil, fmt.Errorf("raft_log_verifier_enabled does not parse as a boolean: %w", err) + } + c.RaftLogVerifierEnabled = rlve + + c.RaftLogVerificationInterval = defaultRaftLogVerificationInterval + if rlviRaw, ok := conf["raft_log_verification_interval"]; ok { + rlvi, err := parseutil.ParseDurationSecond(rlviRaw) + if err != nil { + return nil, fmt.Errorf("raft_log_verification_interval does not parse as a duration: %w", err) + } + + // Make sure our interval is capped to a reasonable value, so e.g. 
people don't use 0s or 1s + if rlvi >= minimumRaftLogVerificationInterval { + c.RaftLogVerificationInterval = rlvi + } else { + logger.Warn("raft_log_verification_interval is less than the minimum allowed, using default instead", + "given", rlveRaw, + "minimum", minimumRaftLogVerificationInterval, + "default", defaultRaftLogVerificationInterval) + } + } + } + + if delayRaw, ok := conf["snapshot_delay"]; ok { + delay, err := parseutil.ParseDurationSecond(delayRaw) + if err != nil { + return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err) + } + c.SnapshotDelay = delay + } + + c.MaxEntrySize = defaultMaxEntrySize + if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 { + i, err := strconv.Atoi(maxEntrySizeCfg) + if err != nil { + return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err) + } + + c.MaxEntrySize = uint64(i) + } + + c.MaxBatchEntries, c.MaxBatchSize = batchLimitsFromEnv(logger) + + if interval := conf["autopilot_reconcile_interval"]; interval != "" { + interval, err := parseutil.ParseDurationSecond(interval) + if err != nil { + return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err) + } + c.AutopilotReconcileInterval = interval + } + + if interval := conf["autopilot_update_interval"]; interval != "" { + interval, err := parseutil.ParseDurationSecond(interval) + if err != nil { + return nil, fmt.Errorf("autopilot_update_interval does not parse as a duration: %w", err) + } + c.AutopilotUpdateInterval = interval + } + + effectiveReconcileInterval := autopilot.DefaultReconcileInterval + effectiveUpdateInterval := autopilot.DefaultUpdateInterval + + if c.AutopilotReconcileInterval != 0 { + effectiveReconcileInterval = c.AutopilotReconcileInterval + } + if c.AutopilotUpdateInterval != 0 { + effectiveUpdateInterval = c.AutopilotUpdateInterval + } + + if effectiveReconcileInterval < effectiveUpdateInterval { + return nil, fmt.Errorf("autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)", effectiveReconcileInterval, effectiveUpdateInterval) + } + + if uv, ok := conf["autopilot_upgrade_version"]; ok && uv != "" { + _, err := goversion.NewVersion(uv) + if err != nil { + return nil, fmt.Errorf("autopilot_upgrade_version does not parse as a semantic version: %w", err) + } + + c.AutopilotUpgradeVersion = uv + } + + c.RaftNonVoter = false + if v := os.Getenv(EnvVaultRaftNonVoter); v != "" { + // Consistent with handling of other raft boolean env vars + // VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC + c.RaftNonVoter = true + } else if v, ok := conf[raftNonVoterConfigKey]; ok { + nonVoter, err := strconv.ParseBool(v) + if err != nil { + return nil, fmt.Errorf("failed to parse %s config value %q as a boolean: %w", raftNonVoterConfigKey, v, err) + } + + c.RaftNonVoter = nonVoter + } + + if c.RaftNonVoter && conf["retry_join"] == "" { + return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey) + } + + c.AutopilotRedundancyZone = conf["autopilot_redundancy_zone"] + + return c, nil +} + // boltOptions returns a bolt.Options struct, suitable for passing to // bolt.Open(), pre-configured with all of our preferred defaults. 
func boltOptions(path string) *bolt.Options { @@ -2069,3 +2361,26 @@ func etcdboltOptions(path string) *etcdbolt.Options { return o } + +func isRaftLogVerifyCheckpoint(l *raft.Log) bool { + if !bytes.Equal(l.Data, []byte{byte(verifierCheckpointOp)}) { + return false + } + + // Single byte log with that byte value can only be a checkpoint or + // the last byte of a chunked message. If it's chunked it will have + // chunking metadata. + if len(l.Extensions) == 0 { + // No metadata, must be a checkpoint on the leader with no + // verifier metadata yet. + return true + } + + if bytes.HasPrefix(l.Extensions, logVerifierMagicBytes[:]) { + // Has verifier metadata so must be a replicated checkpoint on a follower + return true + } + + // Must be the last chunk of a chunked object that has chunking meta + return false +} diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index c634d1d8c97d..26bf0076036a 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -11,7 +11,6 @@ import ( "encoding/hex" "fmt" "io" - "io/ioutil" "math/rand" "os" "path/filepath" @@ -24,6 +23,7 @@ import ( bolt "github.com/hashicorp-forge/bbolt" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/base62" + "github.com/hashicorp/go-secure-stdlib/parseutil" "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" "github.com/hashicorp/vault/sdk/helper/jsonutil" @@ -31,6 +31,54 @@ import ( "github.com/stretchr/testify/require" ) +func testBothRaftBackends(t *testing.T, f func(raftWALValue string)) { + t.Helper() + + testCases := []struct { + name string + useWAL string + }{ + { + name: "use wal", + useWAL: "true", + }, + { + name: "use boltdb", + useWAL: "false", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f(tc.useWAL) + }) + } +} + +func testBothRaftBackendsBenchmark(b *testing.B, f func(raftWALValue string)) { + testCases := []struct { + name string + useWAL string + }{ + { + name: "use wal", + useWAL: "true", + }, + { + name: "use boltdb", + useWAL: "false", + }, + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + f(tc.useWAL) + }) + } +} + func connectPeers(nodes ...*RaftBackend) { for _, node := range nodes { for _, peer := range nodes { @@ -156,26 +204,221 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB, dataOnly bool) error { } func TestRaft_Backend(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() - physical.ExerciseBackend(t, b) + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } + + b, _ := GetRaftWithConfig(t, true, true, conf) + physical.ExerciseBackend(t, b) + }) } -func TestRaft_ParseAutopilotUpgradeVersion(t *testing.T) { - raftDir, err := ioutil.TempDir("", "vault-raft-") +// TestRaft_SwitchFromBoltDBToRaftWal is testing that we don't use raft-wal, even if configured to do so, +// if there is an existing raft.db file on disk (meaning BoltDB was previously in use). 
+func TestRaft_SwitchFromBoltDBToRaftWal(t *testing.T) { + tmpDir := t.TempDir() + + // configured to use raft-wal + conf := map[string]string{ + "path": tmpDir, + "trailing_logs": "100", + "raft_wal": "true", + } + + // raftBaseDir will end up looking like $tmpDir/raft + raftBaseDir := filepath.Join(tmpDir, raftState) + err := os.MkdirAll(raftBaseDir, 0o777) + if err != nil { + t.Fatal(err) + } + + // create a bogus $tmpDir/raft/raft.db file + db, err := bolt.Open(filepath.Join(raftBaseDir, "raft.db"), 0o777, nil) + if err != nil { + t.Fatal(err) + } + err = db.Close() + if err != nil { + t.Fatal(err) + } + + _, err = NewRaftBackend(conf, hclog.NewNullLogger()) + if err != nil { + t.Fatal(err) + } + + // Check to see if $tmpDir/raft/raft-wal exists. It should not, because we only create that if raft-wal is in use. + // And since raft.db already existed, we should've skipped all the raft-wal setup code. + raftWalExists, err := fileExists(filepath.Join(raftBaseDir, raftWalDir)) if err != nil { t.Fatal(err) } - defer os.RemoveAll(raftDir) + if raftWalExists { + t.Fatal("expected raft-wal dir to not exist, but it does") + } +} + +// TestRaft_VerifierEnabled is not checking to ensure that the verifier works correctly - the verifier has +// its own unit tests for that. What we're checking for here is that we've plumbed everything through correctly, +// i.e. we can stand up a raft cluster with the verifier enabled, do a bunch of raft things, let the verifier +// do its thing, and nothing blows up. +func TestRaft_VerifierEnabled(t *testing.T) { + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + "raft_log_verifier_enabled": "true", + } + + b, _ := GetRaftWithConfig(t, true, true, conf) + physical.ExerciseBackend(t, b) + + err := b.applyVerifierCheckpoint() + if err != nil { + t.Fatal(err) + } + physical.ExerciseBackend(t, b) + }) +} + +// TestRaft_ParseRaftWalBackend ensures that the raft_wal config option parses correctly and returns an error if not +func TestRaft_ParseRaftWalBackend(t *testing.T) { + raftDir := t.TempDir() + conf := map[string]string{ + "path": raftDir, + "node_id": "abc123", + "raft_wal": "notabooleanlol", + } + + _, err := NewRaftBackend(conf, hclog.NewNullLogger()) + if err == nil { + t.Fatal("expected an error but got none") + } + + if !strings.Contains(err.Error(), "does not parse as a boolean") { + t.Fatal("expected an error about parsing config keys but got none") + } +} + +// TestRaft_ParseRaftWalVerifierEnabled checks to make sure we error correctly if raft_log_verifier_enabled is not a boolean +func TestRaft_ParseRaftWalVerifierEnabled(t *testing.T) { + raftDir := t.TempDir() + conf := map[string]string{ + "path": raftDir, + "node_id": "abc123", + "raft_wal": "true", + "raft_log_verifier_enabled": "notabooleanlol", + } + + _, err := NewRaftBackend(conf, hclog.NewNullLogger()) + if err == nil { + t.Fatal("expected an error but got none") + } + + if !strings.Contains(err.Error(), "does not parse as a boolean") { + t.Fatal("expected an error about parsing config keys but got none") + } +} + +// TestRaft_ParseRaftWalVerifierInterval checks to make sure we handle various intervals correctly and have a default +func TestRaft_ParseRaftWalVerifierInterval(t *testing.T) { + testCases := []struct { + name string + givenInterval string + expectedInterval string + shouldError bool + }{ + { + "zero", + "0s", + defaultRaftLogVerificationInterval.String(), + false, + }, + { + "one", + "1s", + 
defaultRaftLogVerificationInterval.String(), + false, + }, + { + "nothing", + "", + defaultRaftLogVerificationInterval.String(), + false, + }, + { + "default", + "60s", + defaultRaftLogVerificationInterval.String(), + false, + }, + { + "more than the default", + "75s", + "75s", + false, + }, + { + "obviously wrong", + "notadurationlol", + "", + true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + raftDir := t.TempDir() + conf := map[string]string{ + "path": raftDir, + "node_id": "abc123", + "raft_wal": "true", + "raft_log_verifier_enabled": "true", + "raft_log_verification_interval": tc.givenInterval, + } + + rbRaw, err := NewRaftBackend(conf, hclog.NewNullLogger()) + if tc.shouldError { + if err == nil { + t.Fatal("expected an error but got none") + } + + // return early, since we got the error we wanted + return + } + if !tc.shouldError && err != nil { + t.Fatal(err) + } + + rb := rbRaw.(*RaftBackend) + + parsedExpectedInterval, err := parseutil.ParseDurationSecond(tc.expectedInterval) + if err != nil { + t.Fatal(err) + } + + if parsedExpectedInterval != rb.verificationInterval() { + t.Fatal("expected intervals to match but they didn't") + } + }) + } +} + +// TestRaft_ParseAutopilotUpgradeVersion tests that autopilot_upgrade_version parses correctly and returns an error if not +func TestRaft_ParseAutopilotUpgradeVersion(t *testing.T) { + raftDir := t.TempDir() conf := map[string]string{ "path": raftDir, "node_id": "abc123", "autopilot_upgrade_version": "hahano", } - _, err = NewRaftBackend(conf, hclog.NewNullLogger()) + _, err := NewRaftBackend(conf, hclog.NewNullLogger()) if err == nil { t.Fatal("expected an error but got none") } @@ -214,12 +457,7 @@ func TestRaft_ParseNonVoter(t *testing.T) { if tc.envValue != nil { t.Setenv(EnvVaultRaftNonVoter, *tc.envValue) } - raftDir, err := ioutil.TempDir("", "vault-raft-") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(raftDir) - + raftDir := t.TempDir() conf := map[string]string{ "path": raftDir, "node_id": "abc123", @@ -252,387 +490,477 @@ func TestRaft_ParseNonVoter(t *testing.T) { } func TestRaft_Backend_LargeKey(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() - key, err := base62.Random(bolt.MaxKeySize + 1) - if err != nil { - t.Fatal(err) - } - entry := &physical.Entry{Key: key, Value: []byte(key)} + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - err = b.Put(context.Background(), entry) - if err == nil { - t.Fatal("expected error for put entry") - } + b, _ := GetRaftWithConfig(t, true, true, conf) + key, err := base62.Random(bolt.MaxKeySize + 1) + if err != nil { + t.Fatal(err) + } + entry := &physical.Entry{Key: key, Value: []byte(key)} - if !strings.Contains(err.Error(), physical.ErrKeyTooLarge) { - t.Fatalf("expected %q, got %v", physical.ErrKeyTooLarge, err) - } + err = b.Put(context.Background(), entry) + if err == nil { + t.Fatal("expected error for put entry") + } - out, err := b.Get(context.Background(), entry.Key) - if err != nil { - t.Fatalf("unexpected error after failed put: %v", err) - } - if out != nil { - t.Fatal("expected response entry to be nil after a failed put") - } + if !strings.Contains(err.Error(), physical.ErrKeyTooLarge) { + t.Fatalf("expected %q, got %v", physical.ErrKeyTooLarge, err) + } + + out, err := b.Get(context.Background(), entry.Key) + if err != nil { + t.Fatalf("unexpected error after failed put: %v", err) + } + if out != 
nil { + t.Fatal("expected response entry to be nil after a failed put") + } + }) } func TestRaft_Backend_LargeValue(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() - value := make([]byte, defaultMaxEntrySize+1) - rand.Read(value) - entry := &physical.Entry{Key: "foo", Value: value} + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - err := b.Put(context.Background(), entry) - if err == nil { - t.Fatal("expected error for put entry") - } + b, _ := GetRaftWithConfig(t, true, true, conf) + value := make([]byte, defaultMaxEntrySize+1) + rand.Read(value) + entry := &physical.Entry{Key: "foo", Value: value} - if !strings.Contains(err.Error(), physical.ErrValueTooLarge) { - t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) - } + err := b.Put(context.Background(), entry) + if err == nil { + t.Fatal("expected error for put entry") + } - out, err := b.Get(context.Background(), entry.Key) - if err != nil { - t.Fatalf("unexpected error after failed put: %v", err) - } - if out != nil { - t.Fatal("expected response entry to be nil after a failed put") - } + if !strings.Contains(err.Error(), physical.ErrValueTooLarge) { + t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) + } + + out, err := b.Get(context.Background(), entry.Key) + if err != nil { + t.Fatalf("unexpected error after failed put: %v", err) + } + if out != nil { + t.Fatal("expected response entry to be nil after a failed put") + } + }) } // TestRaft_TransactionalBackend_GetTransactions tests that passing a slice of transactions to the // raft backend will populate values for any transactions that are Get operations. func TestRaft_TransactionalBackend_GetTransactions(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } + + b, _ := GetRaftWithConfig(t, true, true, conf) + ctx := context.Background() + txns := make([]*physical.TxnEntry, 0) - ctx := context.Background() - txns := make([]*physical.TxnEntry, 0) + // Add some seed values to our FSM, and prepare our slice of transactions at the same time + for i := 0; i < 5; i++ { + key := fmt.Sprintf("foo/%d", i) + err := b.fsm.Put(ctx, &physical.Entry{Key: key, Value: []byte(fmt.Sprintf("value-%d", i))}) + if err != nil { + t.Fatal(err) + } - // Add some seed values to our FSM, and prepare our slice of transactions at the same time - for i := 0; i < 5; i++ { - key := fmt.Sprintf("foo/%d", i) - err := b.fsm.Put(ctx, &physical.Entry{Key: key, Value: []byte(fmt.Sprintf("value-%d", i))}) - if err != nil { - t.Fatal(err) + txns = append(txns, &physical.TxnEntry{ + Operation: physical.GetOperation, + Entry: &physical.Entry{ + Key: key, + }, + }) } - txns = append(txns, &physical.TxnEntry{ - Operation: physical.GetOperation, - Entry: &physical.Entry{ - Key: key, - }, - }) - } + // Add some additional transactions, so we have a mix of operations + for i := 0; i < 10; i++ { + txnEntry := &physical.TxnEntry{ + Entry: &physical.Entry{ + Key: fmt.Sprintf("lol-%d", i), + }, + } - // Add some additional transactions, so we have a mix of operations - for i := 0; i < 10; i++ { - txnEntry := &physical.TxnEntry{ - Entry: &physical.Entry{ - Key: fmt.Sprintf("lol-%d", i), - }, - } + if i%2 == 0 { + txnEntry.Operation = physical.PutOperation + txnEntry.Entry.Value = []byte("lol") + } else { 
+ txnEntry.Operation = physical.DeleteOperation + } - if i%2 == 0 { - txnEntry.Operation = physical.PutOperation - txnEntry.Entry.Value = []byte("lol") - } else { - txnEntry.Operation = physical.DeleteOperation + txns = append(txns, txnEntry) } - txns = append(txns, txnEntry) - } - - err := b.Transaction(ctx, txns) - if err != nil { - t.Fatal(err) - } + err := b.Transaction(ctx, txns) + if err != nil { + t.Fatal(err) + } - // Check that our Get operations were populated with their values - for i, txn := range txns { - if txn.Operation == physical.GetOperation { - val := []byte(fmt.Sprintf("value-%d", i)) - if !bytes.Equal(val, txn.Entry.Value) { - t.Fatalf("expected %s to equal %s but it didn't", hex.EncodeToString(val), hex.EncodeToString(txn.Entry.Value)) + // Check that our Get operations were populated with their values + for i, txn := range txns { + if txn.Operation == physical.GetOperation { + val := []byte(fmt.Sprintf("value-%d", i)) + if !bytes.Equal(val, txn.Entry.Value) { + t.Fatalf("expected %s to equal %s but it didn't", hex.EncodeToString(val), hex.EncodeToString(txn.Entry.Value)) + } } } - } + }) } func TestRaft_TransactionalBackend_LargeKey(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - value := make([]byte, defaultMaxEntrySize+1) - rand.Read(value) + b, _ := GetRaftWithConfig(t, true, true, conf) + value := make([]byte, defaultMaxEntrySize+1) + rand.Read(value) - key, err := base62.Random(bolt.MaxKeySize + 1) - if err != nil { - t.Fatal(err) - } - txns := []*physical.TxnEntry{ - { - Operation: physical.PutOperation, - Entry: &physical.Entry{ - Key: key, - Value: []byte(key), + key, err := base62.Random(bolt.MaxKeySize + 1) + if err != nil { + t.Fatal(err) + } + txns := []*physical.TxnEntry{ + { + Operation: physical.PutOperation, + Entry: &physical.Entry{ + Key: key, + Value: []byte(key), + }, }, - }, - } + } - err = b.Transaction(context.Background(), txns) - if err == nil { - t.Fatal("expected error for transactions") - } + err = b.Transaction(context.Background(), txns) + if err == nil { + t.Fatal("expected error for transactions") + } - if !strings.Contains(err.Error(), physical.ErrKeyTooLarge) { - t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) - } + if !strings.Contains(err.Error(), physical.ErrKeyTooLarge) { + t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) + } - out, err := b.Get(context.Background(), txns[0].Entry.Key) - if err != nil { - t.Fatalf("unexpected error after failed put: %v", err) - } - if out != nil { - t.Fatal("expected response entry to be nil after a failed put") - } + out, err := b.Get(context.Background(), txns[0].Entry.Key) + if err != nil { + t.Fatalf("unexpected error after failed put: %v", err) + } + if out != nil { + t.Fatal("expected response entry to be nil after a failed put") + } + }) } func TestRaft_TransactionalBackend_LargeValue(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) - - value := make([]byte, defaultMaxEntrySize+1) - rand.Read(value) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - txns := []*physical.TxnEntry{ - { - Operation: physical.PutOperation, - Entry: &physical.Entry{ - Key: "foo", - Value: value, + b, _ := GetRaftWithConfig(t, true, true, conf) + value := make([]byte, 
defaultMaxEntrySize+1) + rand.Read(value) + + txns := []*physical.TxnEntry{ + { + Operation: physical.PutOperation, + Entry: &physical.Entry{ + Key: "foo", + Value: value, + }, }, - }, - } + } - err := b.Transaction(context.Background(), txns) - if err == nil { - t.Fatal("expected error for transactions") - } + err := b.Transaction(context.Background(), txns) + if err == nil { + t.Fatal("expected error for transactions") + } - if !strings.Contains(err.Error(), physical.ErrValueTooLarge) { - t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) - } + if !strings.Contains(err.Error(), physical.ErrValueTooLarge) { + t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err) + } - out, err := b.Get(context.Background(), txns[0].Entry.Key) - if err != nil { - t.Fatalf("unexpected error after failed put: %v", err) - } - if out != nil { - t.Fatal("expected response entry to be nil after a failed put") - } + out, err := b.Get(context.Background(), txns[0].Entry.Key) + if err != nil { + t.Fatalf("unexpected error after failed put: %v", err) + } + if out != nil { + t.Fatal("expected response entry to be nil after a failed put") + } + }) } func TestRaft_Backend_ListPrefix(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - physical.ExerciseBackend_ListPrefix(t, b) + b, _ := GetRaftWithConfig(t, true, true, conf) + physical.ExerciseBackend_ListPrefix(t, b) + }) } func TestRaft_TransactionalBackend(t *testing.T) { - b, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - physical.ExerciseTransactionalBackend(t, b) + b, _ := GetRaftWithConfig(t, true, true, conf) + physical.ExerciseTransactionalBackend(t, b) + }) } func TestRaft_HABackend(t *testing.T) { t.Skip() - raft, dir := GetRaft(t, true, true) - defer os.RemoveAll(dir) - raft2, dir2 := GetRaft(t, false, true) - defer os.RemoveAll(dir2) + raft1, _ := GetRaft(t, true, true) + raft2, _ := GetRaft(t, false, true) // Add raft2 to the cluster - addPeer(t, raft, raft2) - - physical.ExerciseHABackend(t, raft, raft2) + addPeer(t, raft1, raft2) + physical.ExerciseHABackend(t, raft1, raft2) } func TestRaft_Backend_ThreeNode(t *testing.T) { - raft1, dir := GetRaft(t, true, true) - raft2, dir2 := GetRaft(t, false, true) - raft3, dir3 := GetRaft(t, false, true) - defer os.RemoveAll(dir) - defer os.RemoveAll(dir2) - defer os.RemoveAll(dir3) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - // Add raft2 to the cluster - addPeer(t, raft1, raft2) + raft1, _ := GetRaftWithConfig(t, true, true, conf) + raft2, _ := GetRaftWithConfig(t, false, true, conf) + raft3, _ := GetRaftWithConfig(t, false, true, conf) + + // Add raft2 to the cluster + addPeer(t, raft1, raft2) - // Add raft3 to the cluster - addPeer(t, raft1, raft3) + // Add raft3 to the cluster + addPeer(t, raft1, raft3) - physical.ExerciseBackend(t, raft1) + physical.ExerciseBackend(t, raft1) - time.Sleep(10 * time.Second) - // Make sure all stores are the same - compareFSMs(t, raft1.fsm, raft2.fsm) - compareFSMs(t, raft1.fsm, raft3.fsm) + time.Sleep(10 * time.Second) + // Make sure all stores are the same + compareFSMs(t, raft1.fsm, raft2.fsm) + compareFSMs(t, 
raft1.fsm, raft3.fsm) + }) } func TestRaft_GetOfflineConfig(t *testing.T) { - // Create 3 raft nodes - raft1, dir1 := GetRaft(t, true, true) - raft2, dir2 := GetRaft(t, false, true) - raft3, dir3 := GetRaft(t, false, true) - defer os.RemoveAll(dir1) - defer os.RemoveAll(dir2) - defer os.RemoveAll(dir3) - - // Add them all to the cluster - addPeer(t, raft1, raft2) - addPeer(t, raft1, raft3) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + config := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - // Add some data into the FSM - physical.ExerciseBackend(t, raft1) + // Create 3 raft nodes + raft1, _ := GetRaftWithConfig(t, true, true, config) + raft2, _ := GetRaftWithConfig(t, false, true, config) + raft3, _ := GetRaftWithConfig(t, false, true, config) - time.Sleep(10 * time.Second) + // Add them all to the cluster + addPeer(t, raft1, raft2) + addPeer(t, raft1, raft3) - // Spin down the raft cluster and check that GetConfigurationOffline - // returns 3 voters - raft3.TeardownCluster(nil) - raft2.TeardownCluster(nil) - raft1.TeardownCluster(nil) + // Add some data into the FSM + physical.ExerciseBackend(t, raft1) - conf, err := raft1.GetConfigurationOffline() - if err != nil { - t.Fatal(err) - } - if len(conf.Servers) != 3 { - t.Fatalf("three raft nodes existed but we only see %d", len(conf.Servers)) - } - for _, s := range conf.Servers { - if s.Voter != true { - t.Fatalf("one of the nodes is not a voter") + time.Sleep(10 * time.Second) + + // Spin down the raft cluster and check that GetConfigurationOffline + // returns 3 voters + err := raft3.TeardownCluster(nil) + if err != nil { + t.Fatal(err) } - } + err = raft2.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } + err = raft1.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } + + conf, err := raft1.GetConfigurationOffline() + if err != nil { + t.Fatal(err) + } + if len(conf.Servers) != 3 { + t.Fatalf("three raft nodes existed but we only see %d", len(conf.Servers)) + } + for _, s := range conf.Servers { + if s.Voter != true { + t.Fatalf("one of the nodes is not a voter") + } + } + }) } func TestRaft_Recovery(t *testing.T) { - // Create 4 raft nodes - raft1, dir1 := GetRaft(t, true, true) - raft2, dir2 := GetRaft(t, false, true) - raft3, dir3 := GetRaft(t, false, true) - raft4, dir4 := GetRaft(t, false, true) - defer os.RemoveAll(dir1) - defer os.RemoveAll(dir2) - defer os.RemoveAll(dir3) - defer os.RemoveAll(dir4) - - // Add them all to the cluster - addPeer(t, raft1, raft2) - addPeer(t, raft1, raft3) - addPeer(t, raft1, raft4) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - // Add some data into the FSM - physical.ExerciseBackend(t, raft1) + // Create 4 raft nodes + raft1, dir1 := GetRaftWithConfig(t, true, true, conf) + raft2, dir2 := GetRaftWithConfig(t, false, true, conf) + raft3, _ := GetRaftWithConfig(t, false, true, conf) + raft4, dir4 := GetRaftWithConfig(t, false, true, conf) - time.Sleep(10 * time.Second) + // Add them all to the cluster + addPeer(t, raft1, raft2) + addPeer(t, raft1, raft3) + addPeer(t, raft1, raft4) - // Bring down all nodes - raft1.TeardownCluster(nil) - raft2.TeardownCluster(nil) - raft3.TeardownCluster(nil) - raft4.TeardownCluster(nil) + // Add some data into the FSM + physical.ExerciseBackend(t, raft1) - // Prepare peers.json - type RecoveryPeer struct { - ID string `json:"id"` - Address string `json:"address"` - NonVoter bool 
`json:"non_voter"` - } + time.Sleep(10 * time.Second) - // Leave out node 1 during recovery - peersList := make([]*RecoveryPeer, 0, 3) - peersList = append(peersList, &RecoveryPeer{ - ID: raft1.NodeID(), - Address: raft1.NodeID(), - NonVoter: false, - }) - peersList = append(peersList, &RecoveryPeer{ - ID: raft2.NodeID(), - Address: raft2.NodeID(), - NonVoter: false, - }) - peersList = append(peersList, &RecoveryPeer{ - ID: raft4.NodeID(), - Address: raft4.NodeID(), - NonVoter: false, - }) + // Bring down all nodes + err := raft1.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } + err = raft2.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } + err = raft3.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } + err = raft4.TeardownCluster(nil) + if err != nil { + t.Fatal(err) + } - peersJSONBytes, err := jsonutil.EncodeJSON(peersList) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(filepath.Join(dir1, raftState), "peers.json"), peersJSONBytes, 0o644) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(filepath.Join(dir2, raftState), "peers.json"), peersJSONBytes, 0o644) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(filepath.Join(filepath.Join(dir4, raftState), "peers.json"), peersJSONBytes, 0o644) - if err != nil { - t.Fatal(err) - } + // Prepare peers.json + type RecoveryPeer struct { + ID string `json:"id"` + Address string `json:"address"` + NonVoter bool `json:"non_voter"` + } - // Bring up the nodes again - raft1.SetupCluster(context.Background(), SetupOpts{}) - raft2.SetupCluster(context.Background(), SetupOpts{}) - raft4.SetupCluster(context.Background(), SetupOpts{}) + // Leave out node 1 during recovery + peersList := make([]*RecoveryPeer, 0, 3) + peersList = append(peersList, &RecoveryPeer{ + ID: raft1.NodeID(), + Address: raft1.NodeID(), + NonVoter: false, + }) + peersList = append(peersList, &RecoveryPeer{ + ID: raft2.NodeID(), + Address: raft2.NodeID(), + NonVoter: false, + }) + peersList = append(peersList, &RecoveryPeer{ + ID: raft4.NodeID(), + Address: raft4.NodeID(), + NonVoter: false, + }) - peers, err := raft1.Peers(context.Background()) - if err != nil { - t.Fatal(err) - } - if len(peers) != 3 { - t.Fatalf("failed to recover the cluster") - } + peersJSONBytes, err := jsonutil.EncodeJSON(peersList) + if err != nil { + t.Fatal(err) + } + err = os.WriteFile(filepath.Join(filepath.Join(dir1, raftState), "peers.json"), peersJSONBytes, 0o644) + if err != nil { + t.Fatal(err) + } + err = os.WriteFile(filepath.Join(filepath.Join(dir2, raftState), "peers.json"), peersJSONBytes, 0o644) + if err != nil { + t.Fatal(err) + } + err = os.WriteFile(filepath.Join(filepath.Join(dir4, raftState), "peers.json"), peersJSONBytes, 0o644) + if err != nil { + t.Fatal(err) + } - time.Sleep(10 * time.Second) + // Bring up the nodes again + err = raft1.SetupCluster(context.Background(), SetupOpts{}) + if err != nil { + t.Fatal(err) + } + err = raft2.SetupCluster(context.Background(), SetupOpts{}) + if err != nil { + t.Fatal(err) + } + err = raft4.SetupCluster(context.Background(), SetupOpts{}) + if err != nil { + t.Fatal(err) + } + + peers, err := raft1.Peers(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(peers) != 3 { + t.Fatalf("failed to recover the cluster") + } - compareFSMs(t, raft1.fsm, raft2.fsm) - compareFSMs(t, raft1.fsm, raft4.fsm) + time.Sleep(10 * time.Second) + + compareFSMs(t, raft1.fsm, raft2.fsm) + compareFSMs(t, raft1.fsm, raft4.fsm) + }) } func 
TestRaft_TransactionalBackend_ThreeNode(t *testing.T) { - raft1, dir := GetRaft(t, true, true) - raft2, dir2 := GetRaft(t, false, true) - raft3, dir3 := GetRaft(t, false, true) - defer os.RemoveAll(dir) - defer os.RemoveAll(dir2) - defer os.RemoveAll(dir3) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - // Add raft2 to the cluster - addPeer(t, raft1, raft2) + raft1, _ := GetRaftWithConfig(t, true, true, conf) + raft2, _ := GetRaftWithConfig(t, false, true, conf) + raft3, _ := GetRaftWithConfig(t, false, true, conf) + + // Add raft2 to the cluster + addPeer(t, raft1, raft2) - // Add raft3 to the cluster - addPeer(t, raft1, raft3) + // Add raft3 to the cluster + addPeer(t, raft1, raft3) - physical.ExerciseTransactionalBackend(t, raft1) + physical.ExerciseTransactionalBackend(t, raft1) - time.Sleep(10 * time.Second) - // Make sure all stores are the same - compareFSMs(t, raft1.fsm, raft2.fsm) - compareFSMs(t, raft1.fsm, raft3.fsm) + time.Sleep(10 * time.Second) + // Make sure all stores are the same + compareFSMs(t, raft1.fsm, raft2.fsm) + compareFSMs(t, raft1.fsm, raft3.fsm) + }) } // TestRaft_TransactionalLimitsEnvOverride ensures the ENV var overrides for @@ -718,125 +1046,150 @@ func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) { } func TestRaft_Backend_Performance(t *testing.T) { - b, dir := GetRaft(t, true, false) - defer os.RemoveAll(dir) + t.Parallel() + testBothRaftBackends(t, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - defaultConfig := raft.DefaultConfig() + b, dir := GetRaftWithConfig(t, true, true, conf) - localConfig := raft.DefaultConfig() - b.applyConfigSettings(localConfig) + defaultConfig := raft.DefaultConfig() + localConfig := raft.DefaultConfig() + err := b.applyConfigSettings(localConfig) + if err != nil { + t.Fatal(err) + } - if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.HeartbeatTimeout != defaultConfig.HeartbeatTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } + if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.HeartbeatTimeout != defaultConfig.HeartbeatTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } - b.conf = map[string]string{ - "path": dir, - "performance_multiplier": "5", - } + b.conf = map[string]string{ + "path": dir, + "performance_multiplier": "5", + } - localConfig = raft.DefaultConfig() - b.applyConfigSettings(localConfig) + localConfig = raft.DefaultConfig() + err = b.applyConfigSettings(localConfig) + if err != nil { + t.Fatal(err) + } - if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.HeartbeatTimeout != defaultConfig.HeartbeatTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout*5 { - t.Fatalf("bad config: %v", localConfig) - } + if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.HeartbeatTimeout != 
defaultConfig.HeartbeatTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout*5 { + t.Fatalf("bad config: %v", localConfig) + } - b.conf = map[string]string{ - "path": dir, - "performance_multiplier": "1", - } + b.conf = map[string]string{ + "path": dir, + "performance_multiplier": "1", + } - localConfig = raft.DefaultConfig() - b.applyConfigSettings(localConfig) + localConfig = raft.DefaultConfig() + err = b.applyConfigSettings(localConfig) + if err != nil { + t.Fatal(err) + } - if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.HeartbeatTimeout != defaultConfig.HeartbeatTimeout { - t.Fatalf("bad config: %v", localConfig) - } - if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout { - t.Fatalf("bad config: %v", localConfig) - } + if localConfig.ElectionTimeout != defaultConfig.ElectionTimeout { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.HeartbeatTimeout != defaultConfig.HeartbeatTimeout { + t.Fatalf("bad config: %v", localConfig) + } + if localConfig.LeaderLeaseTimeout != defaultConfig.LeaderLeaseTimeout { + t.Fatalf("bad config: %v", localConfig) + } + }) } func BenchmarkDB_Puts(b *testing.B) { - raft, dir := GetRaft(b, true, false) - defer os.RemoveAll(dir) - raft2, dir2 := GetRaft(b, true, false) - defer os.RemoveAll(dir2) - - bench := func(b *testing.B, s physical.Backend, dataSize int) { - data, err := uuid.GenerateRandomBytes(dataSize) - if err != nil { - b.Fatal(err) + testBothRaftBackendsBenchmark(b, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, } - ctx := context.Background() - pe := &physical.Entry{ - Value: data, - } - testName := b.Name() + raft1, _ := GetRaftWithConfig(b, true, false, conf) + raft2, _ := GetRaftWithConfig(b, true, false, conf) - b.ResetTimer() - for i := 0; i < b.N; i++ { - pe.Key = fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) - err := s.Put(ctx, pe) + bench := func(b *testing.B, s physical.Backend, dataSize int) { + data, err := uuid.GenerateRandomBytes(dataSize) if err != nil { b.Fatal(err) } + + ctx := context.Background() + pe := &physical.Entry{ + Value: data, + } + testName := b.Name() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pe.Key = fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) + err := s.Put(ctx, pe) + if err != nil { + b.Fatal(err) + } + } } - } - b.Run("256b", func(b *testing.B) { bench(b, raft, 256) }) - b.Run("256kb", func(b *testing.B) { bench(b, raft2, 256*1024) }) + b.Run("256b", func(b *testing.B) { bench(b, raft1, 256) }) + b.Run("256kb", func(b *testing.B) { bench(b, raft2, 256*1024) }) + }) } func BenchmarkDB_Snapshot(b *testing.B) { - raft, dir := GetRaft(b, true, false) - defer os.RemoveAll(dir) - - data, err := uuid.GenerateRandomBytes(256 * 1024) - if err != nil { - b.Fatal(err) - } - - ctx := context.Background() - pe := &physical.Entry{ - Value: data, - } - testName := b.Name() + testBothRaftBackendsBenchmark(b, func(useRaftWal string) { + conf := map[string]string{ + "trailing_logs": "100", + "raft_wal": useRaftWal, + } - for i := 0; i < 100; i++ { - pe.Key = fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) - err = raft.Put(ctx, pe) + raft1, _ := GetRaftWithConfig(b, true, false, conf) + data, err := uuid.GenerateRandomBytes(256 * 1024) if err != nil { b.Fatal(err) } - } - bench := func(b *testing.B, s *FSM) { 
- b.ResetTimer() - for i := 0; i < b.N; i++ { + ctx := context.Background() + pe := &physical.Entry{ + Value: data, + } + testName := b.Name() + + for i := 0; i < 100; i++ { pe.Key = fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) - s.writeTo(ctx, discardCloser{Writer: ioutil.Discard}, discardCloser{Writer: ioutil.Discard}) + err = raft1.Put(ctx, pe) + if err != nil { + b.Fatal(err) + } + } + + bench := func(b *testing.B, s *FSM) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + pe.Key = fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i)))) + s.writeTo(ctx, discardCloser{Writer: io.Discard}, discardCloser{Writer: io.Discard}) + } } - } - b.Run("256kb", func(b *testing.B) { bench(b, raft.fsm) }) + b.Run("256kb", func(b *testing.B) { bench(b, raft1.fsm) }) + }) } type discardCloser struct { diff --git a/physical/raft/snapshot.go b/physical/raft/snapshot.go index adcfac4e1c44..b584af7665ed 100644 --- a/physical/raft/snapshot.go +++ b/physical/raft/snapshot.go @@ -20,11 +20,10 @@ import ( "github.com/golang/protobuf/proto" bolt "github.com/hashicorp-forge/bbolt" log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/raft" "github.com/hashicorp/vault/sdk/plugin/pb" "github.com/rboyer/safeio" "go.uber.org/atomic" - - "github.com/hashicorp/raft" ) const ( diff --git a/physical/raft/testing.go b/physical/raft/testing.go index 16f74ff1696b..7250782b086f 100644 --- a/physical/raft/testing.go +++ b/physical/raft/testing.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "testing" "github.com/hashicorp/go-hclog" @@ -15,26 +14,37 @@ import ( ) func GetRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, string) { - raftDir, err := ioutil.TempDir("", "vault-raft-") - if err != nil { - t.Fatal(err) - } - t.Logf("raft dir: %s", raftDir) + return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), nil) +} - return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, nil) +func GetRaftWithConfig(t testing.TB, bootstrap bool, noStoreState bool, conf map[string]string) (*RaftBackend, string) { + defaultConf := defaultRaftConfig(t, bootstrap, noStoreState) + conf["path"] = defaultConf["path"] + conf["doNotStoreLatestState"] = defaultConf["doNotStoreLatestState"] + return getRaftInternal(t, bootstrap, conf, nil) } func GetRaftWithLogOutput(t testing.TB, bootstrap bool, noStoreState bool, logOutput io.Writer) (*RaftBackend, string) { - raftDir, err := ioutil.TempDir("", "vault-raft-") - if err != nil { - t.Fatal(err) - } + return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), logOutput) +} + +func defaultRaftConfig(t testing.TB, bootstrap bool, noStoreState bool) map[string]string { + raftDir := t.TempDir() t.Logf("raft dir: %s", raftDir) - return getRaftWithDirAndLogOutput(t, bootstrap, noStoreState, raftDir, logOutput) + conf := map[string]string{ + "path": raftDir, + "trailing_logs": "100", + } + + if noStoreState { + conf["doNotStoreLatestState"] = "" + } + + return conf } -func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, raftDir string, logOutput io.Writer) (*RaftBackend, string) { +func getRaftInternal(t testing.TB, bootstrap bool, conf map[string]string, logOutput io.Writer) (*RaftBackend, string) { id, err := uuid.GenerateUUID() if err != nil { t.Fatal(err) @@ -45,17 +55,8 @@ func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, Level: hclog.Trace, Output: logOutput, }) - 
logger.Info("raft dir", "dir", raftDir) - - conf := map[string]string{ - "path": raftDir, - "trailing_logs": "100", - "node_id": id, - } - if noStoreState { - conf["doNotStoreLatestState"] = "" - } + conf["node_id"] = id backendRaw, err := NewRaftBackend(conf, logger) if err != nil { @@ -88,6 +89,5 @@ func getRaftWithDirAndLogOutput(t testing.TB, bootstrap bool, noStoreState bool, } backend.DisableAutopilot() - - return backend, raftDir + return backend, conf["path"] } diff --git a/sdk/helper/testcluster/docker/environment.go b/sdk/helper/testcluster/docker/environment.go index 801ee924ac7c..259ba5e12385 100644 --- a/sdk/helper/testcluster/docker/environment.go +++ b/sdk/helper/testcluster/docker/environment.go @@ -1256,6 +1256,18 @@ COPY vault /bin/vault return tag, nil } +func (dc *DockerCluster) GetActiveClusterNode() *DockerClusterNode { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + node, err := testcluster.WaitForActiveNode(ctx, dc) + if err != nil { + panic(fmt.Sprintf("no cluster node became active in timeout window: %v", err)) + } + + return dc.ClusterNodes[node] +} + /* Notes on testing the non-bridge network case: - you need the test itself to be running in a container so that it can use the network; create the network using diff --git a/sdk/physical/testing.go b/sdk/physical/testing.go index 1114f34a5195..9b7d339284dd 100644 --- a/sdk/physical/testing.go +++ b/sdk/physical/testing.go @@ -15,9 +15,10 @@ import ( func ExerciseBackend(t testing.TB, b Backend) { t.Helper() + ctx := context.Background() // Should be empty - keys, err := b.List(context.Background(), "") + keys, err := b.List(ctx, "") if err != nil { t.Fatalf("initial list failed: %v", err) } @@ -26,13 +27,13 @@ func ExerciseBackend(t testing.TB, b Backend) { } // Delete should work if it does not exist - err = b.Delete(context.Background(), "foo") + err = b.Delete(ctx, "foo") if err != nil { t.Fatalf("idempotent delete: %v", err) } // Get should not fail, but be nil - out, err := b.Get(context.Background(), "foo") + out, err := b.Get(ctx, "foo") if err != nil { t.Fatalf("initial get failed: %v", err) } @@ -42,13 +43,13 @@ func ExerciseBackend(t testing.TB, b Backend) { // Make an entry e := &Entry{Key: "foo", Value: []byte("test")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("put failed: %v", err) } // Get should work - out, err = b.Get(context.Background(), "foo") + out, err = b.Get(ctx, "foo") if err != nil { t.Fatalf("get failed: %v", err) } @@ -57,7 +58,7 @@ func ExerciseBackend(t testing.TB, b Backend) { } // List should not be empty - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("list failed: %v", err) } @@ -66,13 +67,13 @@ func ExerciseBackend(t testing.TB, b Backend) { } // Delete should work - err = b.Delete(context.Background(), "foo") + err = b.Delete(ctx, "foo") if err != nil { t.Fatalf("delete: %v", err) } // Should be empty - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("list after delete: %v", err) } @@ -81,7 +82,7 @@ func ExerciseBackend(t testing.TB, b Backend) { } // Get should fail - out, err = b.Get(context.Background(), "foo") + out, err = b.Get(ctx, "foo") if err != nil { t.Fatalf("get after delete: %v", err) } @@ -91,25 +92,25 @@ func ExerciseBackend(t testing.TB, b Backend) { // Multiple Puts should work; GH-189 e = &Entry{Key: "foo", Value: []byte("test")} - err = b.Put(context.Background(), e) 
+ err = b.Put(ctx, e) if err != nil { t.Fatalf("multi put 1 failed: %v", err) } e = &Entry{Key: "foo", Value: []byte("test")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("multi put 2 failed: %v", err) } // Make a nested entry e = &Entry{Key: "foo/bar", Value: []byte("baz")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("nested put failed: %v", err) } // Get should work - out, err = b.Get(context.Background(), "foo/bar") + out, err = b.Get(ctx, "foo/bar") if err != nil { t.Fatalf("get failed: %v", err) } @@ -117,7 +118,7 @@ func ExerciseBackend(t testing.TB, b Backend) { t.Errorf("bad: %v expected: %v", out, e) } - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("list multi failed: %v", err) } @@ -127,13 +128,13 @@ func ExerciseBackend(t testing.TB, b Backend) { } // Delete with children should work - err = b.Delete(context.Background(), "foo") + err = b.Delete(ctx, "foo") if err != nil { t.Fatalf("delete after multi: %v", err) } // Get should return the child - out, err = b.Get(context.Background(), "foo/bar") + out, err = b.Get(ctx, "foo/bar") if err != nil { t.Fatalf("get after multi delete: %v", err) } @@ -143,17 +144,17 @@ func ExerciseBackend(t testing.TB, b Backend) { // Removal of nested secret should not leave artifacts e = &Entry{Key: "foo/nested1/nested2/nested3", Value: []byte("baz")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("deep nest: %v", err) } - err = b.Delete(context.Background(), "foo/nested1/nested2/nested3") + err = b.Delete(ctx, "foo/nested1/nested2/nested3") if err != nil { t.Fatalf("failed to remove deep nest: %v", err) } - keys, err = b.List(context.Background(), "foo/") + keys, err = b.List(ctx, "foo/") if err != nil { t.Fatalf("err: %v", err) } @@ -163,18 +164,18 @@ func ExerciseBackend(t testing.TB, b Backend) { // Make a second nested entry to test prefix removal e = &Entry{Key: "foo/zip", Value: []byte("zap")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("failed to create second nested: %v", err) } // Delete should not remove the prefix - err = b.Delete(context.Background(), "foo/bar") + err = b.Delete(ctx, "foo/bar") if err != nil { t.Fatalf("failed to delete nested prefix: %v", err) } - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("list nested prefix: %v", err) } @@ -183,12 +184,12 @@ func ExerciseBackend(t testing.TB, b Backend) { } // Delete should remove the prefix - err = b.Delete(context.Background(), "foo/zip") + err = b.Delete(ctx, "foo/zip") if err != nil { t.Fatalf("failed to delete second prefix: %v", err) } - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("listing after second delete failed: %v", err) } @@ -198,29 +199,29 @@ func ExerciseBackend(t testing.TB, b Backend) { // When the root path is empty, adding and removing deep nested values should not break listing e = &Entry{Key: "foo/nested1/nested2/value1", Value: []byte("baz")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("deep nest: %v", err) } e = &Entry{Key: "foo/nested1/nested2/value2", Value: []byte("baz")} - err = b.Put(context.Background(), e) + err = b.Put(ctx, e) if err != nil { t.Fatalf("deep nest: %v", err) } - err = b.Delete(context.Background(), "foo/nested1/nested2/value2") + err = b.Delete(ctx, 
"foo/nested1/nested2/value2") if err != nil { t.Fatalf("failed to remove deep nest: %v", err) } - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("listing of root failed after deletion: %v", err) } if len(keys) == 0 { t.Errorf("root is returning empty after deleting a single nested value, expected nested1/: %v", keys) - keys, err = b.List(context.Background(), "foo/nested1") + keys, err = b.List(ctx, "foo/nested1") if err != nil { t.Fatalf("listing of expected nested path 'foo/nested1' failed: %v", err) } @@ -231,12 +232,12 @@ func ExerciseBackend(t testing.TB, b Backend) { } // cleanup left over listing bug test value - err = b.Delete(context.Background(), "foo/nested1/nested2/value1") + err = b.Delete(ctx, "foo/nested1/nested2/value1") if err != nil { t.Fatalf("failed to remove deep nest: %v", err) } - keys, err = b.List(context.Background(), "") + keys, err = b.List(ctx, "") if err != nil { t.Fatalf("listing of root failed after delete of deep nest: %v", err) } @@ -247,32 +248,33 @@ func ExerciseBackend(t testing.TB, b Backend) { func ExerciseBackend_ListPrefix(t testing.TB, b Backend) { t.Helper() + ctx := context.Background() e1 := &Entry{Key: "foo", Value: []byte("test")} e2 := &Entry{Key: "foo/bar", Value: []byte("test")} e3 := &Entry{Key: "foo/bar/baz", Value: []byte("test")} defer func() { - b.Delete(context.Background(), "foo") - b.Delete(context.Background(), "foo/bar") - b.Delete(context.Background(), "foo/bar/baz") + _ = b.Delete(ctx, "foo") + _ = b.Delete(ctx, "foo/bar") + _ = b.Delete(ctx, "foo/bar/baz") }() - err := b.Put(context.Background(), e1) + err := b.Put(ctx, e1) if err != nil { t.Fatalf("failed to put entry 1: %v", err) } - err = b.Put(context.Background(), e2) + err = b.Put(ctx, e2) if err != nil { t.Fatalf("failed to put entry 2: %v", err) } - err = b.Put(context.Background(), e3) + err = b.Put(ctx, e3) if err != nil { t.Fatalf("failed to put entry 3: %v", err) } // Scan the root - keys, err := b.List(context.Background(), "") + keys, err := b.List(ctx, "") if err != nil { t.Fatalf("list root: %v", err) } @@ -282,7 +284,7 @@ func ExerciseBackend_ListPrefix(t testing.TB, b Backend) { } // Scan foo/ - keys, err = b.List(context.Background(), "foo/") + keys, err = b.List(ctx, "foo/") if err != nil { t.Fatalf("list level 1: %v", err) } @@ -292,7 +294,7 @@ func ExerciseBackend_ListPrefix(t testing.TB, b Backend) { } // Scan foo/bar/ - keys, err = b.List(context.Background(), "foo/bar/") + keys, err = b.List(ctx, "foo/bar/") if err != nil { t.Fatalf("list level 2: %v", err) } @@ -401,6 +403,8 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) { func ExerciseTransactionalBackend(t testing.TB, b Backend) { t.Helper() + ctx := context.Background() + tb, ok := b.(Transactional) if !ok { t.Fatal("Not a transactional backend") @@ -408,11 +412,11 @@ func ExerciseTransactionalBackend(t testing.TB, b Backend) { txns := SetupTestingTransactions(t, b) - if err := tb.Transaction(context.Background(), txns); err != nil { + if err := tb.Transaction(ctx, txns); err != nil { t.Fatal(err) } - keys, err := b.List(context.Background(), "") + keys, err := b.List(ctx, "") if err != nil { t.Fatal(err) } @@ -425,7 +429,7 @@ func ExerciseTransactionalBackend(t testing.TB, b Backend) { t.Fatalf("mismatch: expected\n%#v\ngot\n%#v\n", expected, keys) } - entry, err := b.Get(context.Background(), "foo") + entry, err := b.Get(ctx, "foo") if err != nil { t.Fatal(err) } @@ -439,7 +443,7 @@ func ExerciseTransactionalBackend(t 
testing.TB, b Backend) { t.Fatal("updates did not apply correctly") } - entry, err = b.Get(context.Background(), "zip") + entry, err = b.Get(ctx, "zip") if err != nil { t.Fatal(err) } @@ -456,25 +460,27 @@ func ExerciseTransactionalBackend(t testing.TB, b Backend) { func SetupTestingTransactions(t testing.TB, b Backend) []*TxnEntry { t.Helper() + ctx := context.Background() + // Add a few keys so that we test rollback with deletion - if err := b.Put(context.Background(), &Entry{ + if err := b.Put(ctx, &Entry{ Key: "foo", Value: []byte("bar"), }); err != nil { t.Fatal(err) } - if err := b.Put(context.Background(), &Entry{ + if err := b.Put(ctx, &Entry{ Key: "zip", Value: []byte("zap"), }); err != nil { t.Fatal(err) } - if err := b.Put(context.Background(), &Entry{ + if err := b.Put(ctx, &Entry{ Key: "deleteme", }); err != nil { t.Fatal(err) } - if err := b.Put(context.Background(), &Entry{ + if err := b.Put(ctx, &Entry{ Key: "deleteme2", }); err != nil { t.Fatal(err) diff --git a/vault/external_tests/raft/raft_binary/raft_test.go b/vault/external_tests/raft/raft_binary/raft_test.go index 851961c4e326..dc508f035c57 100644 --- a/vault/external_tests/raft/raft_binary/raft_test.go +++ b/vault/external_tests/raft/raft_binary/raft_test.go @@ -4,13 +4,22 @@ package raft_binary import ( + "bytes" "context" + "fmt" + "io" "os" "testing" + "time" + "github.com/hashicorp/go-secure-stdlib/strutil" + autopilot "github.com/hashicorp/raft-autopilot" + "github.com/hashicorp/vault/api" + "github.com/hashicorp/vault/helper/testhelpers" "github.com/hashicorp/vault/sdk/helper/testcluster" "github.com/hashicorp/vault/sdk/helper/testcluster/docker" rafttest "github.com/hashicorp/vault/vault/external_tests/raft" + "github.com/stretchr/testify/require" ) // TestRaft_Configuration_Docker is a variant of TestRaft_Configuration that @@ -47,3 +56,376 @@ func TestRaft_Configuration_Docker(t *testing.T) { } rafttest.Raft_Configuration_Test(t, cluster) } + +// removeRaftNode removes a node from the raft configuration using the leader client +// and removes the docker node. +func removeRaftNode(t *testing.T, node *docker.DockerClusterNode, client *api.Client, serverID string) { + t.Helper() + _, err := client.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{ + "server_id": serverID, + }) + if err != nil { + t.Fatal(err) + } + // clean up the cluster nodes. 
Note that the node is not removed from the ClusterNodes + node.Cleanup() +} + +// stabilizeAndPromote makes sure the given node ID is among the voters using +// autoPilot state +func stabilizeAndPromote(t *testing.T, client *api.Client, nodeID string) { + t.Helper() + deadline := time.Now().Add(2 * autopilot.DefaultReconcileInterval) + failed := true + var state *api.AutopilotState + var err error + for time.Now().Before(deadline) { + state, err = client.Sys().RaftAutopilotState() + require.NoError(t, err) + if state != nil && state.Servers != nil && state.Servers[nodeID].Status == "voter" { + failed = false + break + } + time.Sleep(1 * time.Second) + } + + if failed { + t.Fatalf("autopilot failed to promote node: id: %#v: state:%# v\n", nodeID, state) + } +} + +// stabilize makes sure the cluster is in a healthy state using autopilot state +func stabilize(t *testing.T, client *api.Client) { + t.Helper() + deadline := time.Now().Add(2 * autopilot.DefaultReconcileInterval) + healthy := false + for time.Now().Before(deadline) { + state, err := client.Sys().RaftAutopilotState() + require.NoError(t, err) + if state.Healthy { + healthy = true + break + } + time.Sleep(1 * time.Second) + } + if !healthy { + t.Fatalf("cluster failed to stabilize") + } +} + +// TestDocker_LogStore_Boltdb_To_Raftwal_And_Back runs 3 node cluster leveraging boltDB +// as the logStore, then migrates the cluster to raft-wal logStore and back. +// This shows raft-wal does not lose data. +// There is no migration procedure for individual nodes. +// The correct procedure is destroying existing raft-boltdb nodes and starting brand-new +// nodes that use raft-wal (and vice-versa) +// Having a cluster of mixed nodes, some using raft-boltdb and some using raft-wal, is not a problem. +func TestDocker_LogStore_Boltdb_To_Raftwal_And_Back(t *testing.T) { + t.Parallel() + binary := os.Getenv("VAULT_BINARY") + if binary == "" { + t.Skip("only running docker test when $VAULT_BINARY present") + } + opts := &docker.DockerClusterOptions{ + ImageRepo: "hashicorp/vault", + // We're replacing the binary anyway, so we're not too particular about + // the docker image version tag. 
+ ImageTag: "latest", + VaultBinary: binary, + ClusterOptions: testcluster.ClusterOptions{ + VaultNodeConfig: &testcluster.VaultNodeConfig{ + LogLevel: "TRACE", + }, + }, + } + cluster := docker.NewTestDockerCluster(t, opts) + defer cluster.Cleanup() + + rafttest.Raft_Configuration_Test(t, cluster) + + leaderNode := cluster.GetActiveClusterNode() + leaderClient := leaderNode.APIClient() + + err := leaderClient.Sys().MountWithContext(context.TODO(), "kv", &api.MountInput{ + Type: "kv", + }) + if err != nil { + t.Fatal(err) + } + + val := 0 + writeKV := func(client *api.Client, num int) { + t.Helper() + start := val + for i := start; i < start+num; i++ { + if _, err = client.Logical().WriteWithContext(context.TODO(), fmt.Sprintf("kv/foo-%d", i), map[string]interface{}{ + "bar": val, + }); err != nil { + t.Fatal(err) + } + val++ + } + } + + readKV := func(client *api.Client) { + t.Helper() + for i := 0; i < val; i++ { + secret, err := client.Logical().Read(fmt.Sprintf("kv/foo-%d", i)) + if err != nil { + t.Fatal(err) + } + if secret == nil || secret.Data == nil { + t.Fatal("failed to read the value") + } + } + } + // write and then read back some data + writeKV(leaderClient, 10) + readKV(leaderClient) + + if opts.ClusterOptions.VaultNodeConfig.StorageOptions == nil { + opts.ClusterOptions.VaultNodeConfig.StorageOptions = make(map[string]string, 0) + } + // add three new nodes with raft-wal as their log store + opts.ClusterOptions.VaultNodeConfig.StorageOptions["raft_wal"] = "true" + for i := 0; i < 3; i++ { + if err := cluster.AddNode(context.TODO(), opts); err != nil { + t.Fatal(err) + } + } + // check that the raft config contains 6 nodes + rafttest.Raft_Configuration_Test(t, cluster) + + // write data before removing two nodes + writeKV(leaderClient, 10) + + // remove two of the nodes that use boltDB + removeRaftNode(t, cluster.ClusterNodes[1], leaderClient, "core-1") + removeRaftNode(t, cluster.ClusterNodes[2], leaderClient, "core-2") + + // all data should still be readable after removing two nodes + readKV(leaderClient) + + err = testhelpers.VerifyRaftPeers(t, leaderClient, map[string]bool{ + "core-0": true, + "core-3": true, + "core-4": true, + "core-5": true, + }) + if err != nil { + t.Fatal(err) + } + + // stabilize the cluster and wait for autopilot to promote the new nodes to voters + stabilizeAndPromote(t, leaderClient, "core-3") + stabilizeAndPromote(t, leaderClient, "core-4") + stabilizeAndPromote(t, leaderClient, "core-5") + + // stop the old leader; stopping the container forces a leadership change + // without an explicit step-down call + cluster.ClusterNodes[0].Stop() + + // get the new leader node + leaderNode = cluster.GetActiveClusterNode() + leaderClient = leaderNode.APIClient() + + // remove the old leader, which was using boltDB + // this removes it from the raft configuration even though the container was already removed + removeRaftNode(t, cluster.ClusterNodes[0], leaderClient, "core-0") + + // check if the cluster is stable + stabilize(t, leaderClient) + + // write some more data and read all data again. + // Here, only raft-wal is in use in the cluster. + writeKV(leaderClient, 10) + readKV(leaderClient) + + // going back to boltdb. 
Adding three nodes to the cluster having them use boltDB + opts.ClusterOptions.VaultNodeConfig.StorageOptions["raft_wal"] = "false" + for i := 0; i < 3; i++ { + if err := cluster.AddNode(context.TODO(), opts); err != nil { + t.Fatal(err) + } + } + // make sure the new nodes are promoted to voter + stabilizeAndPromote(t, leaderClient, "core-6") + stabilizeAndPromote(t, leaderClient, "core-7") + stabilizeAndPromote(t, leaderClient, "core-8") + + err = testhelpers.VerifyRaftPeers(t, leaderClient, map[string]bool{ + "core-3": true, + "core-4": true, + "core-5": true, + "core-6": true, + "core-7": true, + "core-8": true, + }) + if err != nil { + t.Fatal(err) + } + + // get the leader node again just in case + leaderNode = cluster.GetActiveClusterNode() + leaderClient = leaderNode.APIClient() + + // remove all the nodes that use raft-wal except the one that is the leader. + removingNodes := []string{"core-3", "core-4", "core-5"} + var raftWalLeader string + // if the leaderNode.NodeID exists in removingNodes, keep track of that + if strutil.StrListContains(removingNodes, leaderNode.NodeID) { + raftWalLeader = leaderNode.NodeID + } + + // remove all nodes except the leader, if the leader is a node with raft-wal as its log store + for _, node := range cluster.ClusterNodes { + if node.NodeID == leaderNode.NodeID || !strutil.StrListContains(removingNodes, node.NodeID) { + continue + } + removeRaftNode(t, node, leaderClient, node.NodeID) + } + + // write and read data again after removing two or three nodes + writeKV(leaderClient, 10) + readKV(leaderClient) + + // remove the old leader that uses raft-wal as its logStore if it has not been removed + if raftWalLeader != "" { + oldLeader := leaderNode + + // remove the node + leaderNode.Stop() + + // get new leader node + leaderNode = cluster.GetActiveClusterNode() + leaderClient = leaderNode.APIClient() + + // remove the old leader from the raft configuration + removeRaftNode(t, oldLeader, leaderClient, raftWalLeader) + } + + // make sure the cluster is healthy + stabilize(t, leaderClient) + + // write some data again and read all data from the beginning + writeKV(leaderClient, 10) + readKV(leaderClient) +} + +// TestRaft_LogStore_Migration_Snapshot checks migration from a boltDB to raftwal +// by performing a snapshot restore from one cluster to another, and checking no data loss +func TestRaft_LogStore_Migration_Snapshot(t *testing.T) { + t.Parallel() + binary := os.Getenv("VAULT_BINARY") + if binary == "" { + t.Skip("only running docker test when $VAULT_BINARY present") + } + opts := &docker.DockerClusterOptions{ + ImageRepo: "hashicorp/vault", + // We're replacing the binary anyway, so we're not too particular about + // the docker image version tag. 
+ ImageTag: "latest", + VaultBinary: binary, + ClusterOptions: testcluster.ClusterOptions{ + NumCores: 1, + VaultNodeConfig: &testcluster.VaultNodeConfig{ + LogLevel: "TRACE", + }, + }, + } + cluster := docker.NewTestDockerCluster(t, opts) + defer cluster.Cleanup() + rafttest.Raft_Configuration_Test(t, cluster) + + leaderNode := cluster.GetActiveClusterNode() + leaderClient := leaderNode.APIClient() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + err := leaderClient.Sys().MountWithContext(ctx, "kv", &api.MountInput{ + Type: "kv", + }) + if err != nil { + t.Fatal(err) + } + val := 1 + for i := 0; i < 10; i++ { + _, err = leaderClient.Logical().WriteWithContext(ctx, fmt.Sprintf("kv/foo-%d", i), map[string]interface{}{ + "bar": val, + }) + val++ + } + + readKV := func(client *api.Client) { + for i := 0; i < 10; i++ { + secret, err := client.Logical().Read(fmt.Sprintf("kv/foo-%d", i)) + if err != nil { + t.Fatal(err) + } + if secret == nil || secret.Data == nil { + t.Fatal("failed to read the value") + } + } + } + readKV(leaderClient) + + // Take a snapshot + buf := new(bytes.Buffer) + err = leaderClient.Sys().RaftSnapshot(buf) + if err != nil { + t.Fatal(err) + } + snap, err := io.ReadAll(buf) + if err != nil { + t.Fatal(err) + } + if len(snap) == 0 { + t.Fatal("no snapshot returned") + } + + // start a new cluster with raft-wal as its logStore + if opts.ClusterOptions.VaultNodeConfig.StorageOptions == nil { + opts.ClusterOptions.VaultNodeConfig.StorageOptions = make(map[string]string, 0) + } + opts.ClusterOptions.VaultNodeConfig.StorageOptions["raft_wal"] = "true" + + // caching the old cluster's barrier keys + oldBarrierKeys := cluster.GetBarrierKeys() + // clean up the old cluster as there is no further use to it + cluster.Cleanup() + + // Start a new cluster, set the old cluster's barrier keys as its own, and restore + // the snapshot from the old cluster + newCluster := docker.NewTestDockerCluster(t, opts) + defer newCluster.Cleanup() + + // get the leader client + newLeaderNode := newCluster.GetActiveClusterNode() + newLeaderClient := newLeaderNode.APIClient() + + // set the barrier keys to the old cluster so that we could restore the snapshot + newCluster.SetBarrierKeys(oldBarrierKeys) + + // Restore snapshot + err = newLeaderClient.Sys().RaftSnapshotRestore(bytes.NewReader(snap), true) + if err != nil { + t.Fatal(err) + } + + if err = testcluster.UnsealNode(ctx, newCluster, 0); err != nil { + t.Fatal(err) + } + testcluster.WaitForActiveNode(ctx, newCluster) + + // generate a root token as the unseal keys have changed + rootToken, err := testcluster.GenerateRoot(newCluster, testcluster.GenerateRootRegular) + if err != nil { + t.Fatal(err) + } + newLeaderClient.SetToken(rootToken) + + // stabilize the cluster + stabilize(t, newLeaderClient) + // check all data exists + readKV(newLeaderClient) +} diff --git a/vault/raft.go b/vault/raft.go index 585c0f08af3f..abc7a4acaf37 100644 --- a/vault/raft.go +++ b/vault/raft.go @@ -324,6 +324,9 @@ func (c *Core) setupRaftActiveNode(ctx context.Context) error { return err } + // Run the verifier if we're configured to do so + raftBackend.StartRaftWalVerifier(ctx) + if err := c.startPeriodicRaftTLSRotate(ctx); err != nil { return err }
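The converted unit tests and benchmarks above all run through a `testBothRaftBackends` (and `testBothRaftBackendsBenchmark`) helper that does not appear in these hunks. A minimal sketch of what such a helper presumably looks like; the subtest names and option values here are assumptions, not the actual implementation:

```go
// Hypothetical sketch: run the supplied test body once against the existing
// raft-boltdb log store and once against the experimental raft-wal log store,
// passing the value each test feeds into its "raft_wal" backend option.
func testBothRaftBackends(t *testing.T, f func(useRaftWal string)) {
	t.Helper()

	cases := map[string]string{
		"boltdb":   "false",
		"raft-wal": "true",
	}
	for name, useRaftWal := range cases {
		name, useRaftWal := name, useRaftWal
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			f(useRaftWal)
		})
	}
}
```

The benchmark variant would presumably do the same with `b.Run` and without the parallel subtests.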
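The new `GetRaftWithConfig` helper in `physical/raft/testing.go` accepts arbitrary backend options and fills in the temp directory `path` and `doNotStoreLatestState` on the caller's behalf. An illustrative caller, not part of this change (the test name and the Put call are examples only):

```go
// Illustrative only: bring up a single bootstrapped backend with raft-wal
// enabled and exercise a basic write, mirroring how the converted tests in
// this package use GetRaftWithConfig.
func TestRaft_Backend_WALSmoke(t *testing.T) {
	b, raftDir := GetRaftWithConfig(t, true, true, map[string]string{
		"trailing_logs": "100",
		"raft_wal":      "true",
	})
	t.Logf("backend is running out of %s", raftDir)

	err := b.Put(context.Background(), &physical.Entry{
		Key:   "foo",
		Value: []byte("bar"),
	})
	if err != nil {
		t.Fatal(err)
	}
}
```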
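`TestRaft_Recovery` drives manual recovery by writing a `peers.json` file into each surviving node's raft state directory before restarting the nodes. For reference, a self-contained illustration of the payload the `RecoveryPeer` slice marshals to; the node IDs below are hypothetical, and the test reuses `NodeID()` for both the id and address fields:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RecoveryPeer mirrors the struct defined inline in TestRaft_Recovery.
type RecoveryPeer struct {
	ID       string `json:"id"`
	Address  string `json:"address"`
	NonVoter bool   `json:"non_voter"`
}

func main() {
	peers := []RecoveryPeer{
		{ID: "node-1", Address: "node-1", NonVoter: false},
		{ID: "node-2", Address: "node-2", NonVoter: false},
	}
	out, _ := json.MarshalIndent(peers, "", "  ")
	// Roughly the content the test writes to each node's raftState/peers.json.
	fmt.Println(string(out))
}
```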