A C++ library for composing asynchronous continuations without locking or requiring dynamic heap allocations.
Callbacks are the most common approach to continuing an asynchronous computation, but they are hard to compose, often don't support cancellation, and are generally tricky to reason about.
Futures/promises do support composition and cancellation, but many implementations perform poorly because of locking overhead and dynamic heap allocations. And because the evaluation model of most futures/promises libraries is eager, an expression isn't referentially transparent.
Eventuals, much like futures/promises, support composition and cancellation, but they are lazy. That is, an eventual has to be explicitly started.
Another key difference from futures/promises is that an eventual's continuation is not type-erased and can be directly used, saved, etc. by the programmer. This allows the compiler to perform significant optimizations that are difficult to do with other lazy approaches that perform type-erasure. The tradeoff is that more code needs to be written in headers, which may lead to longer compile times. You can, however, mitigate long compile times by using a type-erasing Task type (more on this later).
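To make "lazy" and "not type-erased" concrete, here is a minimal standalone sketch that uses only the standard library. It is not this library's implementation: Lazy, MakeLazy, Then, Run, and LazyDemo are hypothetical names chosen for illustration. Each stage keeps the exact type of its callable as a template parameter, so there is no std::function and no heap allocation, and nothing executes until Run() is called.

```cpp
#include <cassert>
#include <utility>

// A lazy computation: stores a callable and runs it only on request.
// The callable's concrete type F is part of Lazy's type (no type erasure).
template <typename F>
struct Lazy {
  F f;

  // Compose with a continuation; nothing runs here either.
  template <typename G>
  auto Then(G g) && {
    auto composed = [f = std::move(f), g = std::move(g)]() mutable {
      return g(f());
    };
    return Lazy<decltype(composed)>{std::move(composed)};
  }

  // Explicitly start the computation.
  auto Run() { return f(); }
};

template <typename F>
Lazy<F> MakeLazy(F f) {
  return Lazy<F>{std::move(f)};
}

// Returns how many times the first stage had run before and after Run().
inline std::pair<int, int> LazyDemo() {
  int runs = 0;
  auto lazy = MakeLazy([&runs]() { ++runs; return 20; })
                  .Then([](int n) { return n + 1; })
                  .Then([](int n) { return n * 2; });
  int before = runs;       // Building the pipeline ran nothing.
  int value = lazy.Run();  // Computes (20 + 1) * 2.
  assert(value == 42);
  return {before, runs};
}
```

Because the composed type grows with every stage, this style pushes code into headers, which is the compile-time tradeoff described above.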
The library provides numerous abstractions that simplify asynchronous continuations, for example a Stream for performing asynchronous streaming. Each of these abstractions follows a "builder" pattern for constructing them; see Usage below for more details.
This library was inspired by experience building and using the libprocess library to power Apache Mesos, which itself has been used at massive scale (production clusters of > 80k hosts).
Please reach out to [email protected] for any questions or if you're looking for support.
Currently we only support Bazel and expect/use C++17 (some work could likely make this C++14).
You can build the library with:
$ bazel build :eventuals
...
You can build and run the tests with:
$ bazel test test:eventuals
...
macOS
- Download and install Visual Studio Code (VS Code).
- Run VS Code and install the necessary extensions:
  - Bazel plugin. This extension provides support for Bazel in Visual Studio Code.
  - C/C++ plugin. The C/C++ extension adds language support for C/C++ to Visual Studio Code, including features such as IntelliSense and debugging.
  - Clang-format plugin. This extension allows you to comply with the clang format for your code. Read the plugin overview for configuration.
  - CodeLLDB. This extension allows you to debug your code. Read the plugin overview for configuration.
- Install Bazel. Possible instructions for doing so using Homebrew:
  - Check the presence of Bazel using the following command in your terminal:
    $ bazel --version
  - If you have no Bazel, install it using Homebrew. Install the Bazel package via Homebrew as follows:
    $ brew install bazel
  - Upgrade to a newer version of Bazel using the following command (if needed):
    $ brew upgrade bazel
- Clone stout-eventuals.
- Open the stout-eventuals folder via VS Code.
- Check the checkboxes about "Trust the authors".
- VS Code -> Terminal -> New Terminal
Linux
- Download and install Visual Studio Code (VS Code).
- Run VS Code and install the necessary extensions:
  - Bazel plugin. This extension provides support for Bazel in Visual Studio Code.
  - C/C++ plugin. The C/C++ extension adds language support for C/C++ to Visual Studio Code, including features such as IntelliSense and debugging.
  - Clang-format plugin. This extension allows you to comply with the clang format for your code. Read the plugin overview for configuration.
  - CodeLLDB. This extension allows you to debug your code. Read the plugin overview for configuration.
- Install Bazel.
- Install the latest version of the compiler LLVM (LLVM Download Page).
- Install Git.
- Clone stout-eventuals.
- Open the stout-eventuals folder via VS Code.
- Check the checkboxes about "Trust the authors".
- VS Code -> Terminal -> New Terminal
Windows
- Download and install Visual Studio Code (VS Code).
- Run VS Code and install the necessary extensions:
  - Bazel plugin. This extension provides support for Bazel in Visual Studio Code.
  - C/C++ plugin. The C/C++ extension adds language support for C/C++ to Visual Studio Code, including features such as IntelliSense and debugging.
  - Clang-format plugin. This extension allows you to comply with the clang format for your code. Read the plugin overview for configuration. Possible instructions for how you can use Visual Studio's clang-format:
    - Create a folder .vscode in your project folder.
    - Create a file settings.json in the folder .vscode.
    - Add the data to the file (check the path to your clang-format.exe):
      {
        "clang-format.style": "Google",
        "clang-format.executable": "C:/Program Files (x86)/Microsoft Visual Studio/2019/Community/VC/Tools/Llvm/x64/bin/clang-format.exe",
        "editor.formatOnSave": true
      }
  - CodeLLDB. This extension allows you to debug your code. Read the plugin overview for configuration.
- Install Bazel. Detailed installation instructions for Windows can be found here: Installing Bazel on Windows. This is an important step. You must follow all the instructions, otherwise you will get various errors at the compilation stage.
- Install the latest version of the compiler LLVM (LLVM Download Page).
- Install Git.
- Restart your PC. ;-)
- Clone stout-eventuals.
- Open the stout-eventuals folder via VS Code.
- Check the checkboxes about "Trust the authors".
- VS Code -> Terminal -> New Terminal
Add the following to your WORKSPACE (or WORKSPACE.bazel):
git_repository(
    name = "stout-eventuals",
    remote = "https://github.com/3rdparty/stout-eventuals",
    commit = "579b62a16da74a4e197c96b39b3ecca39c00452f",
    shallow_since = "1624126303 -0700",
)
You can then depend on @stout-eventuals//:eventuals in your Bazel targets.
An Eventual provides explicit control over performing a simple asynchronous computation. First you "build" an eventual by specifying the type of the value that it will eventually compute and overriding the .start() callback for performing the computation:
auto e = Eventual<Result>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    });
In addition to .start() you can also override the .fail() and .stop() callbacks:
auto e = Eventual<Result>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    .fail([](auto& k, auto&&... errors) {
      // Handle raised errors.
    })
    .stop([](auto& k) {
      // Handle stopped computation.
    });
Each callback takes the continuation k, which you can use to either succeed(k, result) the computation, fail(k, error) the computation, or stop(k) the computation.
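The three exits of a continuation can be pictured with a small standalone sketch. It uses only the standard library and does not reflect how the library implements continuations: Outcome, RecordingContinuation, Succeed, Fail, Stop, StartDivide, and DivideOutcome are all hypothetical names. The point is only that a callback receives k and must finish through exactly one of the three paths.

```cpp
#include <cassert>
#include <string>
#include <utility>

// The ways a computation can finish (kPending means "not finished yet").
enum class Outcome { kPending, kSucceeded, kFailed, kStopped };

// A hypothetical continuation that records how the computation ended.
struct RecordingContinuation {
  Outcome outcome = Outcome::kPending;
  int value = 0;
  std::string error;
};

// Free functions that mirror the succeed(k, ...), fail(k, ...), stop(k) shape.
inline void Succeed(RecordingContinuation& k, int value) {
  k.outcome = Outcome::kSucceeded;
  k.value = value;
}

inline void Fail(RecordingContinuation& k, std::string error) {
  k.outcome = Outcome::kFailed;
  k.error = std::move(error);
}

inline void Stop(RecordingContinuation& k) {
  k.outcome = Outcome::kStopped;
}

// A "start" style callback: picks one exit based on its inputs.
inline void StartDivide(RecordingContinuation& k, int numerator,
                        int denominator, bool cancelled) {
  if (cancelled) {
    Stop(k);
  } else if (denominator == 0) {
    Fail(k, "division by zero");
  } else {
    Succeed(k, numerator / denominator);
  }
}

// Runs the callback against a fresh continuation and reports the exit taken.
inline Outcome DivideOutcome(int numerator, int denominator, bool cancelled) {
  RecordingContinuation k;
  StartDivide(k, numerator, denominator, cancelled);
  assert(k.outcome != Outcome::kPending);  // Exactly one exit was taken.
  return k.outcome;
}
```

In this sketch the continuation only records the result; in a real pipeline each exit would instead hand control to the next stage.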
You can also override the .context() callback, which allows you to "capture" data that you can use in each other callback:
auto e = Eventual<Result>()
    .context(SomeData())
    .start([](auto& data, auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    .fail([](auto& data, auto& k, auto&&... errors) {
      // Handle raised errors.
    })
    .stop([](auto& data, auto& k) {
      // Handle stopped computation.
    });
In many cases you can simply capture what you need in an individual callback, but sometimes you may want to use some data across callbacks.
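One way to picture shared context is an object that owns the data and passes it by reference as the first argument of every callback. The sketch below is standalone and illustrative only: WithContext, MakeWithContext, Stats, RunStart, RunStop, and ContextDemo are hypothetical names and are not part of the library.

```cpp
#include <cassert>
#include <utility>

// Hypothetical builder result: owns the context and hands it to each callback.
template <typename Context, typename Start, typename Stop>
struct WithContext {
  Context context;
  Start start;
  Stop stop;

  void RunStart() { start(context); }
  void RunStop() { stop(context); }
};

template <typename Context, typename Start, typename Stop>
auto MakeWithContext(Context context, Start start, Stop stop) {
  return WithContext<Context, Start, Stop>{
      std::move(context), std::move(start), std::move(stop)};
}

// Data that more than one callback needs to see.
struct Stats {
  int started = 0;
  int stopped = 0;
};

inline Stats ContextDemo() {
  auto e = MakeWithContext(
      Stats{},
      [](Stats& stats) { ++stats.started; },
      [](Stats& stats) { ++stats.stopped; });
  e.RunStart();
  e.RunStop();
  assert(e.context.started == 1);  // Both callbacks saw the same Stats.
  return e.context;
}
```

Storing the data once in the owning object avoids copying it into each lambda's capture list, which is the convenience the context callback is described as providing.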
You can use the * operator to run the asynchronous computation:
auto result = *e;
But this blocks the current thread and, except in tests, is (likely) not what you want to do. Instead, to use the eventually computed value you want to create a "pipeline" of eventuals using the | operator:
auto e2 = e
    | Eventual<std::string>()
        .start([](auto& k, auto&& result) {
          // Use result, either synchronously or asynchronously.
          succeed(k, result.ToString());
        });
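The idea of building a pipeline with | can be sketched in isolation. The following standalone example is an assumption-laden illustration, not the library's operator: Stage, MakeStage, and PipeDemo are hypothetical names, and the stages here are plain synchronous callables. It shows that composing with | only builds a larger object and that nothing runs until that object is invoked.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Wraps a callable so that operator| applies only to pipeline stages.
template <typename F>
struct Stage {
  F f;

  template <typename... Args>
  auto operator()(Args&&... args) {
    return f(std::forward<Args>(args)...);
  }
};

template <typename F>
Stage<F> MakeStage(F f) {
  return Stage<F>{std::move(f)};
}

// left | right builds a new stage that feeds left's result into right.
// Nothing executes until the composed stage is invoked.
template <typename L, typename R>
auto operator|(Stage<L> left, Stage<R> right) {
  return MakeStage(
      [left = std::move(left),
       right = std::move(right)](auto&&... args) mutable {
        return right(left(std::forward<decltype(args)>(args)...));
      });
}

inline std::string PipeDemo() {
  auto pipeline = MakeStage([](int n) { return n + 1; })
      | MakeStage([](int n) { return n * 2; })
      | MakeStage([](int n) { return "result=" + std::to_string(n); });
  std::string result = pipeline(20);  // Explicit start of the whole pipeline.
  assert(result == "result=42");
  return result;
}
```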
And when you're all done add a Terminal:
auto e2 = e
    | Eventual<T>()
        .start([](auto& k, auto&& result) {
          // Use result, either synchronously or asynchronously.
          succeed(k, use(result));
        })
    | Terminal()
        .start([](auto&& result) {
          // Eventual pipeline succeeded!
        })
        .fail([](auto&& result) {
          // Eventual pipeline failed!
        })
        .stop([](auto&& result) {
          // Eventual pipeline stopped!
        });
Then you can explicitly start the eventual:
start(e2);
Note that once you start an eventual it must exist and cannot be moved until after it has terminated.
Sometimes after you've started an eventual you'll want to cancel or stop it. You can do so by interrupting it. By default an eventual is not interruptible, but you can override the .interrupt() handler if you want to make your eventual interruptible:
auto e = Eventual<Result>()
    .start([](auto& k) {
      // Perform some asynchronous computation ...
    })
    .interrupt([](auto& k) {
      // Try and interrupt the asynchronous computation.
    });
Then you can register an interrupt and trigger the interrupt like so:
auto e = Eventual<Result>()
    .start([](auto& k) {
      // Perform some asynchronous computation ...
    })
    .interrupt([](auto& k) {
      // Try and interrupt the asynchronous computation.
    })
    | (Terminal()
        .start([](auto&& result) {
          // Eventual pipeline succeeded!
        })
        .fail([](auto&& result) {
          // Eventual pipeline failed!
        })
        .stop([](auto&& result) {
          // Eventual pipeline stopped!
        }));
Interrupt interrupt;
e.Register(interrupt);
start(e);
interrupt.Trigger();
Note that there may be other reasons why you want to "interrupt" an eventual, so rather than explicitly calling this functionality "cancel", we chose the broader "interrupt". When creating a general abstraction, however, err on the side of assuming that interrupt means cancel.
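The register-then-trigger flow can be sketched with a standalone example. It is a simplified, single-threaded illustration and not the library's Interrupt: SimpleInterrupt, Install, Worker, Step, and InterruptDemo are hypothetical names, and the sketch uses std::function (which may allocate) purely for brevity. It also shows one design point worth handling in any such abstraction: a trigger that arrives before the handler is installed should not be lost.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// A hypothetical, single-threaded interrupt. Not thread-safe.
class SimpleInterrupt {
 public:
  // Installs the handler; runs it right away if already triggered.
  void Install(std::function<void()> handler) {
    handler_ = std::move(handler);
    if (triggered_) handler_();
  }

  // Requests interruption; later calls have no further effect.
  void Trigger() {
    if (triggered_) return;
    triggered_ = true;
    if (handler_) handler_();
  }

 private:
  std::function<void()> handler_;
  bool triggered_ = false;
};

// A computation that opts in to being interrupted.
struct Worker {
  int remaining = 3;
  bool stopped = false;

  void Register(SimpleInterrupt& interrupt) {
    interrupt.Install([this]() { stopped = true; });
  }

  // Performs one unit of work; returns false once finished or interrupted.
  bool Step() {
    if (stopped || remaining == 0) return false;
    --remaining;
    return true;
  }
};

inline int InterruptDemo() {
  SimpleInterrupt interrupt;
  Worker worker;
  worker.Register(interrupt);
  worker.Step();           // remaining goes from 3 to 2.
  interrupt.Trigger();     // Requests cancellation.
  while (worker.Step()) {  // Exits immediately: the worker is stopped.
  }
  assert(worker.stopped);
  return worker.remaining;  // Work left undone because of the interrupt.
}
```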
Sometimes your continuation is synchronous, i.e., it won't block the current thread. While you can still use an Eventual, you can simplify by using a Lambda:
auto e = Eventual<T>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    | Lambda([](auto&& result) {
      // Return some value synchronously.
      return stringify(result);
    });
In many cases you can be even more implicit and just use a C++ lambda directly too:
auto e = Eventual<T>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    | [](auto&& result) {
      // Return some value synchronously.
      return stringify(result);
    };
When your continuation is asynchronous (i.e., you need to create another eventual based on the result of an eventual) but you don't need the explicit control that you have with an Eventual, you can use Then:
auto e = Eventual<T>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    | Then([](auto&& result) {
      // Return an eventual that will automatically get started.
      return SomeAsynchronousComputation(result);
    });
You can inject a value into an eventual pipeline using Just. This can be useful when you don't care about the result of another eventual as well as with Conditional.
auto e = Eventual<T>()
    .start([](auto& k) {
      auto thread = std::thread(
          [&k]() mutable {
            // Perform some asynchronous computation ...
            auto result = ...;
            succeed(k, result);
          });
      thread.detach();
    })
    | Just("value");
While Just is for continuing a pipeline "successfully", Raise can be used to trigger the failure path. Again, this becomes very useful with constructs like Conditional, amongst others.
auto e = Raise(Error());
Sometimes how you want to asynchronously continue depends on some value computed asynchronously. You can use Conditional to capture this pattern, e.g., an asynchronous "if else" statement:
auto e = Just(1)
    | Conditional(
        [](int n) {
          return n < 0;
        },
        [](int n) {
          return HandleLessThan(n);
        },
        [](int n) {
          return HandleGreaterThanOrEqual(n);
        });
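The "if else" shape can be sketched as a standalone higher-order function. This is a synchronous simplification under stated assumptions, not the library's Conditional: MakeConditional and Classify are hypothetical names, and both branches must return the same type here, whereas the library's branches return eventuals.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Builds a callable that evaluates the predicate on the incoming value and
// forwards the value to exactly one of the two branches.
template <typename Predicate, typename Then, typename Else>
auto MakeConditional(Predicate predicate, Then then_branch, Else else_branch) {
  return [predicate = std::move(predicate),
          then_branch = std::move(then_branch),
          else_branch = std::move(else_branch)](auto&& value) {
    // Both branches must return the same type in this simplified sketch.
    if (predicate(value)) {
      return then_branch(std::forward<decltype(value)>(value));
    }
    return else_branch(std::forward<decltype(value)>(value));
  };
}

inline std::string Classify(int n) {
  auto conditional = MakeConditional(
      [](int x) { return x < 0; },
      [](int) { return std::string("less than zero"); },
      [](int) { return std::string("greater than or equal to zero"); });
  std::string label = conditional(n);
  assert(!label.empty());  // One of the two branches always runs.
  return label;
}
```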
Synchronization is just as necessary with asynchronous code as with synchronous code, except you can't use existing abstractions like std::mutex because these are blocking! Instead, you need to use asynchronous-aware versions:
Lock lock;
auto e = Acquire(&lock)
    | Eventual<T>()
        .start([](auto& k) {
          auto thread = std::thread(
              [&k]() mutable {
                // Perform some asynchronous computation ...
                auto result = ...;
                succeed(k, result);
              });
          thread.detach();
        })
    | Release(&lock);
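What "asynchronous-aware" means for a lock can be sketched as follows: acquiring never blocks the caller; instead, the work to run under the lock is passed as a continuation and is queued if the lock is busy. This standalone example is single-threaded and illustrative only. AsyncLock, its Acquire and Release members, and LockDemo are hypothetical names, and the sketch uses std::function and std::deque (which may allocate), unlike the allocation-free design the library describes.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// A hypothetical, single-threaded asynchronous lock: Acquire() never blocks.
class AsyncLock {
 public:
  // Runs the continuation now if the lock is free, otherwise queues it.
  void Acquire(std::function<void()> continuation) {
    if (!held_) {
      held_ = true;
      continuation();
    } else {
      waiters_.push_back(std::move(continuation));
    }
  }

  // Hands the lock to the next waiter if there is one, otherwise frees it.
  void Release() {
    assert(held_);
    if (waiters_.empty()) {
      held_ = false;
      return;
    }
    auto next = std::move(waiters_.front());
    waiters_.pop_front();
    next();  // The lock stays held on behalf of the waiter.
  }

 private:
  bool held_ = false;
  std::deque<std::function<void()>> waiters_;
};

// Records the order in which the pieces of work ran.
inline std::vector<int> LockDemo() {
  AsyncLock lock;
  std::vector<int> order;
  lock.Acquire([&order]() { order.push_back(1); });  // Runs immediately.
  lock.Acquire([&order]() { order.push_back(2); });  // Queued: lock is held.
  order.push_back(0);  // The caller was not blocked by the second Acquire.
  lock.Release();      // Runs the queued continuation.
  lock.Release();      // No waiters left: the lock becomes free.
  return order;
}
```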
This is often used when capturing this to use as part of some asynchronous computation. To simplify this common pattern you can extend your classes with Synchronizable and then do:
auto e = Synchronized(
    Eventual<T>()
        .start([this](auto& k) {
          auto thread = std::thread(
              [&k]() mutable {
                // Perform some asynchronous computation using `this`.
                auto result = ...;
                succeed(k, result);
              });
          thread.detach();
        }));
Sometimes you need to "wait" for a specific condition to become true while holding on to the lock. You can do that using Wait:
auto e = Synchronized(
    Wait<std::string>() // NOTE: need to specify `&lock` when not using `Synchronized()`.
        .condition([this](auto& k) {
          if (...) {
            auto callback = [&k]() { notify(k); };
            // Save 'callback' in order to recheck the condition.
            wait(k);
          } else {
            succeed(k, ...);
          }
        }));
You can use Stream to "return" multiple values asynchronously. Instead of using succeed(), fail(), and stop() as we've already seen, "streams" use emit() and ended(), which emit a value on the stream and signify that there are no more values, respectively.
You "convert" a stream back into an eventual with a Loop and use next() and done() to request the next value on the stream or tell the stream that you don't want any more values, respectively.
By default streams are not buffered so as to be able to provide explicit flow control and back pressure. Here's a simple stream and loop:
auto e = Stream<int>()
    .context(5)
    .next([](auto& count, auto& k) {
      if (count-- > 0) {
        emit(k, count);
      } else {
        ended(k);
      }
    })
    | (Loop<int>()
        .context(0)
        .body([](auto& sum, auto& stream, auto&& value) {
          sum += value;
          next(stream);
        })
        .ended([](auto& sum, auto& k) {
          succeed(k, sum);
        }));
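The pull-based protocol between a stream and a loop can be sketched without the library. In this standalone, synchronous illustration the stream produces a value only when the loop asks for one, which is what gives the consumer control over the pace (back pressure). CountdownStream, SumLoop, Body, Ended, and StreamDemo are hypothetical names, and the recursion between Next() and Body() is acceptable only because the stream is short.

```cpp
#include <cassert>

// A hypothetical pull-based stream: it produces a value only when asked.
struct CountdownStream {
  int count;

  // Delivers the next value to loop.Body(), or calls loop.Ended() when done.
  template <typename Loop>
  void Next(Loop& loop) {
    if (count-- > 0) {
      loop.Body(*this, count);
    } else {
      loop.Ended();
    }
  }
};

// A hypothetical loop that sums values and decides when to request more.
struct SumLoop {
  int sum = 0;
  int values_seen = 0;
  bool ended = false;

  void Body(CountdownStream& stream, int value) {
    sum += value;
    ++values_seen;
    stream.Next(*this);  // Ask for another value only when ready for it.
  }

  void Ended() { ended = true; }
};

inline int StreamDemo() {
  CountdownStream stream{5};
  SumLoop loop;
  stream.Next(loop);  // Start pulling; yields 4, 3, 2, 1, 0 and then ends.
  assert(loop.ended);
  assert(loop.values_seen == 5);
  return loop.sum;
}
```

A loop that wanted to stop early would simply not call Next() again, which corresponds to the role done() plays in the description above.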
You can construct a stream out of repeated asynchronous computations using Repeat:
auto e = Repeat(Asynchronous());
Repeat acts just like Stream in that you can continue it with a Loop that calls next() and done(). By default a Repeat will repeat forever (i.e., for every call to next()) but you can override the default behavior during construction:
auto e = Repeat(Then([](int n) { return Asynchronous(n); }))
    .context(5) // Only repeat 5 times.
    .next([](auto& count, auto& k) {
      if (count-- > 0) {
        repeat(k, count);
      } else {
        ended(k);
      }
    });
Oftentimes you'll want to perform some transformations on your stream. You can do that explicitly with Transform, or implicitly with Map. Here's an example of doing a "map reduce":
auto e = Stream<int>()
    .context(5)
    .next([](auto& count, auto& k) {
      if (count-- > 0) {
        emit(k, count);
      } else {
        ended(k);
      }
    })
    | Map(Eventual<int>()
        .start([](auto& k, auto&& i) {
          succeed(k, i + 1);
        }))
    | (Loop<int>()
        .context(0)
        .body([](auto& sum, auto& stream, auto&& value) {
          sum += value;
          next(stream);
        })
        .ended([](auto& sum, auto& k) {
          succeed(k, sum);
        }));
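The same "map reduce" shape can be sketched with plain standard-library types to show where the transformation sits: between the source and the reducing loop, applied to one value at a time as it is pulled. This is a synchronous stand-in that requires C++17 for std::optional, not the library's Map or Loop. CountdownSource, Mapped, Map, Sum, and MapReduceDemo are hypothetical names.

```cpp
#include <cassert>
#include <optional>
#include <utility>

// Hypothetical pull-based source: yields count-1, ..., 0, then nothing.
struct CountdownSource {
  int count;

  std::optional<int> Next() {
    if (count-- > 0) return count;
    return std::nullopt;
  }
};

// Hypothetical map adaptor: applies f to each value pulled from upstream.
template <typename Upstream, typename F>
struct Mapped {
  Upstream upstream;
  F f;

  auto Next() {
    auto value = upstream.Next();
    using Result = decltype(f(*value));
    if (!value) return std::optional<Result>{};
    return std::optional<Result>{f(*value)};
  }
};

template <typename Upstream, typename F>
Mapped<Upstream, F> Map(Upstream upstream, F f) {
  return Mapped<Upstream, F>{std::move(upstream), std::move(f)};
}

// The "reduce" step: pulls values until the stream ends and sums them.
template <typename Stream>
int Sum(Stream stream) {
  int sum = 0;
  while (auto value = stream.Next()) {
    sum += *value;
  }
  return sum;
}

inline int MapReduceDemo() {
  // Source yields 4, 3, 2, 1, 0; the map turns that into 5, 4, 3, 2, 1.
  auto mapped = Map(CountdownSource{5}, [](int i) { return i + 1; });
  int total = Sum(std::move(mapped));
  assert(total == 15);
  return total;
}
```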
Sometimes you'll have an infinite stream. You can loop over it infinitely by using Loop():
auto e = SomeInfiniteStream()
    | Map(Then([](auto&& i) { return Foo(i); }))
    | Loop(); // Infinitely loop.
You can use a Task to type-erase your continuation or pipeline. Currently this performs dynamic heap allocation, but in the future we'll likely provide a SizedTask version that lets you specify the size such that you can type-erase without requiring dynamic heap allocation. Note, however, that Task requires a callable/lambda in order to delay the dynamic heap allocation until the task is started, so that the current scheduler has a chance of optimizing the allocation based on the current execution resource being used (e.g., allocating from the local NUMA node for the current thread).
Task<int> task = []() { return Asynchronous(); };
You can compose a Task just like any other eventual as well:
auto e = Task<int>([]() { return Asynchronous(); })
    | [](int i) {
      return stringify(i);
    };
A Task needs to be terminated just like any other eventual unless the callable/lambda passed to Task is terminated. In tests you can use * to add a terminal for you, but again, be careful as this blocks the current thread!
std::string s = *e; // BLOCKING! Only use in tests.
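Type erasure in general can be sketched with std::function: callables of different concrete types are hidden behind one non-template-dependent type, so they can be stored together or passed across non-header boundaries, at the cost of a possible heap allocation. The example below is a standalone illustration of that general technique only. ErasedTask, Start, and TaskDemo are hypothetical names, and the sketch does not model the library's deferral of allocation until the task is started.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// A hypothetical type-erased task: any callable returning T fits this type.
template <typename T>
class ErasedTask {
 public:
  explicit ErasedTask(std::function<T()> f) : f_(std::move(f)) {}

  // Runs the erased computation.
  T Start() { return f_(); }

 private:
  std::function<T()> f_;  // May heap-allocate for larger callables.
};

inline int TaskDemo() {
  int offset = 40;
  // Two different lambda types stored behind one concrete type, so they can
  // live in the same container.
  std::vector<ErasedTask<int>> tasks;
  tasks.emplace_back([]() { return 1; });
  tasks.emplace_back([offset]() { return offset + 1; });

  int total = 0;
  for (auto& task : tasks) {
    total += task.Start();
  }
  assert(total == 42);  // 1 + 41
  return total;
}
```

Because ErasedTask<int> no longer carries the lambda's type, code that uses it does not need to be a template, which is how type erasure can shorten compile times.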
... to be completed ...