docs: add metrics doc on barrier #8902

Merged · 5 commits · Mar 31, 2023

3 changes: 2 additions & 1 deletion docs/checkpoint.md
@@ -22,7 +22,8 @@

![](./images/checkpoint/checkpoint.svg)

To guarantee consistency, RisingWave introduces the Chandy-Lamport algorithm as its checkpoint scheme. In particular, RisingWave periodically repeats the following procedure:
To guarantee consistency, RisingWave introduces the Chandy-Lamport algorithm as its checkpoint scheme.
In particular, RisingWave periodically (every `barrier_interval_ms`) repeats the following procedure:

1. The meta service initializes a barrier and broadcasts it to all source actors across the streaming engine.
2. The barrier messages go through every streaming operator (actor) in the streaming graph.
51 changes: 51 additions & 0 deletions docs/metrics.md
@@ -0,0 +1,51 @@
# Metrics

The contents of this document may be subject to frequent change.
It covers what each metric measures, and what information we may derive from it.

## Barrier Latency

Prerequisite: [Checkpoint](./checkpoint.md)

This metric measures the duration from when a barrier is injected into **all** sources in the stream graph
to when the barrier has flowed through every executor in the graph.

### What can we understand from it?

Usually when examining barrier latency, we look at **high barrier latency**.

There are two contributing factors to it:
1. Time taken to actually process the streaming messages.
2. Buffer capacity for streaming messages.

#### Processing costs

When a new barrier is injected,
there will usually already be streaming messages in flight in the stream graph (unless it is the initial barrier).
Since we maintain a total order for streaming messages,
all streaming messages currently in the stream graph have to be processed
before the barrier can pass through.
If barrier latency is high, it may mean that processing these streaming messages takes a long time.
Concretely, here are some costs of processing streaming messages:
1. CPU cost of evaluating expressions.
2. I/O cost of remote exchange between fragments.
3. Cache misses in stateful executors (for instance, hash join and hash agg), which incur extra cost to fetch state from S3 (see the sketch below).
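
To make the third cost concrete, here is a toy Rust sketch (not RisingWave's actual state cache; the 5 ms `sleep` is a made-up stand-in for an object-storage round trip) showing how cache misses can dominate per-message processing cost:

```rust
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};

// Toy model of a stateful executor's cache in front of remote state.
struct StateCache {
    local: HashMap<u64, u64>,
}

impl StateCache {
    fn get(&mut self, key: u64) -> u64 {
        if let Some(&v) = self.local.get(&key) {
            v // cache hit: served from memory
        } else {
            // Cache miss: pretend we fetch the entry from object storage.
            // The 5 ms sleep is a made-up stand-in for that round trip.
            thread::sleep(Duration::from_millis(5));
            let v = key * 2; // stand-in for the fetched value
            self.local.insert(key, v);
            v
        }
    }
}

fn main() {
    let mut cache = StateCache { local: HashMap::new() };

    let start = Instant::now();
    for key in 0..100 {
        cache.get(key); // cold: every lookup misses
    }
    println!("cold pass: {:?}", start.elapsed());

    let start = Instant::now();
    for key in 0..100 {
        cache.get(key); // warm: every lookup hits
    }
    println!("warm pass: {:?}", start.elapsed());
}
```

The cold pass pays the simulated remote fetch on every key, while the warm pass is served entirely from the in-memory map, so per-message cost differs by orders of magnitude.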

#### Buffer capacities

Next, high barrier latency could also be caused by buffers in the graph.
---

*Review comments on this section:*

**@BugenZhao** (Member) · Mar 31, 2023

> May give a hint that in this case we can tweak the buffer size through configuration file (developer-oriented and should be careful), and a heuristic algorithm is on the way. 🤤

**@kwannoel** (Contributor, Author) · Mar 31, 2023

> > May give a hint that in this case we can tweak the buffer size through configuration file (developer),
>
> Which options can configure this? So far I just see `stream_connector_message_buffer_size`, but that's just for connector source -> source executor.
>
> > and a heuristic algorithm is on the way. 🤤
>
> Any discussions / issues / RFCs I can link to?

**Member**

> > Which options can configure this? So far I just see `stream_connector_message_buffer_size`, but that's just for connector source -> source executor.
>
> `stream_exchange_initial_permits = 8192`
> `stream_exchange_batched_permits = 1024`
>
> > Any discussions / issues / RFCs I can link to?
>
> I guess we can link to #8654.

**@kwannoel** (Contributor, Author)

> Updated, PTAL

---

If some downstream buffer is congested, upstream messages cannot be queued, so upstream processing stalls.

For instance, if the channel in the exchange executor is full,
upstream messages cannot be sent through it.
The upstream executor is then unable to continue processing new stream messages until some space in the buffer is freed.
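
To make the effect concrete, here is a minimal Rust sketch of bounded-buffer backpressure (not RisingWave's exchange implementation; channel capacity and timings are made up): a fast upstream fills a small bounded channel and then blocks until a slow downstream frees space.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // A small bounded channel standing in for the exchange buffer between
    // two fragments. Capacity 4 is arbitrary.
    let (tx, rx) = sync_channel::<u64>(4);

    // Fast upstream: tries to push 16 messages as quickly as possible.
    let upstream = thread::spawn(move || {
        for msg in 0..16u64 {
            // Once the buffer is full, `send` blocks: the upstream cannot
            // make progress until the downstream frees a slot.
            tx.send(msg).unwrap();
            println!("upstream sent {msg}");
        }
    });

    // Slow downstream: takes 100 ms per message, so the buffer fills up
    // and the upstream spends most of its time blocked.
    for msg in rx {
        thread::sleep(Duration::from_millis(100));
        println!("downstream processed {msg}");
    }

    upstream.join().unwrap();
}
```

In a real stream graph the same effect cascades: once one exchange buffer is full, the executor feeding it stalls, which eventually fills the buffers behind it as well.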

The various buffer sizes can currently be adjusted via options in the developer section of the configuration file.
For instance, the options that configure the buffer size of the exchange executor (`stream_exchange_initial_permits` and `stream_exchange_batched_permits`) can be found [here](https://github.com/risingwavelabs/risingwave/blob/a36e01307d60491b91870ac5a37049a378fe986f/src/config/example.toml#L49-L50).

Another subtle cause is that a large buffer size can also worsen barrier latency.
Suppose stream message processing is already at its limit, and latency is high as a result.
Typically, backpressure kicks in and the source is throttled.
However, if buffer sizes are too large, or if there are many buffers, backpressure is not applied to the source immediately.
During this delay, we will continue to see high barrier latency.
A heuristic algorithm to deal with this is on the way: https://github.com/risingwavelabs/risingwave/issues/8654.
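
As a rough back-of-envelope sketch of why large or numerous buffers delay backpressure (all numbers hypothetical; real buffering depends on the plan shape, message sizes, and how permits map to messages):

```rust
fn main() {
    // All numbers are hypothetical and chosen only for illustration.
    let exchanges_between_source_and_bottleneck = 4;
    let permits_per_exchange = 8192; // echoes the value quoted in the review thread
    let bottleneck_throughput_per_sec = 10_000.0; // messages/s the slow operator can handle

    // Roughly: the buffers along the path must fill before the source is
    // throttled, and a barrier injected behind them waits for the bottleneck
    // to drain that backlog.
    let buffered = (exchanges_between_source_and_bottleneck * permits_per_exchange) as f64;
    let delay_secs = buffered / bottleneck_throughput_per_sec;

    println!("~{delay_secs:.1}s of buffered work before backpressure reaches the source");
}
```

Until the source is throttled, a newly injected barrier has to wait behind this buffered backlog, so barrier latency stays high.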