Skip to content

Commit

Permalink
Merge pull request #12382 from roosterfish/lock_err
Browse files Browse the repository at this point in the history
Return error from locking.Lock
  • Loading branch information
tomponline authored Oct 16, 2023
2 parents f0d733c + d481a78 commit f134f11
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 82 deletions.
6 changes: 3 additions & 3 deletions lxd/api_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func metricsGet(d *Daemon, r *http.Request) response.Response {
lockCtx, lockCtxCancel := context.WithTimeout(r.Context(), cacheDuration)
defer lockCtxCancel()

unlock := locking.Lock(lockCtx, "metricsGet")
if unlock == nil {
return response.SmartError(api.StatusErrorf(http.StatusLocked, "Metrics are currently being built by another request"))
unlock, err := locking.Lock(lockCtx, "metricsGet")
if err != nil {
return response.SmartError(api.StatusErrorf(http.StatusLocked, "Metrics are currently being built by another request: %s", err))
}

defer unlock()
Expand Down
8 changes: 6 additions & 2 deletions lxd/daemon_images.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type ImageDownloadArgs struct {
}

// imageOperationLock acquires a lock for operating on an image and returns the unlock function.
func imageOperationLock(fingerprint string) locking.UnlockFunc {
func imageOperationLock(fingerprint string) (locking.UnlockFunc, error) {
l := logger.AddContext(logger.Ctx{"fingerprint": fingerprint})
l.Debug("Acquiring lock for image")
defer l.Debug("Lock acquired for image")
Expand Down Expand Up @@ -124,7 +124,11 @@ func ImageDownload(r *http.Request, s *state.State, op *operations.Operation, ar
}

// Ensure we are the only ones operating on this image.
unlock := imageOperationLock(fp)
unlock, err := imageOperationLock(fp)
if err != nil {
return nil, err
}

defer unlock()

// If auto-update is on and we're being given the image by
Expand Down
6 changes: 5 additions & 1 deletion lxd/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -2476,7 +2476,11 @@ func imageDelete(d *Daemon, r *http.Request) response.Response {
do := func(op *operations.Operation) error {
// Lock this operation to ensure that concurrent image operations don't conflict.
// Other operations will wait for this one to finish.
unlock := imageOperationLock(imgInfo.Fingerprint)
unlock, err := imageOperationLock(imgInfo.Fingerprint)
if err != nil {
return err
}

defer unlock()

// Check image still exists and another request hasn't removed it since we resolved the image
Expand Down
8 changes: 6 additions & 2 deletions lxd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func ensureImageIsLocallyAvailable(s *state.State, r *http.Request, img *api.Ima
// time may also arrive at the conclusion that the image doesn't exist on this cluster member and then
// think it needs to download the image and store the record in the database as well, which will lead to
// duplicate record errors.
unlock := imageOperationLock(img.Fingerprint)
unlock, err := imageOperationLock(img.Fingerprint)
if err != nil {
return err
}

defer unlock()

memberAddress, err := s.DB.Cluster.LocateImage(img.Fingerprint)
Expand Down Expand Up @@ -733,7 +737,7 @@ func getSourceImageFromInstanceSource(ctx context.Context, s *state.State, tx *d
}

// instanceOperationLock acquires a lock for operating on an instance and returns the unlock function.
func instanceOperationLock(ctx context.Context, projectName string, instanceName string) locking.UnlockFunc {
func instanceOperationLock(ctx context.Context, projectName string, instanceName string) (locking.UnlockFunc, error) {
l := logger.AddContext(logger.Ctx{"project": projectName, "instance": instanceName})
l.Debug("Acquiring lock for instance")
defer l.Debug("Lock acquired for instance")
Expand Down
2 changes: 1 addition & 1 deletion lxd/instance/drivers/driver_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func (d *common) devicesRemove(inst instance.Instance) {
}

// updateBackupFileLock acquires the update backup file lock that protects concurrent access to actions that will call UpdateBackupFile() as part of their operation.
func (d *common) updateBackupFileLock(ctx context.Context) locking.UnlockFunc {
func (d *common) updateBackupFileLock(ctx context.Context) (locking.UnlockFunc, error) {
parentName, _, _ := api.GetParentAndSnapshotName(d.Name())
return locking.Lock(ctx, fmt.Sprintf("instance_updatebackupfile_%s_%s", d.Project().Name, parentName))
}
Expand Down
58 changes: 47 additions & 11 deletions lxd/instance/drivers/driver_lxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2351,7 +2351,11 @@ func (d *lxc) detachInterfaceRename(netns string, ifName string, hostName string

// Start starts the instance.
func (d *lxc) Start(stateful bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

d.logger.Debug("Start started", logger.Ctx{"stateful": stateful})
Expand All @@ -2360,7 +2364,7 @@ func (d *lxc) Start(stateful bool) error {
// Check that we are startable before creating an operation lock.
// Must happen before creating operation Start lock to avoid the status check returning Stopped due to the
// existence of a Start operation lock.
err := d.validateStartup(stateful, d.statusCode())
err = d.validateStartup(stateful, d.statusCode())
if err != nil {
return err
}
Expand Down Expand Up @@ -3445,7 +3449,11 @@ func (d *lxc) snapshot(name string, expiry time.Time, stateful bool) error {

// Snapshot takes a new snapshot.
func (d *lxc) Snapshot(name string, expiry time.Time, stateful bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

return d.snapshot(name, expiry, stateful)
Expand Down Expand Up @@ -3668,7 +3676,11 @@ func (d *lxc) cleanup() {

// Delete deletes the instance.
func (d *lxc) Delete(force bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

// Setup a new operation.
Expand Down Expand Up @@ -3819,7 +3831,11 @@ func (d *lxc) delete(force bool) error {

// Rename renames the instance. Accepts an argument to enable applying deferred TemplateTriggerRename.
func (d *lxc) Rename(newName string, applyTemplateTrigger bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

oldName := d.Name()
Expand All @@ -3832,7 +3848,7 @@ func (d *lxc) Rename(newName string, applyTemplateTrigger bool) error {
d.logger.Info("Renaming instance", ctxMap)

// Quick checks.
err := instance.ValidName(newName, d.IsSnapshot())
err = instance.ValidName(newName, d.IsSnapshot())
if err != nil {
return err
}
Expand Down Expand Up @@ -4011,7 +4027,11 @@ func (d *lxc) CGroupSet(key string, value string) error {

// Update applies updated config.
func (d *lxc) Update(args db.InstanceArgs, userRequested bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

// Setup a new operation
Expand Down Expand Up @@ -6672,11 +6692,15 @@ func (d *lxc) inheritInitPidFd() (int, *os.File) {
// FileSFTPConn returns a connection to the forkfile handler.
func (d *lxc) FileSFTPConn() (net.Conn, error) {
// Lock to avoid concurrent spawning.
spawnUnlock := locking.Lock(context.TODO(), fmt.Sprintf("forkfile_%d", d.id))
spawnUnlock, err := locking.Lock(context.TODO(), fmt.Sprintf("forkfile_%d", d.id))
if err != nil {
return nil, err
}

defer spawnUnlock()

// Create any missing directories in case the instance has never been started before.
err := os.MkdirAll(d.LogPath(), 0700)
err = os.MkdirAll(d.LogPath(), 0700)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -6733,7 +6757,12 @@ func (d *lxc) FileSFTPConn() (net.Conn, error) {
chReady := make(chan error)
go func() {
// Lock to avoid concurrent running forkfile.
runUnlock := locking.Lock(context.TODO(), d.forkfileRunningLockName())
runUnlock, err := locking.Lock(context.TODO(), d.forkfileRunningLockName())
if err != nil {
chReady <- err
return
}

defer runUnlock()

// Mount the filesystem if needed.
Expand Down Expand Up @@ -6904,7 +6933,14 @@ func (d *lxc) FileSFTP() (*sftp.Client, error) {
func (d *lxc) stopForkfile(force bool) {
// Make sure that when the function exits, no forkfile is running by acquiring the lock (which indicates
// that forkfile isn't running and holding the lock) and then releasing it.
defer func() { locking.Lock(context.TODO(), d.forkfileRunningLockName())() }()
defer func() {
unlock, err := locking.Lock(context.TODO(), d.forkfileRunningLockName())
if err != nil {
return
}

unlock()
}()

content, err := os.ReadFile(filepath.Join(d.LogPath(), "forkfile.pid"))
if err != nil {
Expand Down
32 changes: 26 additions & 6 deletions lxd/instance/drivers/driver_qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,11 @@ func (d *qemu) validateStartup(stateful bool, statusCode api.StatusCode) error {

// Start starts the instance.
func (d *qemu) Start(stateful bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

return d.start(stateful, nil)
Expand Down Expand Up @@ -4588,7 +4592,11 @@ func (d *qemu) snapshot(name string, expiry time.Time, stateful bool) error {

// Snapshot takes a new snapshot.
func (d *qemu) Snapshot(name string, expiry time.Time, stateful bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

return d.snapshot(name, expiry, stateful)
Expand Down Expand Up @@ -4716,7 +4724,11 @@ func (d *qemu) Restore(source instance.Instance, stateful bool) error {

// Rename the instance. Accepts an argument to enable applying deferred TemplateTriggerRename.
func (d *qemu) Rename(newName string, applyTemplateTrigger bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

oldName := d.Name()
Expand All @@ -4729,7 +4741,7 @@ func (d *qemu) Rename(newName string, applyTemplateTrigger bool) error {
d.logger.Info("Renaming instance", ctxMap)

// Quick checks.
err := instance.ValidName(newName, d.IsSnapshot())
err = instance.ValidName(newName, d.IsSnapshot())
if err != nil {
return err
}
Expand Down Expand Up @@ -4883,7 +4895,11 @@ func (d *qemu) Rename(newName string, applyTemplateTrigger bool) error {

// Update the instance config.
func (d *qemu) Update(args db.InstanceArgs, userRequested bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

// Setup a new operation.
Expand Down Expand Up @@ -5563,7 +5579,11 @@ func (d *qemu) init() error {

// Delete the instance.
func (d *qemu) Delete(force bool) error {
unlock := d.updateBackupFileLock(context.Background())
unlock, err := d.updateBackupFileLock(context.Background())
if err != nil {
return err
}

defer unlock()

// Setup a new operation.
Expand Down
6 changes: 5 additions & 1 deletion lxd/instance_patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func instancePatch(d *Daemon, r *http.Request) response.Response {
return resp
}

unlock := instanceOperationLock(s.ShutdownCtx, projectName, name)
unlock, err := instanceOperationLock(s.ShutdownCtx, projectName, name)
if err != nil {
return response.SmartError(err)
}

defer unlock()

c, err := instance.LoadByProjectAndName(s, projectName, name)
Expand Down
6 changes: 5 additions & 1 deletion lxd/instance_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func instancePut(d *Daemon, r *http.Request) response.Response {
revert := revert.New()
defer revert.Fail()

unlock := instanceOperationLock(s.ShutdownCtx, projectName, name)
unlock, err := instanceOperationLock(s.ShutdownCtx, projectName, name)
if err != nil {
return response.SmartError(err)
}

revert.Add(func() {
unlock()
})
Expand Down
7 changes: 4 additions & 3 deletions lxd/locking/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package locking

import (
"context"
"fmt"
"sync"
)

Expand All @@ -21,7 +22,7 @@ type UnlockFunc func()
// Will block until the lock is established or the context is cancelled.
// On successfully acquiring the lock, it returns an unlock function which needs to be called to unlock the lock.
// If the context is canceled then nil will be returned.
func Lock(ctx context.Context, lockName string) UnlockFunc {
func Lock(ctx context.Context, lockName string) (UnlockFunc, error) {
for {
// Get exclusive access to the map and see if there is already an operation ongoing.
locksMutex.Lock()
Expand Down Expand Up @@ -53,7 +54,7 @@ func Lock(ctx context.Context, lockName string) UnlockFunc {
// map entry has been deleted, this will allow any waiting users
// to try and get access to the map to create a new operation.
locksMutex.Unlock()
}
}, nil
}

// An existing operation is ongoing, lets wait for that to finish and then try
Expand All @@ -64,7 +65,7 @@ func Lock(ctx context.Context, lockName string) UnlockFunc {
case <-waitCh:
continue
case <-ctx.Done():
return nil
return nil, fmt.Errorf("Failed to obtain lock %q: %w", lockName, ctx.Err())
}
}
}
12 changes: 10 additions & 2 deletions lxd/network/driver_ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,11 @@ func (n *ovn) startUplinkPort() error {

// Lock uplink network so that if multiple OVN networks are trying to connect to the same uplink we don't
// race each other setting up the connection.
unlock := locking.Lock(context.TODO(), n.uplinkOperationLockName(uplinkNet))
unlock, err := locking.Lock(context.TODO(), n.uplinkOperationLockName(uplinkNet))
if err != nil {
return err
}

defer unlock()

switch uplinkNet.Type() {
Expand Down Expand Up @@ -1479,7 +1483,11 @@ func (n *ovn) deleteUplinkPort() error {
}

// Lock uplink network so we don't race each other networks using the OVS uplink bridge.
unlock := locking.Lock(context.TODO(), n.uplinkOperationLockName(uplinkNet))
unlock, err := locking.Lock(context.TODO(), n.uplinkOperationLockName(uplinkNet))
if err != nil {
return err
}

defer unlock()

switch uplinkNet.Type() {
Expand Down
Loading

0 comments on commit f134f11

Please sign in to comment.