Skip to content

Commit

Permalink
use labels as source
Browse files Browse the repository at this point in the history
  • Loading branch information
pixxon committed Apr 30, 2024
1 parent 7637975 commit e56b8e6
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 13 deletions.
101 changes: 89 additions & 12 deletions cmd/backup/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,50 @@ import (

type command struct {
logger *slog.Logger
schedules []cron.EntryID
schedules map[configStrategy][]cron.EntryID
cr *cron.Cron
reload chan struct{}
}

func newCommand() *command {
return &command{
logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
schedules: map[configStrategy][]cron.EntryID{
configStrategyEnv: {},
configStrategyConfd: {},
configStrategyLabel: {},
},
}
}

// runAsCommand executes a backup run for each configuration that is available
// and then returns
func (c *command) runAsCommand() error {
func (c *command) runAsCommand(opts commandOpts) error {
configurations, err := sourceConfiguration(configStrategyEnv)
if err != nil {
return errwrap.Wrap(err, "error loading env vars")
}

for _, config := range configurations {
if err := runScript(config); err != nil {
return errwrap.Wrap(err, "error running script")
if config.source == opts.source {
if err := runScript(config); err != nil {
return errwrap.Wrap(err, "error running script")
}
return nil
}
}

configurations, err = sourceConfiguration(configStrategyLabel)
if err != nil {
return errwrap.Wrap(err, "error loading labels")
}

for _, config := range configurations {
if config.source == opts.source {
if err := runProxy(config); err != nil {
return errwrap.Wrap(err, "error running script")
}
return nil
}
}

Expand All @@ -48,6 +70,10 @@ type foregroundOpts struct {
profileCronExpression string
}

type commandOpts struct {
source string
}

// runInForeground starts the program as a long running process, scheduling
// a job for each configuration that is available.
func (c *command) runInForeground(opts foregroundOpts) error {
Expand All @@ -59,7 +85,11 @@ func (c *command) runInForeground(opts foregroundOpts) error {
),
)

if err := c.schedule(configStrategyConfd); err != nil {
if err := c.scheduleConfd(); err != nil {
return errwrap.Wrap(err, "error scheduling")
}

if err := c.scheduleLabel(); err != nil {
return errwrap.Wrap(err, "error scheduling")
}

Expand All @@ -81,7 +111,10 @@ func (c *command) runInForeground(opts foregroundOpts) error {
<-ctx.Done()
return nil
case <-c.reload:
if err := c.schedule(configStrategyConfd); err != nil {
if err := c.scheduleConfd(); err != nil {
return errwrap.Wrap(err, "error reloading configuration")
}
if err := c.scheduleLabel(); err != nil {
return errwrap.Wrap(err, "error reloading configuration")
}
}
Expand All @@ -90,12 +123,12 @@ func (c *command) runInForeground(opts foregroundOpts) error {

// schedule wipes all existing schedules and enqueues all schedules available
// using the given configuration strategy
func (c *command) schedule(strategy configStrategy) error {
for _, id := range c.schedules {
func (c *command) scheduleConfd() error {
for _, id := range c.schedules[configStrategyConfd] {
c.cr.Remove(id)
}

configurations, err := sourceConfiguration(strategy)
configurations, err := sourceConfiguration(configStrategyConfd)
if err != nil {
return errwrap.Wrap(err, "error sourcing configuration")
}
Expand Down Expand Up @@ -131,12 +164,56 @@ func (c *command) schedule(strategy configStrategy) error {
c.logger.Warn(
fmt.Sprintf("Scheduled cron expression %s will never run, is this intentional?", config.BackupCronExpression),
)
c.schedules[configStrategyConfd] = append(c.schedules[configStrategyConfd], id)
}
}

return nil
}

func (c *command) scheduleLabel() error {
for _, id := range c.schedules[configStrategyLabel] {
c.cr.Remove(id)
}

if err != nil {
return errwrap.Wrap(err, "error scheduling")
configurations, err := sourceConfiguration(configStrategyLabel)
if err != nil {
return errwrap.Wrap(err, "error sourcing configuration")
}

for _, cfg := range configurations {
if ok := checkCronSchedule(cfg.BackupCronExpression); !ok {
c.logger.Warn(
fmt.Sprintf("Scheduled cron expression %s will never run, is this intentional?", cfg.BackupCronExpression),
)
}

id, err := c.cr.AddFunc(cfg.BackupCronExpression, func() {
c.logger.Info(
fmt.Sprintf(
"Now running script on schedule %s",
cfg.BackupCronExpression,
),
)

if err := runProxy(cfg); err != nil {
c.logger.Error(
fmt.Sprintf(
"Unexpected error running schedule %s: %v",
cfg.BackupCronExpression,
errwrap.Unwrap(err),
),
"error",
err,
)
}
c.schedules = append(c.schedules, id)
})
if err != nil {
return errwrap.Wrap(err, fmt.Sprintf("error adding schedule %s", cfg.BackupCronExpression))
}

c.logger.Info(fmt.Sprintf("Successfully scheduled backup %s with expression %s", cfg.source, cfg.BackupCronExpression))
c.schedules[configStrategyConfd] = append(c.schedules[configStrategyConfd], id)
}

return nil
Expand Down
1 change: 1 addition & 0 deletions cmd/backup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// Config holds all configuration values that are expected to be set
// by users.
type Config struct {
Enabled bool
AwsS3BucketName string `split_words:"true"`
AwsS3Path string `split_words:"true"`
AwsEndpoint string `split_words:"true" default:"s3.amazonaws.com"`
Expand Down
49 changes: 49 additions & 0 deletions cmd/backup/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ package main

import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/joho/godotenv"
"github.com/offen/docker-volume-backup/internal/errwrap"
"github.com/offen/envconfig"
"github.com/traefik/paerser/parser"
shell "mvdan.cc/sh/v3/shell"
)

Expand All @@ -21,6 +26,7 @@ type configStrategy string
const (
configStrategyEnv configStrategy = "env"
configStrategyConfd configStrategy = "confd"
configStrategyLabel configStrategy = "label"
)

// sourceConfiguration returns a list of config objects using the given
Expand All @@ -39,6 +45,19 @@ func sourceConfiguration(strategy configStrategy) ([]*Config, error) {
}
return nil, errwrap.Wrap(err, "error loading config files")
}
return cs, nil
case configStrategyLabel:
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, errwrap.Wrap(err, "failed to create docker client")
}
defer cli.Close()

cs, err := loadConfigsFromLabels(cli)
if err != nil {
return nil, errwrap.Wrap(err, "error loading configs from labels")
}

return cs, nil
default:
return nil, errwrap.Wrap(nil, fmt.Sprintf("received unknown config strategy: %v", strategy))
Expand Down Expand Up @@ -125,6 +144,36 @@ func loadConfigsFromEnvFiles(directory string) ([]*Config, error) {
return configs, nil
}

func loadConfigsFromLabels(cli *client.Client) ([]*Config, error) {
filter := filters.NewArgs(
filters.KeyValuePair{
Key: "label",
Value: "docker-volume-backup.enabled=true",
},
)
volresp, err := cli.VolumeList(context.Background(), volume.ListOptions{Filters: filter})
if err != nil {
return nil, errwrap.Wrap(err, "failed to list volumes")
}

configs := []*Config{}
for _, vol := range volresp.Volumes {
config := &Config{}

config.NotificationLevel = "error"

err = parser.Decode(vol.Labels, config, "docker-volume-backup", "docker-volume-backup")
if err != nil {
return nil, errwrap.Wrap(err, fmt.Sprintf("failed to decode config from labels of volume %s", vol.Name))
}
config.source = vol.Name

configs = append(configs, config)
}

return configs, nil
}

// source tries to mimic the pre v2.37.0 behavior of calling
// `set +a; source $path; set -a` and returns the env vars as a map
func source(path string) (map[string]string, error) {
Expand Down
6 changes: 5 additions & 1 deletion cmd/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

func main() {
foreground := flag.Bool("foreground", false, "run the tool in the foreground")
source := flag.String("source", "from environment", "source of backup to execute in command mode")
profile := flag.String("profile", "", "collect runtime metrics and log them periodically on the given cron expression")
flag.Parse()

Expand All @@ -19,6 +20,9 @@ func main() {
}
c.must(c.runInForeground(opts))
} else {
c.must(c.runAsCommand())
opts := commandOpts{
source: *source,
}
c.must(c.runAsCommand(opts))
}
}
1 change: 1 addition & 0 deletions cmd/backup/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package main
103 changes: 103 additions & 0 deletions cmd/backup/run_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"context"
"fmt"
"io"
"os"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/offen/docker-volume-backup/internal/errwrap"
)

func runProxy(c *Config) (err error) {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return errwrap.Wrap(err, "failed to create docker client")
}
defer cli.Close()

reader, err := cli.ImagePull(context.Background(), "offen/docker-volume-backup:v2.39.1", types.ImagePullOptions{})
if err != nil {
return errwrap.Wrap(err, "unable to pull image")
}
io.Copy(os.Stdout, reader)
defer reader.Close()

networkResp, err := cli.NetworkList(context.Background(), types.NetworkListOptions{Filters: filters.NewArgs(filters.KeyValuePair{Key: "name", Value: "volumes_default"})})

resp, err := cli.ContainerCreate(context.Background(), &container.Config{
Image: "offen/docker-volume-backup:v2.39.1",
Tty: true,
Entrypoint: []string{"backup"},
Env: []string{
fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", c.AwsAccessKeyID),
fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", c.AwsSecretAccessKey),
fmt.Sprintf("AWS_ENDPOINT=%s", c.AwsEndpoint),
fmt.Sprintf("AWS_ENDPOINT_PROTO=%s", c.AwsEndpointProto),
fmt.Sprintf("AWS_S3_BUCKET_NAME=%s", c.AwsS3BucketName),
fmt.Sprintf("BACKUP_FILENAME_EXPAND=%t", c.BackupFilenameExpand),
fmt.Sprintf("BACKUP_FILENAME=%s", c.BackupFilename),
fmt.Sprintf("BACKUP_CRON_EXPRESSION=%s", c.BackupCronExpression),
fmt.Sprintf("BACKUP_RETENTION_DAYS=%x", c.BackupRetentionDays),
fmt.Sprintf("BACKUP_PRUNING_LEEWAY=%s", c.BackupPruningLeeway),
fmt.Sprintf("BACKUP_PRUNING_PREFIX=%s", c.BackupPruningPrefix),
fmt.Sprintf("HOSTNAME=%s", os.Getenv("HOSTNAME")),
},
}, &container.HostConfig{
Mounts: []mount.Mount{
{
Type: mount.TypeVolume,
Source: c.source,
Target: "/backup",
},
{
Type: mount.TypeBind,
Source: "/var/run/docker.sock",
Target: "/var/run/docker.sock",
},
},
}, &network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
"mynetwork": {
NetworkID: networkResp[0].ID,
},
},
}, nil, "")
if err != nil {
return errwrap.Wrap(err, "unable to create container")
}

if err := cli.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil {
return errwrap.Wrap(err, "unable to start container")
}

statusCh, errCh := cli.ContainerWait(context.Background(), resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
return errwrap.Wrap(err, "error running container")
}
case <-statusCh:
}

out, err := cli.ContainerLogs(context.Background(), resp.ID, types.ContainerLogsOptions{ShowStdout: true})
if err != nil {
return errwrap.Wrap(err, "unable to get logs from container")
}

err = cli.ContainerRemove(context.Background(), resp.ID, types.ContainerRemoveOptions{})
if err != nil {
return errwrap.Wrap(err, "unable to remove container")
}

io.Copy(os.Stdout, out)
defer out.Close()

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/traefik/paerser v0.2.0
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
Loading

0 comments on commit e56b8e6

Please sign in to comment.