Skip to content

Commit

Permalink
Implementation of async context tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jan 11, 2023
1 parent e064a8b commit 97d9501
Show file tree
Hide file tree
Showing 16 changed files with 1,197 additions and 55 deletions.
35 changes: 35 additions & 0 deletions samples/async-context/config.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Imports the base schema for workerd configuration files.

# Refer to the comments in /src/workerd/server/workerd.capnp for more details.

using Workerd = import "/workerd/workerd.capnp";

# A constant of type Workerd.Config defines the top-level configuration for an
# instance of the workerd runtime. A single config file can contain multiple
# Workerd.Config definitions and must have at least one.
const helloWorldExample :Workerd.Config = (

# Every workerd instance consists of a set of named services. A worker, for instance,
# is a type of service. Other types of services can include external servers, the
# ability to talk to a network, or accessing a disk directory. Here we create a single
# worker service. The configuration details for the worker are defined below.
services = [ (name = "main", worker = .helloWorld) ],

# Every configuration defines the one or more sockets on which the server will listene.
# Here, we create a single socket that will listen on localhost port 8080, and will
# dispatch to the "main" service that we defined above.
sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ]
);

# The definition of the actual helloWorld worker exposed using the "main" service.
# In this example the worker is implemented as a single simple script (see worker.js).
# The compatibilityDate is required. For more details on compatibility dates see:
# https://developers.cloudflare.com/workers/platform/compatibility-dates/

const helloWorld :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "worker.js")
],
compatibilityDate = "2022-11-08",
compatibilityFlags = ["nodejs_18_compat_experimental"]
);
23 changes: 23 additions & 0 deletions samples/async-context/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { default as async_hooks } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = async_hooks;

const als = new AsyncLocalStorage();

export default {
async fetch(request) {
const differentScope = als.run(123, () => AsyncResource.bind(() => {
console.log(als.getStore());
}));

return als.run("Hello World", async () => {

// differentScope is attached to a different async context, so
// it will see a different value for als.getStore() (123)
setTimeout(differentScope, 5);

// Some simulated async delay.
await scheduler.wait(10);
return new Response(als.getStore()); // "Hello World"
});
}
};
107 changes: 73 additions & 34 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <kj/encoding.h>

#include <workerd/api/system-streams.h>
#include <workerd/jsg/async-context.h>
#include <workerd/jsg/ser.h>
#include <workerd/jsg/util.h>
#include <workerd/io/trace.h>
Expand Down Expand Up @@ -96,6 +97,24 @@ void ExecutionContext::passThroughOnException() {
IoContext::current().setFailOpen();
}

ServiceWorkerGlobalScope::ServiceWorkerGlobalScope(v8::Isolate* isolate)
: unhandledRejections(
[this](jsg::Lock& js,
v8::PromiseRejectEvent event,
jsg::V8Ref<v8::Promise> promise,
jsg::Value value) {
// If async context tracking is enabled, then we need to ensure that we enter the frame
// associated with the promise before we invoke the unhandled rejection callback handling.
kj::Maybe<jsg::AsyncContextFrame::Scope> maybeScope;
KJ_IF_MAYBE(context, jsg::AsyncContextFrame::tryGetContext(js, promise)) {
if (!context->isRoot(js)) {
maybeScope.emplace(js, jsg::AsyncContextFrame::tryGetContext(js, promise));
}
}
auto ev = jsg::alloc<PromiseRejectionEvent>(event, kj::mv(promise), kj::mv(value));
dispatchEventImpl(js, kj::mv(ev));
}) {}

