Skip to content

Commit

Permalink
proxy: implement (re)store of proxy's state
Browse files Browse the repository at this point in the history
Introduce the high-availability feature of cc-proxy by implementing
store/restore of the proxy's state to/from disk. This feature depends
on the ability of the shim to reconnect to cc-proxy if the connection is lost.

Fixes clearcontainers#4.

Signed-off-by: Dmitry Voytik <[email protected]>
  • Loading branch information
Dmitry Voytik committed Oct 5, 2017
1 parent 05e19de commit 83827c8
Show file tree
Hide file tree
Showing 4 changed files with 395 additions and 13 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ LOCALSTATEDIR := /var

SOURCES := $(shell find . 2>&1 | grep -E '.*\.(c|h|go)$$')
PROXY_SOCKET := $(LOCALSTATEDIR)/run/clear-containers/proxy.sock
STORE_STATE_DIR := $(LOCALSTATEDIR)/lib/clear-containers/proxy/

DESCRIBE := $(shell git describe 2> /dev/null || true)
DESCRIBE_DIRTY := $(if $(shell git status --porcelain --untracked-files=no 2> /dev/null),${DESCRIBE}-dirty,${DESCRIBE})
Expand Down Expand Up @@ -53,7 +54,7 @@ all: cc-proxy $(UNIT_FILES)

cc-proxy: $(SOURCES) Makefile
$(QUIET_GOBUILD)go build -i -o $@ -ldflags \
"-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION)"
"-X main.DefaultSocketPath=$(PROXY_SOCKET) -X main.Version=$(VERSION) -X main.storeStateDir=$(STORE_STATE_DIR)"

#
# Tests
Expand Down
289 changes: 281 additions & 8 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
Expand All @@ -36,6 +37,13 @@ import (
"github.com/clearcontainers/proxy/api"
)

// storeStateDir is the directory in which the proxy persists its state. It
// must end with a path separator because file names are appended to it
// directly. The default below is overridden at link time (see the Makefile,
// -X main.storeStateDir) with the value of:
//   $(LOCALSTATEDIR)/lib/clear-containers/proxy/
var storeStateDir = "/var/lib/clear-containers/proxy/"

// Permissions used for the state directory and for the state files in it.
const (
	proxyStateDirPerm   = 0755
	proxyStateFilesPerm = 0640
)

// tokenState tracks if an I/O token has been claimed by a shim.
type tokenState int

Expand Down Expand Up @@ -84,6 +92,20 @@ type proxy struct {
wg sync.WaitGroup
}

// proxyStateOnDisk is the JSON-serializable snapshot of the proxy state that
// is used to (re)store the proxy state on disk.
type proxyStateOnDisk struct {
// Version is the version of the proxy that stored the state; restoreState
// refuses to restore a state whose version differs from the running one.
Version string `json:"version"`
// SocketPath is the proxy's socketPath at the time the state was stored.
SocketPath string `json:"socket_path"`
// EnableVMConsole is the proxy's enableVMConsole flag at the time the state
// was stored.
EnableVMConsole bool `json:"enable_vm_console"`
// ContainerIDs lists the IDs of the registered VMs; the state of each VM is
// kept in its own file (see vmStateFilePath).
ContainerIDs []string `json:"container_ids"`
}

// vmStateOnDisk is the JSON-serializable snapshot of a single VM that is used
// to (re)store the vm struct on disk.
type vmStateOnDisk struct {
// RegisterVM holds the registration parameters of the VM (container ID,
// control and I/O serial paths, console path) reused when restoring it.
RegisterVM api.RegisterVM `json:"registerVM"`
// Tokens lists the I/O tokens of the VM; they are allocated again with the
// same values when the VM is restored.
Tokens []string `json:"tokens"`
}

type clientKind int

const (
Expand Down Expand Up @@ -129,6 +151,33 @@ func newClient(proxy *proxy, conn net.Conn) *client {
}
}

