Pipeline store writes #3177
Conversation
We had agreed to do point 2 of #3084 (comment) where the instance manager keeps track of its subgraph pointer rather than querying the writable agent for it. Does that still sound like a good idea to you?
It does, I just didn't want to hold up this PR by adding that in, too. It'll require returning the subgraph pointer/cursor from
Big brain PR 🧠
store/postgres/src/writable.rs (outdated)
```rust
// work since that has its own call hierarchy, and using the
// foreground metrics will lead to incorrect nesting of sections
let stopwatch =
    StopwatchMetrics::new(logger, queue.store.site.deployment.clone(), registry);
```
This will end up double counting on the `unknown` section and in the total `deployment_sync_secs` metric. A solution might be for `StopwatchMetrics` to support adding a prefix to all metrics, so each pipeline stage would have its own prefix.
I made it so that you can now add a 'stage' to `StopwatchMetrics`. LMK what you think.
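For illustration, creating the foreground and background stopwatches with distinct stage labels might look roughly like this; the exact `StopwatchMetrics::new` signature and the label values are assumptions based on this discussion, not a quote of the final code:

```rust
// Hypothetical sketch: one stopwatch per pipeline stage, distinguished by
// a stage label so the two don't double count `unknown` or the totals.
let fg_stopwatch = StopwatchMetrics::new(
    logger.clone(),
    deployment.clone(),
    "process", // stage label for the foreground processing stage
    registry.clone(),
);
let bg_stopwatch = StopwatchMetrics::new(
    logger,
    deployment,
    "writer", // stage label for the background write stage
    registry,
);
```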
A label is very smart!
Addressed all review comments
store/postgres/src/writable.rs (outdated)
```rust
    firehose_cursor,
} => queue
    .store
    .revert_block_operations(block_ptr.clone(), firehose_cursor.as_deref()),
```
Now that this is called from a non-blocking tokio task, `transact_block_operations` and `revert_block_operations` should be asyncified.
I tried asyncifying `transact_block_operations` for a bit, but it requires that `DeploymentStore.transact_block_operations` clone all its arguments to own them: they get put into a background task, and Rust doesn't understand that the `with_conn(|_, _| ...).await` that uses the arguments has to return before `transact_block_operations` returns, so it wants references with a static lifetime.
Rather than copy large amounts of data, I think we should hold off on asyncifying.
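A minimal standalone sketch of that lifetime problem, using plain tokio and made-up helper names rather than the actual store code:

```rust
use tokio::task;

fn write_to_db(_batch: &[String]) -> Result<(), String> {
    Ok(()) // stand-in for the real database write
}

async fn transact(batch: &[String]) -> Result<(), String> {
    // `spawn_blocking` requires a 'static closure, so capturing `batch`
    // by reference does not compile, even though we await the handle
    // before returning:
    //
    //     task::spawn_blocking(|| write_to_db(batch)) // error: borrowed
    //                                                 // data escapes
    //
    // The compiler can't see that the task finishes before `batch`'s
    // lifetime ends, so the closure has to own a clone of the data.
    let owned = batch.to_vec();
    task::spawn_blocking(move || write_to_db(&owned))
        .await
        .map_err(|e| format!("task failed: {}", e))?
}
```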
Then we should spawn the writer in a blocking task.
I didn't put the whole writer into a blocking task since that blocks the process from exiting (very annoying for tests); rather, I run the actual store operations as a blocking task now.
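In outline, that split could look like the following sketch; `queue.pop`, `store.process`, and the loop body are illustrative names and shapes, not the PR's actual writer:

```rust
// The writer loop itself is an ordinary async task, so it doesn't pin a
// blocking-pool thread and the process can exit cleanly in tests; only
// the store operation for each request runs on the blocking pool.
graph::spawn(async move {
    loop {
        let req = queue.pop().await; // wait for the next write/revert request
        let store = store.cheap_clone();
        let res =
            graph::spawn_blocking_allow_panic(move || store.process(req)).await;
        // `res` would be recorded so callers see any error on the next flush
    }
});
```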
graph/src/util/bounded_queue.rs (outdated)
```rust
    queue.iter().rev().fold(init, f)
}

pub async fn clear(&self) {
```
The way we use it wouldn't manifest this, but in principle calling `clear` and `pop` concurrently could cause `pop` to panic, since `clear` will empty the queue without first acquiring the permits. If it tried to acquire the permits first, we'd have a different potential bug: the permit count may change between being checked and the permits actually being acquired, and then `clear` could deadlock.
I'm thinking that `BoundedQueue` shouldn't intrinsically support `clear`; rather, the writer should call `pop` in a loop until the queue is empty when clearing it. That way we can justify the assumption that neither `pop` nor `push` is being called concurrently with it.
I added a `try_pop` (non-blocking `pop`) and made `clear` a wrapper around calling that in a loop; since there's no `await` on that code path now, there shouldn't be any possibility for races.
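A minimal sketch of that shape, assuming a permit-per-entry semaphore design like the one discussed above (the real `BoundedQueue` in `graph/src/util/bounded_queue.rs` also tracks capacity for `push`, which is omitted here):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use tokio::sync::Semaphore;

struct BoundedQueue<T> {
    items: Mutex<VecDeque<T>>,
    filled: Semaphore, // one permit per queued entry
}

impl<T> BoundedQueue<T> {
    /// Non-blocking pop: only takes an entry if a permit is immediately
    /// available, so it never awaits and never panics on an empty queue.
    fn try_pop(&self) -> Option<T> {
        let permit = self.filled.try_acquire().ok()?;
        permit.forget(); // consume the permit along with the entry
        self.items.lock().unwrap().pop_front()
    }

    /// Clear by popping one entry at a time; permits and entries stay in
    /// sync at every step, so a concurrent consumer can't be tripped up.
    fn clear(&self) {
        while self.try_pop().is_some() {}
    }
}
```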
Addressed all comments; I'll rebase after the review to make it not too hard to follow the convo.
store/postgres/src/writable.rs (outdated)
```rust
let store = queue.store.cheap_clone();
let stopwatch = stopwatch.cheap_clone();
let res =
    graph::spawn_blocking(
```
The naming of the `graph::task_spawn` methods is not very consistent, but you should use `graph::spawn_blocking_allow_panic` here, which takes a closure, rather than this one, which takes a future. Or, if you'd rather abort on panic, you'd need to add a new helper that takes an `FnOnce` and does that, though catching the panic and propagating it as an error seems like what we'd want.
I didn't realize that. I just updated the code; panics are now turned into `StoreError`, which seems like a much better way to handle them anyway.
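The general pattern for that, sketched here with a plain `String` error standing in for the real `StoreError` and an invented `run_write` helper:

```rust
use tokio::task;

async fn run_write<F, T>(f: F) -> Result<T, String>
where
    F: FnOnce() -> Result<T, String> + Send + 'static,
    T: Send + 'static,
{
    match task::spawn_blocking(f).await {
        Ok(res) => res,
        // A JoinError means the closure panicked (or the task was
        // cancelled); surface it as a store error instead of unwinding
        // and taking the writer down with it.
        Err(join_err) => Err(format!("write task panicked: {}", join_err)),
    }
}
```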
Force-pushed from 44bbae5 to 68b71c7
Rebased to latest master
Force-pushed from 4af3183 to 703294a
@leoyvens I made some changes to this PR since you reviewed it. Could you have a look at the last 4 commits? I think the other ones didn't change, they were just rebased.
Seems like it would be helpful to rename `Request::Revert` to `Request::RevertTo`.
store/postgres/src/writable.rs (outdated)
```diff
 }

 /// Return `true` if a write at this block will be visible, i.e., not
 /// reverted by a previous queue entry
 fn visible(&self, block_ptr: &BlockPtr) -> bool {
-    self.revert > block_ptr.number
+    self.revert >= block_ptr.number
```
Maybe it's just me, but it feels like this is in Yoda order; it would be more natural to write `block_ptr.number <= self.revert`.
Hah .. Yoda order :) Yes, reversing the comparison looks better
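With both suggestions applied, the check would read like this (a sketch of the combined fixes, not a quote of the final commit):

```rust
/// Return `true` if a write at this block will be visible, i.e., not
/// reverted by a previous queue entry
fn visible(&self, block_ptr: &BlockPtr) -> bool {
    block_ptr.number <= self.revert
}
```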
Force-pushed from 703294a to 86a6e79
Added a commit with the two suggestions and rebased to latest master
Force-pushed from 5204771 to 1f6b8ad
We will need to make sure that `get`, `get_many`, and `load_dynamic_data_sources` do not see changes that may or may not have been written yet by the background writer. We do this by passing in an explicit block constraint (see the sketch after these commit notes).
Pipelining/buffering of writes can be turned off by setting `GRAPH_STORE_WRITE_QUEUE` to 0.
This avoids a potential deadlock in flushing the write queue
Rather than clearing the queue by removing entries in bulk, which could race against a pop at the same time, clear the queue by popping one entry at a time.
Tests wait for the queue to be empty to see the result of changes; the code previously emptied the queue before a possible error had been recorded, which would cause test failures.
Since we now call `StopwatchMetrics::new` twice for each deployment, we need to go through the global_ .. mechanisms in the registry; otherwise, only one set of metrics actually gets kept.
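As a sketch of the block-constraint idea from the first commit note above; `last_written_block` and `get_at` are invented names, not the actual writable API:

```rust
// Reads are pinned to the last block the background writer has durably
// written, so they cannot nondeterministically observe entries that are
// still sitting in the write queue.
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError> {
    let at_block = self.last_written_block(); // hypothetical helper
    self.store.get_at(key, at_block) // hypothetical: query at that block
}
```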
Force-pushed from 1f6b8ad to 764f5c1
This PR makes it possible to perform database writes in parallel with the rest of processing during indexing. The size of the write queue can be set with the environment variable `GRAPH_STORE_WRITE_QUEUE`, which defaults to 5 write or revert operations. Setting it to 0 makes writes synchronous and reinstates the previous behavior, also bypassing a bunch of new code.
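A sketch of how such a knob is typically read; only the variable name, the default of 5, and the meaning of 0 come from the PR description, while the helper itself is illustrative:

```rust
use std::env;

/// Size of the write queue; 0 disables pipelining and makes writes
/// synchronous again, bypassing the new queueing code entirely.
fn write_queue_size() -> usize {
    env::var("GRAPH_STORE_WRITE_QUEUE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(5) // default: buffer up to 5 write/revert operations
}
```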