Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of async context tracking #208

Merged
merged 1 commit into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions samples/async-context/config.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Imports the base schema for workerd configuration files.

# Refer to the comments in /src/workerd/server/workerd.capnp for more details.

using Workerd = import "/workerd/workerd.capnp";

# A constant of type Workerd.Config defines the top-level configuration for an
# instance of the workerd runtime. A single config file can contain multiple
# Workerd.Config definitions and must have at least one.
const helloWorldExample :Workerd.Config = (

# Every workerd instance consists of a set of named services. A worker, for instance,
# is a type of service. Other types of services can include external servers, the
# ability to talk to a network, or accessing a disk directory. Here we create a single
# worker service. The configuration details for the worker are defined below.
services = [ (name = "main", worker = .helloWorld) ],

# Every configuration defines the one or more sockets on which the server will listene.
# Here, we create a single socket that will listen on localhost port 8080, and will
# dispatch to the "main" service that we defined above.
sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ]
);

# The definition of the actual helloWorld worker exposed using the "main" service.
# In this example the worker is implemented as a single simple script (see worker.js).
# The compatibilityDate is required. For more details on compatibility dates see:
# https://developers.cloudflare.com/workers/platform/compatibility-dates/

const helloWorld :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "worker.js")
],
compatibilityDate = "2022-11-08",
compatibilityFlags = ["nodejs_18_compat_experimental"]
);
23 changes: 23 additions & 0 deletions samples/async-context/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { default as async_hooks } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = async_hooks;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm. doesn't import AsyncLocalStorage, AsyncResource from "" work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would if these were typescript modules. The built-in using a backing jsg::Object does not yet support named exports.


const als = new AsyncLocalStorage();

export default {
async fetch(request) {
const differentScope = als.run(123, () => AsyncResource.bind(() => {
console.log(als.getStore());
}));

return als.run("Hello World", async () => {

// differentScope is attached to a different async context, so
// it will see a different value for als.getStore() (123)
setTimeout(differentScope, 5);

// Some simulated async delay.
await scheduler.wait(10);
return new Response(als.getStore()); // "Hello World"
});
}
};
107 changes: 73 additions & 34 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <kj/encoding.h>

#include <workerd/api/system-streams.h>
#include <workerd/jsg/async-context.h>
#include <workerd/jsg/ser.h>
#include <workerd/jsg/util.h>
#include <workerd/io/trace.h>
Expand Down Expand Up @@ -96,6 +97,24 @@ void ExecutionContext::passThroughOnException() {
IoContext::current().setFailOpen();
}

ServiceWorkerGlobalScope::ServiceWorkerGlobalScope(v8::Isolate* isolate)
: unhandledRejections(
[this](jsg::Lock& js,
v8::PromiseRejectEvent event,
jsg::V8Ref<v8::Promise> promise,
jsg::Value value) {
// If async context tracking is enabled, then we need to ensure that we enter the frame
// associated with the promise before we invoke the unhandled rejection callback handling.
kj::Maybe<jsg::AsyncContextFrame::Scope> maybeScope;
KJ_IF_MAYBE(context, jsg::AsyncContextFrame::tryGetContext(js, promise)) {
if (!context->isRoot(js)) {
maybeScope.emplace(js, jsg::AsyncContextFrame::tryGetContext(js, promise));
}
}
auto ev = jsg::alloc<PromiseRejectionEvent>(event, kj::mv(promise), kj::mv(value));
dispatchEventImpl(js, kj::mv(ev));
}) {}

void ServiceWorkerGlobalScope::clear() {
removeAllHandlers();
unhandledRejections.clear();
Expand Down Expand Up @@ -478,8 +497,10 @@ v8::Local<v8::String> ServiceWorkerGlobalScope::atob(kj::String data, v8::Isolat
return jsg::v8StrFromLatin1(isolate, decoded.asBytes());
}

void ServiceWorkerGlobalScope::queueMicrotask(v8::Local<v8::Function> task, v8::Isolate* isolate) {
isolate->EnqueueMicrotask(task);
void ServiceWorkerGlobalScope::queueMicrotask(
jsg::Lock& js,
v8::Local<v8::Function> task) {
js.v8Isolate->EnqueueMicrotask(jsg::AsyncContextFrame::wrap(js, task, nullptr, nullptr));
}

v8::Local<v8::Value> ServiceWorkerGlobalScope::structuredClone(
Expand Down Expand Up @@ -508,27 +529,36 @@ TimeoutId::NumberType ServiceWorkerGlobalScope::setTimeoutInternal(
}

TimeoutId::NumberType ServiceWorkerGlobalScope::setTimeout(
jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate) {
jsg::Varargs args) {
auto& context = jsg::AsyncContextFrame::current(js);
if (!context.isRoot(js)) {
// If the AsyncContextFrame is the root frame, we do not have to wrap it at all.
// This is because setInterval/setTimeout callbacks are always invoked by the
// system. If there is no AsyncContextFrame::Scope on the stack, it will always
// use the root frame.
function = js.v8Ref(jsg::AsyncContextFrame::current(js).wrap(
js, function.getHandle(js), nullptr, nullptr));
}
auto argv = kj::heapArrayFromIterable<jsg::Value>(kj::mv(args));
auto timeoutId = IoContext::current().setTimeoutImpl(
timeoutIdGenerator,
/* repeats = */ false,
[function = function.addRef(isolate), argv = kj::mv(argv)](jsg::Lock& js) mutable {
auto isolate = js.v8Isolate;
auto context = isolate->GetCurrentContext();
auto localFunction = function.getHandle(isolate);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(isolate);
};
auto argc = localArgs.size();
timeoutIdGenerator,
/* repeats = */ false,
[function = function.addRef(js),
argv = kj::mv(argv)]
(jsg::Lock& js) mutable {
auto context = js.v8Isolate->GetCurrentContext();
auto localFunction = function.getHandle(js);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(js);
};
auto argc = localArgs.size();

// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
},
msDelay.orDefault(0));
// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
}, msDelay.orDefault(0));
return timeoutId.toNumber();
}

