Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async/await parallel steps, unify createFunction, clean examples #45

Merged
merged 70 commits into from
Feb 9, 2023

Conversation

jpwilliams
Copy link
Member

@jpwilliams jpwilliams commented Nov 13, 2022

Breaking changes

Function creation helpers removed

The following function creation methods have been removed and should be replaced by using Inngest#createFunction:

  • createFunction
  • createScheduledFunction
  • createStepFunction
  • Inngest#createScheduledFunction
  • Inngest#createStepFunction
import { createFunction } from "inngest";
createFunction("Example", "app/user.created", () => {});
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", "app/user.created", () => {});
import { createScheduledFunction } from "inngest";
createScheduledFunction("Example", "* * * * *", () => {}); // or inngest.createScheduledFunction
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", { cron: "* * * * *" }, () => {});
import { createStepFunction } from "inngest";
createStepFunction("Example", "* * * * *", () => {}); // or inngest.createStepFunction
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", { cron: "* * * * *" }, async ({ tools }) => {
  // Use a tool to create a step function
});

Step functions are now asynchronous

In order to provide the full power of asynchronous JavaScript, step functions are now required to be async functions, and all step tooling will return promises instead of synchronously.

If you're using TypeScript, you'll be guided through the changes at each stage. For example, trying to access .id of the new Promise<User> will throw an error telling you that it must first be awaited.

import { createStepFunction } from "inngest";
import { userDb } from "./db";
import { email } from "./email";

export default createStepFunction(
  "Example",
  "app/user.created",
  ({ event, tools }) => {
    const user = tools.run("Get user email", () => userDb.get(event.userId));

    // We run synchronously, so wait for the email to be send before the alert is sent
    tools.run("Send email", () => email.sendEmail(user.email, "Welcome!"));
    tools.run("Send alert to staff", () =>
      email.sendAlert("New user created!")
    );
  }
);

This would be converted to the following:

import { inngest } from "./client";
import { userDb } from "./db";
import { email } from "./email";

export default inngest.createFunction( // use client instead of helper
  { name: "Example", fns: { ...userDb, ...email } }, // can pass functions to wrap in tools.run()
  "app/user.created",
  async ({ event, fns: { getUser, sendEmail, sendAlert }}) => {
    const user = await getUser(event.userId); // use fns directly that have been passed

    // We don't await these, so they are run in parallel now
    sendEmail(user.email, "Welcome!");
    sendAlert("New user created!");
  }
);

Custom handlers require a stepId

In order to provide the parallel functionality in this PR, all handlers created using InngestCommHandler must provide a stepId parameter when attempting to run a function. This should be accessed via the query string using the exported queryKeys.StepId enum.

