generated from Jille/raft-grpc-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
112 lines (94 loc) · 2.86 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"path/filepath"
pb "github.com/Jille/raft-grpc-example/proto"
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
transport "github.com/Jille/raft-grpc-transport"
"github.com/Jille/raftadmin"
"github.com/hashicorp/raft"
boltdb "github.com/hashicorp/raft-boltdb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var (
myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node")
raftId = flag.String("raft_id", "", "Node id used by Raft")
raftDir = flag.String("raft_data_dir", "data/", "Raft data dir")
raftBootstrap = flag.Bool("raft_bootstrap", false, "Whether to bootstrap the Raft cluster")
)
func main() {
flag.Parse()
if *raftId == "" {
log.Fatalf("flag --raft_id is required")
}
ctx := context.Background()
_, port, err := net.SplitHostPort(*myAddr)
if err != nil {
log.Fatalf("failed to parse local address (%q): %v", *myAddr, err)
}
sock, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
wt := &wordTracker{}
r, tm, err := NewRaft(ctx, *raftId, *myAddr, wt)
if err != nil {
log.Fatalf("failed to start raft: %v", err)
}
s := grpc.NewServer()
pb.RegisterExampleServer(s, &rpcInterface{
wordTracker: wt,
raft: r,
})
tm.Register(s)
leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)
reflection.Register(s)
if err := s.Serve(sock); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func NewRaft(ctx context.Context, myID, myAddress string, fsm raft.FSM) (*raft.Raft, *transport.Manager, error) {
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)
baseDir := filepath.Join(*raftDir, myID)
ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
}
sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
if err != nil {
return nil, nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
}
fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
if err != nil {
return nil, nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{grpc.WithInsecure()})
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
if err != nil {
return nil, nil, fmt.Errorf("raft.NewRaft: %v", err)
}
if *raftBootstrap {
cfg := raft.Configuration{
Servers: []raft.Server{
{
Suffrage: raft.Voter,
ID: raft.ServerID(myID),
Address: raft.ServerAddress(myAddress),
},
},
}
f := r.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
}
return r, tm, nil
}