Expand All @@ -539,27 +569,36 @@ void ServiceWorkerGlobalScope::clearTimeout(kj::Maybe<TimeoutId::NumberType> tim
}

TimeoutId::NumberType ServiceWorkerGlobalScope::setInterval(
jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate) {
jsg::Varargs args) {
auto& context = jsg::AsyncContextFrame::current(js);
if (!context.isRoot(js)) {
// If the AsyncContextFrame is the root frame, we do not have to wrap it at all.
// This is because setInterval/setTimeout callbacks are always invoked by the
// system. If there is no AsyncContextFrame::Scope on the stack, it will always
// use the root frame.
function = js.v8Ref(jsg::AsyncContextFrame::current(js).wrap(
js, function.getHandle(js), nullptr, nullptr));
}
auto argv = kj::heapArrayFromIterable<jsg::Value>(kj::mv(args));
auto timeoutId = IoContext::current().setTimeoutImpl(
timeoutIdGenerator,
/* repeats = */ true,
[function = function.addRef(isolate), argv = kj::mv(argv)](jsg::Lock& js) mutable {
auto isolate = js.v8Isolate;
auto context = isolate->GetCurrentContext();
auto localFunction = function.getHandle(isolate);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(isolate);
};
auto argc = localArgs.size();
timeoutIdGenerator,
/* repeats = */ true,
[function = function.addRef(js),
argv = kj::mv(argv)]
(jsg::Lock& js) mutable {
auto context = js.v8Isolate->GetCurrentContext();
auto localFunction = function.getHandle(js);
auto localArgs = KJ_MAP(arg, argv) {
return arg.getHandle(js);
};
auto argc = localArgs.size();

// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
},
msDelay.orDefault(0));
// Cast to void to discard the result value.
(void)jsg::check(localFunction->Call(context, context->Global(), argc, &localArgs.front()));
}, msDelay.orDefault(0));
return timeoutId.toNumber();
}

Expand Down
30 changes: 10 additions & 20 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// Global object API exposed to JavaScript.

public:
ServiceWorkerGlobalScope(v8::Isolate* isolate)
: unhandledRejections(
[this](jsg::Lock& js,
v8::PromiseRejectEvent event,
jsg::V8Ref<v8::Promise> promise,
jsg::Value value) {
auto ev = jsg::alloc<PromiseRejectionEvent>(event, kj::mv(promise), kj::mv(value));
dispatchEventImpl(js, kj::mv(ev));
}) {}
ServiceWorkerGlobalScope(v8::Isolate* isolate);

void clear();
// Drop all references to JavaScript objects so that the context can be garbage-collected. Call
Expand Down Expand Up @@ -237,7 +229,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
kj::String btoa(v8::Local<v8::Value> data, v8::Isolate* isolate);
v8::Local<v8::String> atob(kj::String data, v8::Isolate* isolate);

void queueMicrotask(v8::Local<v8::Function> task, v8::Isolate* isolate);
void queueMicrotask(jsg::Lock& js, v8::Local<v8::Function> task);

struct StructuredCloneOptions {
jsg::Optional<kj::Array<jsg::Value>> transfer;
Expand All @@ -250,22 +242,20 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
jsg::Optional<StructuredCloneOptions> options,
v8::Isolate* isolate);

TimeoutId::NumberType setTimeout(
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate);
TimeoutId::NumberType setTimeout(jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args);
void clearTimeout(kj::Maybe<TimeoutId::NumberType> timeoutId);

TimeoutId::NumberType setTimeoutInternal(
jsg::Function<void()> function,
double msDelay);