void ServiceWorkerGlobalScope::clear() {
removeAllHandlers();
unhandledRejections.clear();
Expand Down Expand Up @@ -478,8 +497,10 @@ v8::Local<v8::String> ServiceWorkerGlobalScope::atob(kj::String data, v8::Isolat
return jsg::v8StrFromLatin1(isolate, decoded.asBytes());
}

void ServiceWorkerGlobalScope::queueMicrotask(v8::Local<v8::Function> task, v8::Isolate* isolate) {
isolate->EnqueueMicrotask(task);
void ServiceWorkerGlobalScope::queueMicrotask(
jsg::Lock& js,
v8::Local<v8::Function> task) {
js.v8Isolate->EnqueueMicrotask(jsg::AsyncContextFrame::wrap(js, task, nullptr, nullptr));
}

v8::Local<v8::Value> ServiceWorkerGlobalScope::structuredClone(
Expand Down Expand Up @@ -508,27 +529,36 @@ TimeoutId::NumberType ServiceWorkerGlobalScope::setTimeoutInternal(
}

TimeoutId::NumberType ServiceWorkerGlobalScope::setTimeout(
jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate) {
jsg::Varargs args) {
auto& context = jsg::AsyncContextFrame::current(js);
if (!context.isRoot(js)) {
// If the AsyncContextFrame is the root frame, we do not have to wrap it at all.
// This is because setInterval/setTimeout callbacks are always invoked by the
// system. If there is no AsyncContextFrame::Scope on the stack, it will always
// use the root frame.
function = js.v8Ref(jsg::AsyncContextFrame::current(js).wrap(
js, function.getHandle(js), nullptr, nullptr));
}
auto argv = kj::heapArrayFromIterable<jsg::Value>(kj::mv(args));
auto timeoutId = IoContext::current().setTimeoutImpl(
timeoutIdGenerator,
/* repeats = */ false,
[function = function.addRef(isolate), argv = kj::mv(argv)](jsg::Lock& js) mutable {
auto isolate = js.v8Isolate;
auto context = isolate->GetCurrentContext();
auto localFunction = function.getHandle(isolate);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(isolate);
};
auto argc = localArgs.size();
timeoutIdGenerator,
/* repeats = */ false,
[function = function.addRef(js),
argv = kj::mv(argv)]
(jsg::Lock& js) mutable {
auto context = js.v8Isolate->GetCurrentContext();
auto localFunction = function.getHandle(js);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(js);
};
auto argc = localArgs.size();

// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
},
msDelay.orDefault(0));
// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
}, msDelay.orDefault(0));
return timeoutId.toNumber();
}

Expand All @@ -539,27 +569,36 @@ void ServiceWorkerGlobalScope::clearTimeout(kj::Maybe<TimeoutId::NumberType> tim
}

TimeoutId::NumberType ServiceWorkerGlobalScope::setInterval(
jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate) {
jsg::Varargs args) {
auto& context = jsg::AsyncContextFrame::current(js);
if (!context.isRoot(js)) {
// If the AsyncContextFrame is the root frame, we do not have to wrap it at all.
// This is because setInterval/setTimeout callbacks are always invoked by the
// system. If there is no AsyncContextFrame::Scope on the stack, it will always
// use the root frame.
function = js.v8Ref(jsg::AsyncContextFrame::current(js).wrap(
js, function.getHandle(js), nullptr, nullptr));
}
auto argv = kj::heapArrayFromIterable<jsg::Value>(kj::mv(args));
auto timeoutId = IoContext::current().setTimeoutImpl(
timeoutIdGenerator,
/* repeats = */ true,
[function = function.addRef(isolate), argv = kj::mv(argv)](jsg::Lock& js) mutable {
auto isolate = js.v8Isolate;
auto context = isolate->GetCurrentContext();
auto localFunction = function.getHandle(isolate);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(isolate);
};
auto argc = localArgs.size();
timeoutIdGenerator,
/* repeats = */ true,
[function = function.addRef(js),
argv = kj::mv(argv)]
(jsg::Lock& js) mutable {
auto context = js.v8Isolate->GetCurrentContext();
auto localFunction = function.getHandle(js);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(js);
};
auto argc = localArgs.size();

// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
},
msDelay.orDefault(0));
// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
}, msDelay.orDefault(0));
return timeoutId.toNumber();
}

Expand Down
30 changes: 10 additions & 20 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// Global object API exposed to JavaScript.

public:
ServiceWorkerGlobalScope(v8::Isolate* isolate)
: unhandledRejections(
[this](jsg::Lock& js,
v8::PromiseRejectEvent event,
jsg::V8Ref<v8::Promise> promise,
jsg::Value value) {
auto ev = jsg::alloc<PromiseRejectionEvent>(event, kj::mv(promise), kj::mv(value));
dispatchEventImpl(js, kj::mv(ev));
}) {}
ServiceWorkerGlobalScope(v8::Isolate* isolate);

void clear();
// Drop all references to JavaScript objects so that the context can be garbage-collected. Call
Expand Down Expand Up @@ -237,7 +229,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
kj::String btoa(v8::Local<v8::Value> data, v8::Isolate* isolate);
v8::Local<v8::String> atob(kj::String data, v8::Isolate* isolate);

void queueMicrotask(v8::Local<v8::Function> task, v8::Isolate* isolate);
void queueMicrotask(jsg::Lock& js, v8::Local<v8::Function> task);

struct StructuredCloneOptions {
jsg::Optional<kj::Array<jsg::Value>> transfer;
Expand All @@ -250,22 +242,20 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
jsg::Optional<StructuredCloneOptions> options,
v8::Isolate* isolate);

TimeoutId::NumberType setTimeout(
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate);
TimeoutId::NumberType setTimeout(jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args);
void clearTimeout(kj::Maybe<TimeoutId::NumberType> timeoutId);

TimeoutId::NumberType setTimeoutInternal(
jsg::Function<void()> function,
double msDelay);