run: async () => {
  if (req.method === "POST") {
    return {
      fnId: url.searchParams.get(queryKeys.FnId) as string,
+     stepId: url.searchParams.get(queryKeys.StepId) as string,

Features

Pass functions to wrap as retriable steps

When creating a function, you can now pass a fns key to automatically wrap all found functions in tools.run() step tooling, automatically providing your existing functionality with retries and durability.

import { inngest } from "./client";
import { userDb } from "./db";
import { email } from "./email";

export default inngest.createFunction(
  { name: "Example", fns: { ...userDb, ...email } },
  "app/user.created",
  async ({ event, fns: { getUser, sendEmail}}) => {
    const user = await getUser(event.userId);
    sendEmail(user.email, "Welcome!");
  }
);

Fixes

Related changes/issues based on this PR

SDK changes

  • Refactor step function tooling to be async
  • Unify createFunction, createScheduledFunction, and createStepFunction under a single createFunction method
  • Preserve complex client-less createFunction helper

    Prefer instantiation with a client for now.

  • Add error-handling capabilities by giving the SDK the ability to send back a serialised error as well as data
  • Allow use of regular JS tooling (.then(), try/catch, etc) in step functions without gotchas
  • Create a new StepOpCode for a no-op other than None
    We just return an empty array.
  • Add ability to pass steps in as "just functions" to avoid wrapping all user code in tools.run()
  • Add ability to pass steps in as "just functions" to a new Inngest() client, which is merged with any steps passed directly to inngest.createFunction()

    Defer this to be implemented with INN-998 Add local Zod schemas when instantiating clients #66, as this will alter generics for Inngest and InngestFunction to aid with this change.

  • Hash synchronous groups of steps and throw errors if synchronous step order doesn't match
  • Allow triggering steps using stepId query param instead of op position codes
  • Add new StepPlanned (or similar) op code for back compat
  • Create step alias for tools and deprecate tools
  • Add a large warning when the a step function resolves, but some tooling is still pending; we want to highlight for users that to be safe they should make sure to await in serverless environments

Examples

See examples
// Any type of function uses the same method.
// declare createFunction: (nameOrFunctionOpts, eventNameOrTriggerOpts, fn) => void;
inngest.createFunction("...", "demo/event.sent", () => {});
inngest.createFunction("...", { event: "demo/event.sent" }, () => {});
inngest.createFunction("...", { cron: "* * * * *" }, () => {});
// A step function uses the same method; just use a tool.
inngest.createFunction("...", "demo/event.sent", ({ tools: { run } }) => {
  run("Step", () => {});
});
// Step functions now use async functions - easier to map complex flows.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  const randomNumber = await run("Get random number", () => Math.random());
  await run("Send random number", () => sendEmail("...", randomNumber));
});
// With async support, fire-and-forget parallel steps are easy to create - just
// trigger them all and SDK will let Inngest know of all of the pending actions.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  run("Send email", () => sendEmail("..."));
  run("Send another email", () => sendEmail("..."));
  run("Send yet another email", () => sendEmail("..."));
});

// This means that the response from the SDK to Inngest is always an array of
// upcoming steps.
//
// In this case, the first call would return all three actions, as we're not
// awaiting them; the SDK gets a single synchronous tick of the event loop to
// decide what is next.
[
  { op: "Step", name: "Send email", run: true },
  { op: "Step", name: "Send another email", run: true },
  { op: "Step", name: "Send yet another email", run: true },
];
// Step functions can handle errors too, meaning we can use usual tools like
// try/catch.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  try {
    await run("Step", () => {});
  } catch (err) {
    await run("Send error to Tim", () => sendEmail("...", err));
  }
});

// Or perhaps we want to silently handle an error case and not have it affect
// other steps.
inngest.createFunction("...", "demo/event.sent", async () => {
  await run("Send email", () => sendEmail("...")).catch(() => {
    run("Send error to Tim", () => sendEmail("...", err));
  });
});
// Tools such as `Promise.all` are fine too.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  await Promise.all([
    run("Send email", () => sendEmail("...")),
    run("Send another email", () => sendEmail("...")),
  ]);
});
// We can even create parallel chains of steps that trigger immediately, follow
// their own path, then eventually come back together.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  const fooStep = run("Foo", () => fooSomething())
    .then((data) => run("Foo again", () => fooSomethingElse(data)))
    .then((data) => run("Foo again again", () => fooSomethingElse(data)));

  const barStep = run("Bar", () => barSomething())
    .then((data) => run("Bar again", () => barSomethingElse(data)))
    .then((data) => run("Bar again again", () => barSomethingElse(data)));

  const [foo, bar] = await Promise.all([fooStep, barStep]);

  await run("Send email", () => sendEmail("...", { foo, bar }));
});
// Wrapping all user code in `run()` can be a bit verbose. One of the initial
// goals of the SDK was that steps were "just functions". If we pass those to
// our Inngest function then we can shim them with `run()` automatically, which
// makes the code look _super_ clean.
//
// The TS types and runtime JS will non-destructively filter the input and add
// shims, meaning you could pass entire structures/classes in with no bother,
// like we do here with `userDb` and `email` which may export more than just
// functions. We even retain the function's comments!
import * as userDb from "./dbs/user";
import * as email from "./email";

