Skip to content

Commit

Permalink
Sftp refactor (#3073)
Browse files Browse the repository at this point in the history
* fix(sftp): fix deadlock so last file is deleted

This commit reduces the scope of critical sections guarded by scannerMut
to remove a deadlock that causes the last file to not be deleted when
the SFTP input is used with watching enabled.

* refactor(sftp): use for loop in watcher provider

`(*watcherPathProvider).Next()` currently uses recursion to loop until a
path is found. This commit refactors that function to use a for loop
instead which is more straight forward to read.

* test(sftp): add test for delete-on-finish bug

This integration test makes sure that when `delete_on_finish` is true
and watching is enabled that we delete every file.

* refactor(sftp): advance to next file in ReadBatch

Before this commit, when a file was exuasted the `ReadBatch` method
returned ErrNotConnected which cause the engine to call `Connect` again.
Aside from being awkward, this causes the connection status to
incorrectly be reported as disconnected during normal operation.

This commit moves the logic to advance to the next file when the current
file is exhuasted into a the ReadBatch method.

* fix(sftp): reduce critical sections of mutexes

ReadBatch was holding the state lock the while it polled for new files,
which blocked AckFns from cleaning up successfully processed files when
deleteOnFinish is set to true.

* refactor(sftp): create clientPool to manage connection status

* fix(sftp): return scanner from (*sftpReader).initScanner

This prevents a race condition between two calls to ReadBatch clobbering
each other.
  • Loading branch information
ooesili authored Jan 14, 2025
1 parent e753db4 commit 8b3ab71
Show file tree
Hide file tree
Showing 4 changed files with 427 additions and 177 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ All notable changes to this project will be documented in this file.
- Fix an issue in `aws_sqs` with refreshing in-flight message leases which could prevent acks from processed. (@rockwotj)
- Fix an issue with `postgres_cdc` with TOAST values not being propagated with `REPLICA IDENTITY FULL`. (@rockwotj)
- Fix a initial snapshot streaming consistency issue with `postgres_cdc`. (@rockwotj)
- Fix bug in `sftp` input where the last file was not deleted when `watcher` and `delete_on_finish` were enabled (@ooesili)

## 4.44.0 - 2024-12-13

Expand Down
126 changes: 126 additions & 0 deletions internal/impl/sftp/client_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sftp

import (
"errors"
"io/fs"
"sync"

"github.com/pkg/sftp"
)

func newClientPool(newClient func() (*sftp.Client, error)) (*clientPool, error) {
client, err := newClient()
if err != nil {
return nil, err
}
return &clientPool{
newClient: newClient,
client: client,
}, nil
}

type clientPool struct {
newClient func() (*sftp.Client, error)

lock sync.Mutex
client *sftp.Client
closed bool
}

func (c *clientPool) Open(path string) (*sftp.File, error) {
return clientPoolDoReturning(c, func(client *sftp.Client) (*sftp.File, error) {
return client.Open(path)
})
}

func (c *clientPool) Glob(path string) ([]string, error) {
return clientPoolDoReturning(c, func(client *sftp.Client) ([]string, error) {
return client.Glob(path)
})
}

func (c *clientPool) Stat(path string) (fs.FileInfo, error) {
return clientPoolDoReturning(c, func(client *sftp.Client) (fs.FileInfo, error) {
return client.Stat(path)
})
}

func (c *clientPool) Remove(path string) error {
return clientPoolDo(c, func(client *sftp.Client) error {
return client.Remove(path)
})
}

func (c *clientPool) Close() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.closed {
return nil
}
c.closed = true

if c.client != nil {
err := c.client.Close()
c.client = nil
return err
}
return nil
}

func clientPoolDo(c *clientPool, fn func(*sftp.Client) error) error {
_, err := clientPoolDoReturning(c, func(client *sftp.Client) (struct{}, error) {
err := fn(client)
return struct{}{}, err
})
return err
}

func clientPoolDoReturning[T any](c *clientPool, fn func(*sftp.Client) (T, error)) (T, error) {
c.lock.Lock()
defer c.lock.Unlock()

var zero T

// In the case that the clientPool is used from an AckFn after the input is
// closed, we create temporary client to fulfil the operation, then
// immediately close it.
if c.closed {
client, err := c.newClient()
if err != nil {
return zero, err
}
result, err := fn(client)
_ = client.Close()
return result, err
}

if c.client == nil {
client, err := c.newClient()
if err != nil {
return zero, err
}
c.client = client
}

result, err := fn(c.client)
if errors.Is(err, sftp.ErrSSHFxConnectionLost) {
_ = c.client.Close()
c.client = nil
}
return result, err
}
Loading

0 comments on commit 8b3ab71

Please sign in to comment.