TimeoutId::NumberType setInterval(
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args,
v8::Isolate* isolate);
TimeoutId::NumberType setInterval(jsg::Lock& js,
jsg::V8Ref<v8::Function> function,
jsg::Optional<double> msDelay,
jsg::Varargs args);
void clearInterval(kj::Maybe<TimeoutId::NumberType> timeoutId) { clearTimeout(timeoutId); }

jsg::Promise<jsg::Ref<Response>> fetch(
Expand Down
120 changes: 120 additions & 0 deletions src/workerd/api/node/async-hooks.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
#include "async-hooks.h"
#include <kj/vector.h>

namespace workerd::api::node {

jsg::Ref<AsyncLocalStorage> AsyncLocalStorage::constructor(jsg::Lock& js) {
return jsg::alloc<AsyncLocalStorage>();
}

v8::Local<v8::Value> AsyncLocalStorage::run(
jsg::Lock& js,
v8::Local<v8::Value> store,
v8::Local<v8::Function> callback,
jsg::Varargs args) {
kj::Vector<v8::Local<v8::Value>> argv(args.size());
for (auto arg : args) {
argv.add(arg.getHandle(js));
}

auto context = js.v8Isolate->GetCurrentContext();

jsg::AsyncContextFrame::StorageScope scope(js, *key, js.v8Ref(store));

return jsg::check(callback->Call(
context,
context->Global(),
argv.size(),
argv.begin()));
}

v8::Local<v8::Value> AsyncLocalStorage::exit(
jsg::Lock& js,
v8::Local<v8::Function> callback,
jsg::Varargs args) {
// Node.js defines exit as running "a function synchronously outside of a context".
// It goes on to say that the store is not accessible within the callback or the
// asynchronous operations created within the callback. Any getStore() call done
// within the callbackfunction will always return undefined... except if run() is
// called which implicitly enables the context again within that scope.
//
// We do not have to emulate Node.js enable/disable behavior since we are not
// implementing the enterWith/disable methods. We can emulate the correct
// behavior simply by calling run with the store value set to undefined, which
// will propagate correctly.
return run(js, v8::Undefined(js.v8Isolate), callback, kj::mv(args));
}

v8::Local<v8::Value> AsyncLocalStorage::getStore(jsg::Lock& js) {
KJ_IF_MAYBE(value, jsg::AsyncContextFrame::current(js).get(*key)) {
return value->getHandle(js);
}
return v8::Undefined(js.v8Isolate);
}

AsyncResource::AsyncResource(jsg::Lock& js)
: frame(kj::addRef(jsg::AsyncContextFrame::current(js))) {}

jsg::Ref<AsyncResource> AsyncResource::constructor(
jsg::Lock& js,
jsg::Optional<kj::String> type,
jsg::Optional<Options> options) {
// The type and options are required as part of the Node.js API compatibility
// but our implementation does not currently make use of them at all. It is ok
// for us to silently ignore both here.
return jsg::alloc<AsyncResource>(js);
}

v8::Local<v8::Function> AsyncResource::staticBind(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<kj::String> type,
jsg::Optional<v8::Local<v8::Value>> thisArg,
const jsg::TypeHandler<jsg::Ref<AsyncResource>>& handler) {
return AsyncResource::constructor(js, kj::mv(type)
.orDefault([] { return kj::str("AsyncResource"); }))
->bind(js, fn, thisArg, handler);
}

v8::Local<v8::Function> AsyncResource::bind(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<v8::Local<v8::Value>> thisArg,
const jsg::TypeHandler<jsg::Ref<AsyncResource>>& handler) {
auto& frame = jsg::AsyncContextFrame::current(js);
v8::Local<v8::Function> bound = jsg::AsyncContextFrame::wrap(js, fn, frame, thisArg);

// Per Node.js documentation (https://nodejs.org/dist/latest-v19.x/docs/api/async_context.html#asyncresourcebindfn-thisarg), the returned function "will have an
// asyncResource property referencing the AsyncResource to which the function
// is bound".
jsg::check(bound->Set(js.v8Isolate->GetCurrentContext(),
jsg::v8StrIntern(js.v8Isolate, "asyncResource"_kj),
handler.wrap(js, JSG_THIS)));
return bound;
}

v8::Local<v8::Value> AsyncResource::runInAsyncScope(
jsg::Lock& js,
v8::Local<v8::Function> fn,
jsg::Optional<v8::Local<v8::Value>> thisArg,
jsg::Varargs args) {
kj::Vector<v8::Local<v8::Value>> argv(args.size());
for (auto arg : args) {
argv.add(arg.getHandle(js));
}

auto context = js.v8Isolate->GetCurrentContext();

jsg::AsyncContextFrame::Scope scope(js, *frame);

return jsg::check(fn->Call(
context,
thisArg.orDefault(context->Global()),
argv.size(),
argv.begin()));
}

} // namespace workerd::api::node
Loading