// restoreTokens re-allocates the given tokens on vm and registers each of
// them in the proxy's token table in the allocated state. The session bound
// to every token is marked as having its process already started.
//
// It returns an error if vm is nil, if a token can't be allocated or if no
// session is found for an allocated token. Tokens handled before the failure
// stay registered.
func (proxy *proxy) restoreTokens(vm *vm, tokens []string) error {
	if vm == nil {
		return fmt.Errorf("vm parameter must be not nil")
	}

	for i := range tokens {
		allocated, err := vm.AllocateTokenAs(Token(tokens[i]))
		if err != nil {
			return err
		}

		info := &tokenInfo{
			state: tokenStateAllocated,
			vm:    vm,
		}
		proxy.Lock()
		proxy.tokenToVM[allocated] = info
		proxy.Unlock()

		session := vm.findSessionByToken(allocated)
		if session == nil {
			return fmt.Errorf("unknown token %s", allocated)
		}

		// Signal that the process is already started
		close(session.processStarted)
	}

	return nil
}

func (proxy *proxy) allocateTokens(vm *vm, numIOStreams int) (*api.IOResponse, error) {
url := url.URL{
Scheme: "unix",
Expand Down Expand Up @@ -193,6 +242,229 @@ func (proxy *proxy) releaseToken(token Token) (*tokenInfo, error) {
return info, nil
}

// restoreState tries to restore the proxy state previously stored on disk.
//
// It creates storeStateDir when it doesn't exist, otherwise it reads
// proxy_state.json from it, restores the socket path and the VM console flag
// and starts one goroutine per stored container ID to restore that VM.
//
// It returns false if it's a clean start (i.e. no state is stored) or if
// restoring failed; it returns true once the restoration of the VMs has been
// started.
func (proxy *proxy) restoreState() bool {
	proxyStateFilePath := storeStateDir + "proxy_state.json"
	if _, err := os.Stat(storeStateDir); os.IsNotExist(err) {
		if err := os.MkdirAll(storeStateDir, proxyStateDirPerm); err != nil {
			proxyLog.Errorf("Couldn't create directory %s: %v",
				storeStateDir, err)
		}
		return false
	}

	fdata, err := ioutil.ReadFile(proxyStateFilePath)
	if err != nil {
		// A missing state file is the regular clean start case (storeState
		// removes the file when no VM is left), so only real read failures
		// are reported.
		if !os.IsNotExist(err) {
			proxyLog.Errorf("Couldn't recover from %s: %v", proxyStateFilePath, err)
		}
		return false
	}

	var proxyState proxyStateOnDisk
	if err := json.Unmarshal(fdata, &proxyState); err != nil {
		proxyLog.Errorf("Couldn't unmarshal %s: %v", proxyStateFilePath, err)
		return false
	}
	proxyLog.Debugf("proxy: %+v", proxyState)

	// Nothing to restore without any registered VM.
	if len(proxyState.ContainerIDs) == 0 {
		return false
	}
	proxyLog.Warn("Recovering proxy state from: ", proxyStateFilePath)
	if proxyState.Version != Version {
		proxyLog.Warnf("Stored state version (%s) mismatches proxy"+
			" version (%s). Aborting", proxyState.Version, Version)
		return false
	}

	proxy.socketPath = proxyState.SocketPath
	proxy.enableVMConsole = proxyState.EnableVMConsole

	// Each VM is restored in its own goroutine.
	for _, contID := range proxyState.ContainerIDs {
		go restoreVMState(proxy, contID)
	}

	return true
}

// storeState persists the proxy state to proxy_state.json in storeStateDir.
//
// The stored state holds the proxy version, the socket path, the VM console
// flag and the IDs of all registered VMs. When no VM is registered the state
// file is removed instead. Failures are logged and otherwise ignored. The
// proxy lock is held for the whole operation.
func (proxy *proxy) storeState() {
	proxyStateFilePath := storeStateDir + "proxy_state.json"
	proxy.Lock()
	defer proxy.Unlock()

	// if there are 0 VMs then remove state from disk
	if len(proxy.vms) == 0 {
		// The file may already be absent (nothing stored yet or removed
		// earlier); that is not a failure worth reporting.
		if err := os.Remove(proxyStateFilePath); err != nil && !os.IsNotExist(err) {
			proxyLog.Errorf("Couldn't remove %s: %v",
				proxyStateFilePath, err)
		}
		return
	}

	proxyState := &proxyStateOnDisk{
		Version:         Version,
		SocketPath:      proxy.socketPath,
		EnableVMConsole: proxy.enableVMConsole,
		ContainerIDs:    make([]string, 0, len(proxy.vms)),
	}
	for cid := range proxy.vms {
		proxyState.ContainerIDs = append(proxyState.ContainerIDs, cid)
	}

	data, err := json.MarshalIndent(proxyState, "", "\t")
	if err != nil {
		proxyLog.Errorf("Couldn't marshal proxy state %+v: %v", proxyState, err)
		// Don't overwrite a possibly valid state file with empty content.
		return
	}
	if err := ioutil.WriteFile(proxyStateFilePath, data, proxyStateFilesPerm); err != nil {
		proxyLog.Errorf("Couldn't store proxy state to %s: %v",
			proxyStateFilePath, err)
	}
}

// vmStateFilePath returns the path of the file holding the on-disk state of
// the VM with the given container ID: "vm_<id>.json" appended to
// storeStateDir.
func vmStateFilePath(id string) string {
	fileName := "vm_" + id + ".json"
	return storeStateDir + fileName
}

// storeVMState writes the state of vm (its registration parameters and the
// given tokens) as indented JSON to the VM's state file. Failures are logged
// as warnings and otherwise ignored.
func storeVMState(vm *vm, tokens []string) {
	registration := api.RegisterVM{
		ContainerID: vm.containerID,
		CtlSerial:   vm.hyperHandler.GetCtlSockPath(),
		IoSerial:    vm.hyperHandler.GetIoSockPath(),
		Console:     vm.console.socketPath,
	}
	snapshot := vmStateOnDisk{
		RegisterVM: registration,
		Tokens:     tokens,
	}

	encoded, err := json.MarshalIndent(&snapshot, "", "\t")
	if err != nil {
		proxyLog.WithField("vm", vm.containerID).Warnf(
			"Couldn't marshal VM state: %v", err)
		return
	}

	target := vmStateFilePath(vm.containerID)
	if err := ioutil.WriteFile(target, encoded, proxyStateFilesPerm); err != nil {
		proxyLog.WithField("vm", vm.containerID).Warnf(
			"Couldn't store VM state to %s: %v", target, err)
	}
}

// delVMAndState unregisters vm from proxy, stores the updated proxy state
// and removes the VM's own state file. A nil proxy or vm is logged as an
// error and nothing is done; a failure to remove the file is logged as a
// warning.
func delVMAndState(proxy *proxy, vm *vm) {
	switch {
	case proxy == nil:
		proxyLog.Error("proxy parameter must be not nil")
		return
	case vm == nil:
		proxyLog.Error("vm parameter must be not nil")
		return
	}

	proxyLog.Infof("Removing on-disk state of %s", vm.containerID)

	// Drop the VM from the proxy first so that the stored proxy state no
	// longer lists it.
	proxy.Lock()
	delete(proxy.vms, vm.containerID)
	proxy.Unlock()
	proxy.storeState()

	stateFile := vmStateFilePath(vm.containerID)
	removeErr := os.Remove(stateFile)
	if removeErr == nil {
		return
	}
	proxyLog.WithField("vm", vm.containerID).Warnf(
		"Couldn't remove file %s: %v", stateFile, removeErr)
}

// readVMState loads and decodes the on-disk state of the VM identified by
// containerID. It returns nil, after logging the reason, when containerID is
// empty or when the state file can't be read or parsed.
func readVMState(containerID string) *vmStateOnDisk {
	if len(containerID) == 0 {
		proxyLog.Errorf("containerID parameter must be not empty")
		return nil
	}

	path := vmStateFilePath(containerID)
	raw, readErr := ioutil.ReadFile(path)
	if readErr != nil {
		proxyLog.Errorf("Couldn't read %s: %v", path, readErr)
		return nil
	}

	state := new(vmStateOnDisk)
	if decodeErr := json.Unmarshal(raw, state); decodeErr != nil {
		proxyLog.Errorf("Couldn't unmarshal %s: %v", path, decodeErr)
		return nil
	}

	proxyLog.Debugf("restoring vm state: %+v", *state)
	return state
}

// restoreTokens restores the tokens stored in vmState for vm and waits for
// the shim of every token to reconnect. Failures are logged and the VM and
// its on-disk state are removed. Use restoreTokensChecked when the outcome
// is needed.
func restoreTokens(proxy *proxy, vmState *vmStateOnDisk, vm *vm) {
	restoreTokensChecked(proxy, vmState, vm)
}

// restoreTokensChecked does the work of restoreTokens and reports whether
// all tokens were restored and all shims reconnected. On any failure the
// reason is logged, the VM and its on-disk state are removed and false is
// returned, so that the caller can stop restoring the VM.
func restoreTokensChecked(proxy *proxy, vmState *vmStateOnDisk, vm *vm) bool {
	if err := proxy.restoreTokens(vm, vmState.Tokens); err != nil {
		proxyLog.Errorf("Failed to restore tokens: %v", err)
		delVMAndState(proxy, vm)
		return false
	}

	for _, token := range vmState.Tokens {
		session := vm.findSessionByToken(Token(token))
		if session == nil {
			proxyLog.Errorf("Session must be not nil")
			delVMAndState(proxy, vm)
			return false
		}
		if err := session.WaitForShim(); err != nil {
			proxyLog.Errorf("Failed to re-connect with shim: %v", err)
			delVMAndState(proxy, vm)
			return false
		}
	}
	return true
}

// restoreVMState restores the VM identified by containerID from its on-disk
// state: it reads and validates the stored registration parameters,
// registers a new vm in proxy, restores its console (when enabled) and its
// tokens, reconnects to the VM and finally starts a goroutine that closes
// the vm once it is lost.
//
// Every failure is logged and aborts the restoration. Once the vm has been
// registered, a failure also removes it and its on-disk state.
func restoreVMState(proxy *proxy, containerID string) {
	if proxy == nil {
		proxyLog.Errorf("proxy parameter must be not nil")
		return
	}

	vmState := readVMState(containerID)
	if vmState == nil {
		return
	}

	regVM := vmState.RegisterVM
	if regVM.ContainerID == "" || regVM.CtlSerial == "" || regVM.IoSerial == "" {
		proxyLog.Errorf("wrong VM parameters")
		return
	}

	proxy.Lock()
	if _, ok := proxy.vms[regVM.ContainerID]; ok {
		proxy.Unlock()
		proxyLog.Errorf("%s: container already registered", regVM.ContainerID)
		return
	}
	vm := newVM(regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial)
	proxy.vms[regVM.ContainerID] = vm
	proxy.Unlock()

	proxyLog.Infof("restoreVMState(containerId=%s,ctlSerial=%s,ioSerial=%s,console=%s)",
		regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial, regVM.Console)

	if regVM.Console != "" && proxy.enableVMConsole {
		vm.setConsole(regVM.Console)
	}

	// Stop here when the tokens can't be restored: the failure has been
	// logged and the VM removed, so it must neither be reconnected nor
	// monitored.
	if !restoreTokensChecked(proxy, vmState, vm) {
		return
	}
	if err := vm.Reconnect(true); err != nil {
		proxyLog.Errorf("Failed to connect: %v", err)
		delVMAndState(proxy, vm)
		return
	}

	// We start one goroutine per-VM to monitor the qemu process
	proxy.wg.Add(1)
	go func() {
		<-vm.OnVMLost()
		vm.Close()
		proxy.wg.Done()
	}()
}

// "RegisterVM"
func registerVM(data []byte, userData interface{}, response *handlerResponse) {
client := userData.(*client)
Expand Down Expand Up @@ -247,6 +519,8 @@ func registerVM(data []byte, userData interface{}, response *handlerResponse) {
}

client.vm = vm
storeVMState(vm, io.Tokens)
proxy.storeState()

if proxyKSM != nil {
proxyKSM.kick()
Expand Down Expand Up @@ -323,9 +597,7 @@ func unregisterVM(data []byte, userData interface{}, response *handlerResponse)

client.log.Info("UnregisterVM()")

proxy.Lock()
delete(proxy.vms, vm.containerID)
proxy.Unlock()
delVMAndState(proxy, vm)

client.vm = nil
}
Expand Down Expand Up @@ -617,12 +889,13 @@ func (proxy *proxy) init() error {
var l net.Listener
var err error

// flags
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
if !proxy.restoreState() {
// flags
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel

// Open the proxy socket
if proxy.socketPath, err = getSocketPath(); err != nil {
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
if proxy.socketPath, err = getSocketPath(); err != nil {
return fmt.Errorf("couldn't get a right socket path: %v", err)
}
}
fds := listenFds()

Expand Down
Loading

0 comments on commit 83827c8

Please sign in to comment.