TimeoutId::NumberType setInterval(
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate);
TimeoutId::NumberType setInterval(jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args);
void clearInterval(kj::Maybe<TimeoutId::NumberType> timeoutId) { clearTimeout(timeoutId); }

jsg::Promise<jsg::Ref<Response>> fetch(
Expand Down
120 changes: 120 additions & 0 deletions src/workerd/api/node/async-hooks.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
#include "async-hooks.h"
#include <kj/vector.h>

namespace workerd::api::node {

jsg::Ref<AsyncLocalStorage> AsyncLocalStorage::constructor(jsg::Lock& js) {
return jsg::alloc<AsyncLocalStorage>();
}

v8::Local<v8::Value> AsyncLocalStorage::run(
jsg::Lock& js,
v8::Local<v8::Value> store,
v8::Local<v8::Function> callback,
jsg::Varargs args) {
kj::Vector<v8::Local<v8::Value>> argv(args.size());
for (auto arg : args) {
argv.add(arg.getHandle(js));
}

auto context = js.v8Isolate->GetCurrentContext();

jsg::AsyncContextFrame::StorageScope scope(js, *key, js.v8Ref(store));

return jsg::check(callback->Call(
context,
context->Global(),
argv.size(),
argv.begin()));
}

v8::Local<v8::Value> AsyncLocalStorage::exit(
jsg::Lock& js,
v8::Local<v8::Function> callback,
jsg::Varargs args) {
// Node.js defines exit as running "a function synchronously outside of a context".
// It goes on to say that the store is not accessible within the callback or the
// asynchronous operations created within the callback. Any getStore() call done
// within the callbackfunction will always return undefined... except if run() is
// called which implicitly enables the context again within that scope.
//
// We do not have to emulate Node.js enable/disable behavior since we are not
// implementing the enterWith/disable methods. We can emulate the correct
// behavior simply by calling run with the store value set to undefined, which
// will propagate correctly.
return run(js, v8::Undefined(js.v8Isolate), callback, kj::mv(args));
}

v8::Local<v8::Value> AsyncLocalStorage::getStore(jsg::Lock& js) {
KJ_IF_MAYBE(value, jsg::AsyncContextFrame::current(js).get(*key)) {
return value->getHandle(js);
}
return v8::Undefined(js.v8Isolate);
}

AsyncResource::AsyncResource(jsg::Lock& js)
: frame(kj::addRef(jsg::AsyncContextFrame::current(js))) {}

jsg::Ref<AsyncResource> AsyncResource::constructor(
jsg::Lock& js,
jsg::Optional<kj::String> type,
jsg::Optional<Options> options) {
// The type and options are required as part of the Node.js API compatibility
// but our implementation does not currently make use of them at all. It is ok
// for us to silently ignore both here.
return jsg::alloc<AsyncResource>(js);
}

v8::Local<v8::Function> AsyncResource::staticBind(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<kj::String> type,
jsg::Optional<v8::Local<v8::Value>> thisArg,
const jsg::TypeHandler<jsg::Ref<AsyncResource>>& handler) {
return AsyncResource::constructor(js, kj::mv(type)
.orDefault([] { return kj::str("AsyncResource"); }))
->bind(js, fn, thisArg, handler);
}

v8::Local<v8::Function> AsyncResource::bind(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<v8::Local<v8::Value>> thisArg,
const jsg::TypeHandler<jsg::Ref<AsyncResource>>& handler) {
auto& frame = jsg::AsyncContextFrame::current(js);
v8::Local<v8::Function> bound = jsg::AsyncContextFrame::wrap(js, fn, frame, thisArg);

// Per Node.js documentation (https://nodejs.org/dist/latest-v19.x/docs/api/async_context.html#asyncresourcebindfn-thisarg), the returned function "will have an
// asyncResource property referencing the AsyncResource to which the function
// is bound".
jsg::check(bound->Set(js.v8Isolate->GetCurrentContext(),
jsg::v8StrIntern(js.v8Isolate, "asyncResource"_kj),
handler.wrap(js, JSG_THIS)));
return bound;
}

v8::Local<v8::Value> AsyncResource::runInAsyncScope(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<v8::Local<v8::Value>> thisArg,
jsg::Varargs args) {
kj::Vector<v8::Local<v8::Value>> argv(args.size());
for (auto arg : args) {
argv.add(arg.getHandle(js));
}

auto context = js.v8Isolate->GetCurrentContext();

jsg::AsyncContextFrame::Scope scope(js, *frame);

return jsg::check(fn->Call(
context,
thisArg.orDefault(context->Global()),
argv.size(),
argv.begin()));
}

} // namespace workerd::api::node
Loading

0 comments on commit 97d9501

Please sign in to comment.