inngest.createFunction(
  { name: "...", fns: { ...email, ...userDb } },
  "demo/event.sent",
  async ({ event, fns: { sendEmail, getUserById } }) => {
    const user = await getUserById(event.user.id);
    await sendEmail(user.email, "...");
  }
);

// This can really help clean up functions with lots of steps, like our example
// of parallel paths above.
inngest.createFunction(
  { name: "...", fns: { ...fooLib, ...barLib, ...email } },
  "demo/event.sent",
  async ({ tools: { run } }) => {
    const fooStep = fooSomething()
      .then(fooSomethingElse)
      .then(fooSomethingElse);

    const barStep = barSomething()
      .then(barSomethingElse)
      .then(barSomethingElse);

    const [foo, bar] = await Promise.all([fooStep, barStep]);
    await sendEmail("...", { foo, bar });
  }
);
// Sometimes, a user may want to bundle async actions together in a single step,
// for example when something must be fetched from the DB right before sending
// an email.
//
// If this is unique to this Inngest function, we can always use `tools.run()`
// to run in-line code. If we wanted to create a reusable step, however, we just
// make a regular function.
const sendEmailToUser = async (userId: string, body: string) => {
  const user = await getUserById(userId);
  await sendEmail(user.email, body);
};

inngest.createFunction(
  { name: "...", fns: { sendEmailToUser, something } },
  "demo/event.sent",
  async ({ event }) => {
    await something();
    await sendEmailToUser(event.user.id, "...");
  }
);
// If we were to create a library of reusable steps, a _future_ option that is
// not in this PR could be to pass steps in to the Inngest constructor to
// provide the tooling for every function automatically.
const inngest = new Inngest<Events>({
  name: "...",
  fns: { ...email, ...userDb, ...postDb },
});

inngest.createFunction(
  "...",
  "demo/event.sent",
  async ({ event, fns: { getUserById, getPostsByTag, sendEmail } }) => {
    const user = await getUserById(event.user.id);
    const posts = await getPostsByTag(user.favouriteTag);
    await sendEmail(user.email, posts);
  }
);

@jpwilliams jpwilliams added the ✨ new New features, integrations, or exports label Nov 13, 2022
@jpwilliams jpwilliams self-assigned this Nov 13, 2022
@jpwilliams jpwilliams force-pushed the async-await-parallel-steps branch from 48e0e75 to 756b85b Compare January 27, 2023 17:21
@jpwilliams jpwilliams force-pushed the async-await-parallel-steps branch from 5596e0f to e242f29 Compare January 27, 2023 20:32
@jpwilliams jpwilliams force-pushed the async-await-parallel-steps branch from 4f493ca to 5c1a2e9 Compare February 1, 2023 14:49
If the function is returning a value, then it can't create any more
steps except by resolving or rejecting existing ones. If it does that,
we capture any next steps by stepping through the existing stack loop.

With this, we _can_ smartly identify when all operations in a function
have been completed via success or failure, which is awesome!

With this change, we keep returning `206 Partial Content` until every
Promise is settled, at which point that single, final request will
return `200 OK`.
@jpwilliams jpwilliams force-pushed the async-await-parallel-steps branch from 9871e39 to 1bb3c73 Compare February 3, 2023 14:15
@jpwilliams jpwilliams force-pushed the async-await-parallel-steps branch from 3c6741f to 4fcf0f6 Compare February 3, 2023 18:35
@jpwilliams jpwilliams merged commit 4ff191a into main Feb 9, 2023
@jpwilliams jpwilliams deleted the async-await-parallel-steps branch February 9, 2023 16:17
jpwilliams added a commit that referenced this pull request Feb 13, 2023
## Summary

Approximately measures performance during function execution and returns
a `Server-Timing` header to the executor, e.g.:
```
Server-Timing: handler;dur=0, memoising;dur=1, running-step;dur=164
```
We can add to and tweak what we want to see here, but it should provide
a sane method of approximately measuring some more SDK performance
points.

## Related

- #45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
✨ new New features, integrations, or exports
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants