Skip to content

Commit

Permalink
implement (re)store of proxy's state
Browse files Browse the repository at this point in the history
Introduce the high availability feature of cc-proxy by implementing
store/restore of proxy's state to/from disk. This feature depends
on the ability of shim to reconnect to cc-proxy if connection is lost.

Fixes clearcontainers#4.

Signed-off-by: Dmitry Voytik <[email protected]>
  • Loading branch information
Dmitry Voytik committed Aug 18, 2017
1 parent d7a4dd8 commit 78688fb
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 12 deletions.
246 changes: 238 additions & 8 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -124,6 +125,33 @@ func newClient(proxy *proxy, conn net.Conn) *client {
}
}

func (proxy *proxy) restoreTokens(vm *vm, tokens []string) error {
if len(tokens) == 0 {
return nil
}

for _, token := range tokens {
token, err := vm.AllocateTokenAs(Token(token))
if err != nil {
return err
}
proxy.Lock()
proxy.tokenToVM[token] = &tokenInfo{
state: tokenStateAllocated,
vm: vm,
}
proxy.Unlock()

session := vm.findSessionByToken(token)
if session == nil {
return fmt.Errorf("unknown token %s", token)
}
// Signal that the process is already started
close(session.processStarted)
}
return nil
}

func (proxy *proxy) allocateTokens(vm *vm, numIOStreams int) (*api.IOResponse, error) {
url := url.URL{
Scheme: "unix",
Expand Down Expand Up @@ -188,6 +216,207 @@ func (proxy *proxy) releaseToken(token Token) (*tokenInfo, error) {
return info, nil
}

var storeStateDir string = "/var/lib/clear-containers/proxy/"

type proxyStateOnDisk struct {
SocketPath string `json:"socket_path"`
EnableVMConsole bool `json:"enable_vm_console"`
ContainerIDs []string `json:"container_ids"`
}

// returns false if it's a clean start (i.e. no state is stored) or restoring failed
func (proxy *proxy) restoreState() bool {
proxyStateFilePath := storeStateDir + "proxy_state.json"
if _, err := os.Stat(storeStateDir); os.IsNotExist(err) {
if err := os.MkdirAll(storeStateDir, 0755); err != nil {
proxyLog.Errorf("Couldn't create directory %s: %v",
storeStateDir, err)
}
return false
}
if _, err := os.Stat(proxyStateFilePath); os.IsNotExist(err) {
return false
}

fdata, err := ioutil.ReadFile(proxyStateFilePath)
if err != nil {
proxyLog.Errorf("Couldn't recover from %s: %v", proxyStateFilePath, err)
return false
}

var proxyState proxyStateOnDisk
err = json.Unmarshal(fdata, &proxyState)
if err != nil {
proxyLog.Errorf("Couldn't unmarshal %s: %v", proxyStateFilePath, err)
return false
}
proxyLog.Debugf("proxy: %+v", proxyState)

if len(proxyState.ContainerIDs) == 0 {
return false
}
proxyLog.Warn("Recovering proxy state from: ", proxyStateFilePath)
proxy.socketPath = proxyState.SocketPath
proxy.enableVMConsole = proxyState.EnableVMConsole

for _, contID := range proxyState.ContainerIDs {
go restoreVMState(proxy, contID)
}

return true
}

func (proxy *proxy) storeState() error {
proxyStateFilePath := storeStateDir + "proxy_state.json"
proxy.Lock()
defer proxy.Unlock()

// if there are 0 VMs then remove state from disk
if (len(proxy.vms)) == 0 {
if err := os.Remove(proxyStateFilePath); err != nil {
proxyLog.Errorf("Can not remove %s: %v", proxyStateFilePath, err)
return err
}
return nil
}

proxyState := &proxyStateOnDisk{
SocketPath: proxy.socketPath,
EnableVMConsole: proxy.enableVMConsole,
ContainerIDs: make([]string, 0, len(proxy.vms)),
}
for cid := range proxy.vms {
proxyState.ContainerIDs = append(proxyState.ContainerIDs, cid)
}

data, err := json.MarshalIndent(proxyState, "", "\t")
if err != nil {
return err
}
ioutil.WriteFile(proxyStateFilePath, data, 0644)

return nil
}

// Represents vm struct on disk
type vmStateOnDisk struct {
RegisterVM api.RegisterVM `json:"registerVM"`
Tokens []string `json:"tokens"`
}

func vmStateFilePath(id string) string {
return storeStateDir + "vm_" + id + ".json"
}

func storeVMState(vm *vm, tokens []string) {
odVM := vmStateOnDisk{
RegisterVM: api.RegisterVM{
ContainerID: vm.containerID,
CtlSerial: vm.hyperHandler.GetCtlSockPath(),
IoSerial: vm.hyperHandler.GetIoSockPath(),
Console: vm.console.socketPath,
},
Tokens: tokens,
}
o, err := json.MarshalIndent(&odVM, "", "\t")
if err != nil {
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't marshal VM state")
}
storeFile := vmStateFilePath(vm.containerID)
if err = ioutil.WriteFile(storeFile, o, 0644); err != nil {
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't store VM state to ",
storeFile)
}
}

func delVMAndState(proxy *proxy, vm *vm) {
proxyLog.Infof("Removing on-disk state of %s", vm.containerID)
proxy.Lock()
delete(proxy.vms, vm.containerID)
proxy.Unlock()
proxy.storeState()
storeFile := vmStateFilePath(vm.containerID)
if err := os.Remove(storeFile); err != nil {
proxyLog.WithField("vm", vm.containerID).Warn("Couldn't remove file ",
storeFile)
}
}

func restoreVMState(proxy *proxy, containerID string) {
vmStateFilePath := vmStateFilePath(containerID)
proxy.Lock()
fdata, err := ioutil.ReadFile(vmStateFilePath)
proxy.Unlock()
if err != nil {
proxyLog.Errorf("Couldn't read %s: %v", vmStateFilePath, err)
return
}

var vmState vmStateOnDisk
err = json.Unmarshal(fdata, &vmState)
if err != nil {
proxyLog.Errorf("Can not unmarshal %s: %v", vmStateFilePath, err)
return
}
proxyLog.Debugf("restored vm state: %+v", vmState)

regVM := vmState.RegisterVM

if regVM.ContainerID == "" || regVM.CtlSerial == "" || regVM.IoSerial == "" {
proxyLog.Errorf("wrong VM parameters")
return
}

proxy.Lock()
if _, ok := proxy.vms[regVM.ContainerID]; ok {
proxy.Unlock()
proxyLog.Errorf("%s: container already registered", regVM.ContainerID)
return
}
vm := newVM(regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial)
proxy.vms[regVM.ContainerID] = vm
proxy.Unlock()

proxyLog.Infof("restoreVMState(containerId=%s,ctlSerial=%s,ioSerial=%s,console=%s)",
regVM.ContainerID, regVM.CtlSerial, regVM.IoSerial, regVM.Console)

if regVM.Console != "" && proxy.enableVMConsole {
vm.setConsole(regVM.Console)
}

if err := proxy.restoreTokens(vm, vmState.Tokens); err != nil {
proxyLog.Errorf("Failed to restore tokens: %v", err)
return
}

for _, token := range vmState.Tokens {
session := vm.findSessionByToken(Token(token))
if session == nil {
proxyLog.Errorf("Session must be not nil")
delVMAndState(proxy, vm)
return
}
if err := session.WaitForShim(); err != nil {
proxyLog.Errorf("Failed to re-connect with shim: %v", err)
delVMAndState(proxy, vm)
return
}
}
if err := vm.Reconnect(true); err != nil {
proxyLog.Errorf("Failed to connect: %v", err)
delVMAndState(proxy, vm)
return
}

// We start one goroutine per-VM to monitor the qemu process
proxy.wg.Add(1)
go func() {
<-vm.OnVMLost()
vm.Close()
proxy.wg.Done()
}()
}

// "RegisterVM"
func registerVM(data []byte, userData interface{}, response *handlerResponse) {
client := userData.(*client)
Expand Down Expand Up @@ -242,6 +471,8 @@ func registerVM(data []byte, userData interface{}, response *handlerResponse) {
}

client.vm = vm
storeVMState(vm, io.Tokens)
proxy.storeState()

// We start one goroutine per-VM to monitor the qemu process
proxy.wg.Add(1)
Expand Down Expand Up @@ -314,9 +545,7 @@ func unregisterVM(data []byte, userData interface{}, response *handlerResponse)

client.log.Info("UnregisterVM()")

proxy.Lock()
delete(proxy.vms, vm.containerID)
proxy.Unlock()
delVMAndState(proxy, vm)

client.vm = nil
}
Expand Down Expand Up @@ -598,12 +827,13 @@ func (proxy *proxy) init() error {
var l net.Listener
var err error

// flags
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel
if !proxy.restoreState() {
// flags
proxy.enableVMConsole = logrus.GetLevel() == logrus.DebugLevel

// Open the proxy socket
if proxy.socketPath, err = getSocketPath(); err != nil {
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
if proxy.socketPath, err = getSocketPath(); err != nil {
return fmt.Errorf("couldn't get a rigth socket path: %v", err)
}
}
fds := listenFds()

Expand Down
27 changes: 27 additions & 0 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"encoding/json"
"io/ioutil"
"net"
"os"
"strings"
Expand Down Expand Up @@ -900,3 +901,29 @@ func TestHyperstartResponse(t *testing.T) {

rig.Stop()
}

func TestStoreRestore(t *testing.T) {
storeStateDir = "/tmp/clearcontainers/proxy/"
rig := newTestRig(t)
rig.Start()

// clean up a possible state
os.RemoveAll(storeStateDir)
assert.Equal(t, rig.proxy.restoreState(), false)

rig.RegisterVM()
rig.Stop()
// the state must be present on the disk
files, err := ioutil.ReadDir(storeStateDir)
assert.Nil(t, err)
assert.Equal(t, len(files), 2)
assert.Equal(t, files[0].Name(), "proxy_state.json")

rig.Start()
assert.Equal(t, rig.proxy.restoreState(), true)
assert.Nil(t, rig.Client.UnregisterVM(testContainerID))
// the state must be absent on the disk
files, err = ioutil.ReadDir(storeStateDir)
assert.Nil(t, err)
assert.Equal(t, len(files), 0)
}
25 changes: 21 additions & 4 deletions vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func (vm *vm) consoleToLog() {
}

func (vm *vm) Connect() error {
return vm.Reconnect(false)
}

func (vm *vm) Reconnect(reconnect bool) error {
if vm.console.socketPath != "" {
var err error

Expand All @@ -289,7 +293,12 @@ func (vm *vm) Connect() error {
return err
}

if err := vm.hyperHandler.WaitForReady(); err != nil {
if reconnect {
if !vm.hyperHandler.IsStarted() {
vm.hyperHandler.CloseSockets()
return errors.New("failed to reconnect to the agent")
}
} else if err := vm.hyperHandler.WaitForReady(); err != nil {
vm.hyperHandler.CloseSockets()
return err
}
Expand Down Expand Up @@ -589,6 +598,11 @@ func (session *ioSession) SendSignal(signal syscall.Signal) error {
}

func (vm *vm) AllocateToken() (Token, error) {
return vm.AllocateTokenAs("")
}

// if token == "" then a new token is generate, otherwise provided token is reused
func (vm *vm) AllocateTokenAs(token Token) (Token, error) {
vm.Lock()
defer vm.Unlock()

Expand All @@ -598,9 +612,12 @@ func (vm *vm) AllocateToken() (Token, error) {
ioBase := vm.nextIoBase
vm.nextIoBase += uint64(nStreams)

token, err := GenerateToken(32)
if err != nil {
return nilToken, err
if token == "" {
var err error
token, err = GenerateToken(32)
if err != nil {
return nilToken, err
}
}

session := &ioSession{
Expand Down

0 comments on commit 78688fb

Please sign in to comment.