-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sftp refactor #3073
Sftp refactor #3073
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @ooesili, thanks for bearing with me for a review on this! Really awesome job man 🏆 This does make it easier to go through the code. I think I saw a few potential issues, but should be good otherwise.
I think @rockwotj also left a few comments in #3037. Please have a look and then I'm happy to merge both PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found a couple of concurrency issues, I don't think either would actually be encountered in practice, but it's better to hold the locks for longer and be sure.
internal/impl/sftp/input.go
Outdated
s.stateLock.Lock() | ||
defer s.stateLock.Unlock() | ||
|
||
parts, codecAckFn, err := s.scanner.NextBatch(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it's actually possible to be hit in practice but there's a race condition here, as the lock around s.scanner
is yielded within initScanner
before the lock is re-acquired, so there's opportunity for another goroutine to set s.scanner
to nil
in that gap. If you want to avoid this then have initScanner
return the scanner pointer and use that reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow good catch. I'll do this
return fmt.Errorf("creating scanner: %w", err) | ||
} | ||
|
||
s.stateLock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-acquiring the lock here means there's a period of time where another goroutine could potentially have used the client to open a file, create a scanner and assign it, and now this goroutine is going to overwrite s.scanner
, which means it's lost. I think in terms of performance you should be fine holding the lock for the entire method call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem holding the lock for the entire call is that in watch mode it will hold the lock open while s.pathProvider.Next()
blocks and waits for a new file, which will cause AckFns to block because they need to grab the lock for a bit too to access s.pathProvider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job @ooesili! 🏆 Thanks for bearing with us on this review! The pool implementation looks like a great idea!
I left a small comment and please don't forget to add a note in the Changelog.
Feel free to merge if the stuff Ash mentioned is sorted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🎉
This commit reduces the scope of critical sections guarded by scannerMut to remove a deadlock that causes the last file to not be deleted when the SFTP input is used with watching enabled.
`(*watcherPathProvider).Next()` currently uses recursion to loop until a path is found. This commit refactors that function to use a for loop instead which is more straight forward to read.
This integration test makes sure that when `delete_on_finish` is true and watching is enabled that we delete every file.
Before this commit, when a file was exuasted the `ReadBatch` method returned ErrNotConnected which cause the engine to call `Connect` again. Aside from being awkward, this causes the connection status to incorrectly be reported as disconnected during normal operation. This commit moves the logic to advance to the next file when the current file is exhuasted into a the ReadBatch method.
The v2 suffix was added to some functions during the recent refactor and they were accidentally left in place.
ReadBatch was holding the state lock the while it polled for new files, which blocked AckFns from cleaning up successfully processed files when deleteOnFinish is set to true.
This prevents a race condition between two calls to ReadBatch clobbering each other.
9cf4382
to
0f2f6d0
Compare
Before this commit, when a file was exuasted the
ReadBatch
methodreturned ErrNotConnected which cause the engine to call
Connect
again.Aside from being awkward, this causes the connection status to
incorrectly be reported as disconnected during normal operation.
This commit moves the logic to advance to the next file when the current
file is exhuasted into a the ReadBatch method.
Builds on top of #3037