Skip to content

Commit

Permalink
major: Add async/await parallel steps, unify createFunction, cl…
Browse files Browse the repository at this point in the history
…ean examples (#45)

## Breaking changes

### Function creation helpers removed

The following function creation methods have been removed and should be
replaced by using `Inngest#createFunction`:
- `createFunction`
- `createScheduledFunction`
- `createStepFunction`
- `Inngest#createScheduledFunction`
- `Inngest#createStepFunction`

```ts
import { createFunction } from "inngest";
createFunction("Example", "app/user.created", () => {});
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", "app/user.created", () => {});
```

```ts
import { createScheduledFunction } from "inngest";
createScheduledFunction("Example", "* * * * *", () => {}); // or inngest.createScheduledFunction
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", { cron: "* * * * *" }, () => {});
```

```ts
import { createStepFunction } from "inngest";
createStepFunction("Example", "* * * * *", () => {}); // or inngest.createStepFunction
// becomes
import { Inngest } from "inngest";
const inngest = new Inngest({ name: "App" });
inngest.createFunction("Example", { cron: "* * * * *" }, async ({ tools }) => {
  // Use a tool to create a step function
});
```

### Step functions are now asynchronous

In order to provide the full power of asynchronous JavaScript, step
functions are now required to be async functions, and all step tooling
will return promises instead of synchronously.

If you're using TypeScript, you'll be guided through the changes at each
stage. For example, trying to access `.id` of the new `Promise<User>`
will throw an error telling you that it must first be awaited.

```ts
import { createStepFunction } from "inngest";
import { userDb } from "./db";
import { email } from "./email";

export default createStepFunction(
  "Example",
  "app/user.created",
  ({ event, tools }) => {
    const user = tools.run("Get user email", () => userDb.get(event.userId));

    // We run synchronously, so wait for the email to be send before the alert is sent
    tools.run("Send email", () => email.sendEmail(user.email, "Welcome!"));
    tools.run("Send alert to staff", () =>
      email.sendAlert("New user created!")
    );
  }
);
```

This would be converted to the following:

```ts
import { inngest } from "./client";
import { userDb } from "./db";
import { email } from "./email";

export default inngest.createFunction( // use client instead of helper
  { name: "Example", fns: { ...userDb, ...email } }, // can pass functions to wrap in tools.run()
  "app/user.created",
  async ({ event, fns: { getUser, sendEmail, sendAlert }}) => {
    const user = await getUser(event.userId); // use fns directly that have been passed

    // We don't await these, so they are run in parallel now
    sendEmail(user.email, "Welcome!");
    sendAlert("New user created!");
  }
);
```

### Custom handlers require a `stepId`

In order to provide the parallel functionality in this PR, all handlers
created using `InngestCommHandler` must provide a `stepId` parameter
when attempting to run a function. This should be accessed via the query
string using the exported `queryKeys.StepId` enum.

```diff
run: async () => {
  if (req.method === "POST") {
    return {
      fnId: url.searchParams.get(queryKeys.FnId) as string,
+     stepId: url.searchParams.get(queryKeys.StepId) as string,
```

## Features

### Pass functions to wrap as retriable steps

When creating a function, you can now pass a `fns` key to automatically
wrap all found functions in `tools.run()` step tooling, automatically
providing your existing functionality with retries and durability.

```ts
import { inngest } from "./client";
import { userDb } from "./db";
import { email } from "./email";

export default inngest.createFunction(
  { name: "Example", fns: { ...userDb, ...email } },
  "app/user.created",
  async ({ event, fns: { getUser, sendEmail}}) => {
    const user = await getUser(event.userId);
    sendEmail(user.email, "Welcome!");
  }
);
```

## Fixes

- `user` key in event payloads can now be any value, to ensure
conflicting generated events are accepted; see #87 for further
discussion

## Related changes/issues based on this PR

- #66 
- #73 
- #74 
- #75 
- #76 
- #69 
- #43 

## SDK changes

- [x] Refactor step function tooling to be `async`
- [x] Unify `createFunction`, `createScheduledFunction`, and
`createStepFunction` under a single `createFunction` method
- [x] ~Preserve complex client-less `createFunction` helper~
  > Prefer instantiation with a client for now.
- [x] Add error-handling capabilities by giving the SDK the ability to
send back a serialised `error` as well as `data`
- [x] Allow use of regular JS tooling (`.then()`, `try/catch`, etc) in
step functions without gotchas
- [x] ~Create a new `StepOpCode` for a no-op other than `None`~
We just return an empty array.
- [x] Add ability to pass steps in as "just functions" to avoid wrapping
all user code in `tools.run()`
- [x] ~Add ability to pass steps in as "just functions" to a `new
Inngest()` client, which is merged with any steps passed directly to
`inngest.createFunction()`~
> Defer this to be implemented with #66, as this will alter generics for
`Inngest` and `InngestFunction` to aid with this change.
- [x] Hash synchronous groups of steps and throw errors if synchronous
step order doesn't match
- [x] Allow triggering steps using `stepId` query param instead of op
position codes
- [x] Add new `StepPlanned` (or similar) op code for back compat
- [ ] Create `step` alias for `tools` and deprecate `tools`
- [ ] Add a large warning when the a step function resolves, but some
tooling is still pending; we want to highlight for users that to be safe
they should make sure to await in serverless environments

## Examples

<details>
<summary>See examples</summary>

```ts
// Any type of function uses the same method.
// declare createFunction: (nameOrFunctionOpts, eventNameOrTriggerOpts, fn) => void;
inngest.createFunction("...", "demo/event.sent", () => {});
inngest.createFunction("...", { event: "demo/event.sent" }, () => {});
inngest.createFunction("...", { cron: "* * * * *" }, () => {});
```

```ts
// A step function uses the same method; just use a tool.
inngest.createFunction("...", "demo/event.sent", ({ tools: { run } }) => {
  run("Step", () => {});
});
```

```ts
// Step functions now use async functions - easier to map complex flows.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  const randomNumber = await run("Get random number", () => Math.random());
  await run("Send random number", () => sendEmail("...", randomNumber));
});
```

```ts
// With async support, fire-and-forget parallel steps are easy to create - just
// trigger them all and SDK will let Inngest know of all of the pending actions.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  run("Send email", () => sendEmail("..."));
  run("Send another email", () => sendEmail("..."));
  run("Send yet another email", () => sendEmail("..."));
});

// This means that the response from the SDK to Inngest is always an array of
// upcoming steps.
//
// In this case, the first call would return all three actions, as we're not
// awaiting them; the SDK gets a single synchronous tick of the event loop to
// decide what is next.
[
  { op: "Step", name: "Send email", run: true },
  { op: "Step", name: "Send another email", run: true },
  { op: "Step", name: "Send yet another email", run: true },
];
```

```ts
// Step functions can handle errors too, meaning we can use usual tools like
// try/catch.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  try {
    await run("Step", () => {});
  } catch (err) {
    await run("Send error to Tim", () => sendEmail("...", err));
  }
});

// Or perhaps we want to silently handle an error case and not have it affect
// other steps.
inngest.createFunction("...", "demo/event.sent", async () => {
  await run("Send email", () => sendEmail("...")).catch(() => {
    run("Send error to Tim", () => sendEmail("...", err));
  });
});
```

```ts
// Tools such as `Promise.all` are fine too.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  await Promise.all([
    run("Send email", () => sendEmail("...")),
    run("Send another email", () => sendEmail("...")),
  ]);
});
```

```ts
// We can even create parallel chains of steps that trigger immediately, follow
// their own path, then eventually come back together.
inngest.createFunction("...", "demo/event.sent", async ({ tools: { run } }) => {
  const fooStep = run("Foo", () => fooSomething())
    .then((data) => run("Foo again", () => fooSomethingElse(data)))
    .then((data) => run("Foo again again", () => fooSomethingElse(data)));

  const barStep = run("Bar", () => barSomething())
    .then((data) => run("Bar again", () => barSomethingElse(data)))
    .then((data) => run("Bar again again", () => barSomethingElse(data)));

  const [foo, bar] = await Promise.all([fooStep, barStep]);

  await run("Send email", () => sendEmail("...", { foo, bar }));
});
```

```ts
// Wrapping all user code in `run()` can be a bit verbose. One of the initial
// goals of the SDK was that steps were "just functions". If we pass those to
// our Inngest function then we can shim them with `run()` automatically, which
// makes the code look _super_ clean.
//
// The TS types and runtime JS will non-destructively filter the input and add
// shims, meaning you could pass entire structures/classes in with no bother,
// like we do here with `userDb` and `email` which may export more than just
// functions. We even retain the function's comments!
import * as userDb from "./dbs/user";
import * as email from "./email";

inngest.createFunction(
  { name: "...", fns: { ...email, ...userDb } },
  "demo/event.sent",
  async ({ event, fns: { sendEmail, getUserById } }) => {
    const user = await getUserById(event.user.id);
    await sendEmail(user.email, "...");
  }
);

// This can really help clean up functions with lots of steps, like our example
// of parallel paths above.
inngest.createFunction(
  { name: "...", fns: { ...fooLib, ...barLib, ...email } },
  "demo/event.sent",
  async ({ tools: { run } }) => {
    const fooStep = fooSomething()
      .then(fooSomethingElse)
      .then(fooSomethingElse);

    const barStep = barSomething()
      .then(barSomethingElse)
      .then(barSomethingElse);

    const [foo, bar] = await Promise.all([fooStep, barStep]);
    await sendEmail("...", { foo, bar });
  }
);
```

```ts
// Sometimes, a user may want to bundle async actions together in a single step,
// for example when something must be fetched from the DB right before sending
// an email.
//
// If this is unique to this Inngest function, we can always use `tools.run()`
// to run in-line code. If we wanted to create a reusable step, however, we just
// make a regular function.
const sendEmailToUser = async (userId: string, body: string) => {
  const user = await getUserById(userId);
  await sendEmail(user.email, body);
};

inngest.createFunction(
  { name: "...", fns: { sendEmailToUser, something } },
  "demo/event.sent",
  async ({ event }) => {
    await something();
    await sendEmailToUser(event.user.id, "...");
  }
);
```

```ts
// If we were to create a library of reusable steps, a _future_ option that is
// not in this PR could be to pass steps in to the Inngest constructor to
// provide the tooling for every function automatically.
const inngest = new Inngest<Events>({
  name: "...",
  fns: { ...email, ...userDb, ...postDb },
});

inngest.createFunction(
  "...",
  "demo/event.sent",
  async ({ event, fns: { getUserById, getPostsByTag, sendEmail } }) => {
    const user = await getUserById(event.user.id);
    const posts = await getPostsByTag(user.favouriteTag);
    await sendEmail(user.email, posts);
  }
);
```
</details>
  • Loading branch information
jpwilliams authored Feb 9, 2023
1 parent f815541 commit 4ff191a
Show file tree
Hide file tree
Showing 49 changed files with 2,213 additions and 1,210 deletions.
51 changes: 38 additions & 13 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,30 @@ jobs:
- inngest/sdk-example-fresh-deno-deploy
- inngest/sdk-example-nuxt-vercel
steps:
# Temporary for inngest@next
- name: Install GoReleaser
uses: goreleaser/goreleaser-action@v4
with:
install-only: true

# Temporary for inngest@next
# Checkout the CLI
- uses: actions/checkout@v3
with:
token: ${{ secrets.EXAMPLES_GITHUB_TOKEN }}
repository: inngest/inngest
path: cli
ref: main

# Temporary for inngest@next
# Get the CLI ready
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
- run: make dev
working-directory: cli

# Checkout the repo
- name: Checkout SDK
uses: actions/checkout@v3
Expand Down Expand Up @@ -158,7 +182,7 @@ jobs:
if: ${{ steps.deno-check.outputs.isDeno == 'true' }}
run: |
find ${{ steps.inngest-functions-path.outputs.dir }} -type f -name "*.ts" -exec sed -i 's/from "inngest/from "npm:inngest/g' {} \;
find ${{ steps.inngest-functions-path.outputs.dir }} -type f -name "*.ts" -exec sed -i -E 's/from "\.(.+)"/from "\.\1\.ts"/g' {} \;
find ${{ steps.inngest-functions-path.outputs.dir }} -type f -name "*.ts" -exec sed -i -E 's/from "\.(.+)"/from "\.\1\/index\.ts"/g' {} \;
working-directory: examples/${{ matrix.repo }}

# Try to build the example
Expand All @@ -172,6 +196,19 @@ jobs:
fi
working-directory: examples/${{ matrix.repo }}

# Temporary for inngest@next
# Run the built Inngest CLI dev server
- name: Run Inngest CLI dev server
run: ./inngest dev &
working-directory: cli/dist/inngest_linux_amd64_v1
env:
DO_NOT_TRACK: 1
- name: Wait for the Inngest dev server to start
uses: mydea/action-wait-for-api@v1
with:
url: "http://localhost:8288"
timeout: "60"

# Run the example
- name: Run the example's dev server
run: |
Expand All @@ -196,18 +233,6 @@ jobs:
run: cat dev.log
working-directory: examples/${{ matrix.repo }}

# TODO Examples should run their own dev server, which means we can remove
# this.
- name: Run the Inngest dev server
run: npx inngest-cli@latest dev &
working-directory: examples/${{ matrix.repo }}
env:
DO_NOT_TRACK: 1
- name: Wait for the Inngest dev server to start
uses: mydea/action-wait-for-api@v1
with:
url: "http://localhost:8288"
timeout: "60"
# Give the dev server 5 seconds to register with the example
# TODO Check logs instead of sleeping
- name: Wait 5 seconds for dev server registration
Expand Down
4 changes: 0 additions & 4 deletions .gitmodules

This file was deleted.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ Inngest allows you to:
<br />

<p align="center">
<a href="#getting-started">Getting started</a> ·
<a href="#features">Features</a> ·
<a href="#contributing">Contributing</a> ·
<a href="#getting-started">Getting started</a> ·
<a href="#features">Features</a> ·
<a href="#contributing">Contributing</a> ·
<a href="https://www.inngest.com/docs">Documentation</a>
</p>

Expand Down
80 changes: 15 additions & 65 deletions etc/inngest.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,19 @@ export interface ClientOptions {
name: string;
}

// Warning: (ae-forgotten-export) The symbol "EventName" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "InngestFunction" needs to be exported by the entry point index.d.ts
//
// @public
export const createFunction: <Event_1 extends EventPayload>(nameOrOpts: string | FunctionOptions, event: EventName<Event_1>, fn: SingleStepFn<Event_1, string, "step">) => InngestFunction<any>;

// @public
export const createScheduledFunction: (nameOrOpts: string | FunctionOptions, cron: string, fn: SingleStepFn<null, string, "step">) => InngestFunction<any>;

// @public
export const createStepFunction: <T extends EventPayload>(nameOrOpts: string | FunctionOptions, event: EventName<T>, fn: MultiStepFn<Record<T["name"], T>, T["name"], string, "step">) => InngestFunction<any>;

// @public
export interface EventPayload {
data: any;
name: string;
ts?: number;
user?: Record<string, any>;
user?: any;
v?: string;
}

// @public
export interface FunctionOptions {
// (undocumented)
fns?: Record<string, any>;
id?: string;
idempotency?: string;
name: string;
Expand All @@ -51,30 +41,14 @@ export interface FunctionOptions {
// @public
export class Inngest<Events extends Record<string, EventPayload>> {
constructor({ name, eventKey, inngestBaseUrl, fetch, }: ClientOptions);
createFunction<Event extends keyof Events, Name extends string, Fn extends SingleStepFn<Events[Event], Name, "step">>(
name: Name,
event: Event,
fn: Fn): InngestFunction<Events>;
createFunction<Event extends keyof Events, Opts extends FunctionOptions, Fn extends SingleStepFn<Events[Event], Opts extends FunctionOptions ? Opts["name"] : string, "step">>(
opts: Opts,
event: Event,
fn: Fn): InngestFunction<Events>;
createScheduledFunction<Name extends string>(
name: Name,
cron: string,
fn: SingleStepFn<null, Name, "step">): InngestFunction<Events>;
createScheduledFunction<Opts extends FunctionOptions>(
opts: Opts,
cron: string,
fn: SingleStepFn<null, Opts extends FunctionOptions ? Opts["name"] : string, "step">): InngestFunction<Events>;
createStepFunction<Event extends keyof Events, Name extends string, Fn extends MultiStepFn<Events, Event, Name, "step">>(
name: Name,
event: Event,
fn: Fn): InngestFunction<Events>;
createStepFunction<Event extends keyof Events, Opts extends FunctionOptions, Fn extends MultiStepFn<Events, Event, Opts extends FunctionOptions ? Opts["name"] : string, "step">>(
opts: Opts,
event: Event,
fn: Fn): InngestFunction<Events>;
// Warning: (ae-forgotten-export) The symbol "TriggerOptions" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "Handler" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "InngestFunction" needs to be exported by the entry point index.d.ts
//
// (undocumented)
createFunction<Trigger extends TriggerOptions<keyof Events & string>, NameOrOpts extends string | FunctionOptions>(nameOrOpts: NameOrOpts, trigger: Trigger, fn: Handler<Events, Trigger extends string ? Trigger : Trigger extends {
event: string;
} ? Trigger["event"] : string, NameOrOpts extends FunctionOptions ? NameOrOpts : never>): InngestFunction<Events>;
readonly inngestBaseUrl: URL;
readonly name: string;
// Warning: (ae-forgotten-export) The symbol "SingleOrArray" needs to be exported by the entry point index.d.ts
Expand All @@ -86,10 +60,10 @@ export class Inngest<Events extends Record<string, EventPayload>> {
eventKey: string): void;
}

// Warning: (ae-forgotten-export) The symbol "Handler" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "Handler_2" needs to be exported by the entry point index.d.ts
//
// @public
export class InngestCommHandler<H extends Handler, TransformedRes> {
export class InngestCommHandler<H extends Handler_2, TransformedRes> {
constructor(
frameworkName: string,
appNameOrInngest: string | Inngest<any>,
Expand All @@ -116,7 +90,7 @@ export class InngestCommHandler<H extends Handler, TransformedRes> {
// Warning: (ae-forgotten-export) The symbol "StepRunResponse" needs to be exported by the entry point index.d.ts
//
// (undocumented)
protected runStep(functionId: string, stepId: string, data: any): Promise<StepRunResponse>;
protected runStep(functionId: string, stepId: string | null, data: any): Promise<StepRunResponse>;
// Warning: (ae-forgotten-export) The symbol "RegisterRequest" needs to be exported by the entry point index.d.ts
protected get sdkHeader(): [
prefix: string,
Expand All @@ -137,17 +111,6 @@ export class InngestCommHandler<H extends Handler, TransformedRes> {
protected validateSignature(): boolean;
}

// @public
export type MultiStepFn<Events extends Record<string, EventPayload>, Event extends keyof Events, FnId, StepId> = (arg: MultiStepFnArgs<Events, Event, FnId, StepId>) => void;

// @public
export interface MultiStepFnArgs<Events extends Record<string, EventPayload>, Event extends keyof Events, FnId, StepId> extends SingleStepFnArgs<Events[Event], FnId, StepId> {
// Warning: (ae-forgotten-export) The symbol "createStepTools" needs to be exported by the entry point index.d.ts
//
// (undocumented)
tools: ReturnType<typeof createStepTools<Events, Event>>[0];
}

// @public
export class NonRetriableError extends Error {
constructor(message: string, options?: {
Expand All @@ -173,20 +136,7 @@ functions: InngestFunction<any>[],
opts?: RegisterOptions) => any;

// @public
export type SingleStepFn<Event, FnId, StepId> = (arg: SingleStepFnArgs<Event, FnId, StepId>) => any;

// @public
export interface SingleStepFnArgs<Event, FnId, StepId> {
ctx: {
fn_id: FnId;
step_id: StepId;
};
event: Event;
steps: Record<string, never>;
}

// @public
export type TimeStr = `${`${number}w` | ""}${`${number}d` | ""}${`${number}h` | ""}${`${number}m` | ""}${`${number}s` | ""}${`${number}ms` | ""}`;
export type TimeStr = `${`${number}w` | ""}${`${number}d` | ""}${`${number}h` | ""}${`${number}m` | ""}${`${number}s` | ""}`;

// (No @packageDocumentation comment for this package)

Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
testMatch: ["<rootDir>/src/**/*.test.ts", "!**/examples/test/**/*.test.ts"],
testMatch: ["<rootDir>/src/**/*.test.ts", "!**/examples/**/*.test.ts"],
};
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "inngest",
"version": "0.9.2",
"version": "0.10.0-next.7",
"description": "Official SDK for Inngest.com",
"main": "./index.js",
"types": "./index.d.ts",
Expand All @@ -20,7 +20,7 @@
"pb:landing": "yarn build:landing && node -e 'const page = JSON.stringify(require(\"fs\").readFileSync(\"./landing/dist/index.html\").toString()); console.log(\"export const landing = \" + page);' > ./src/landing.ts && npx prettier ./src/landing.ts --write",
"build": "yarn run clean && tsc --project tsconfig.build.json",
"test": "node --expose-gc --max-old-space-size=4096 ./node_modules/.bin/jest --silent --logHeapUsage --maxWorkers=8 --coverage --ci --verbose",
"test:examples": "node --expose-gc --max-old-space-size=4096 ./node_modules/.bin/jest --silent --logHeapUsage --maxWorkers=8 --testMatch \"**/examples/test/*.test.ts\" --ci --verbose",
"test:examples": "node --expose-gc --max-old-space-size=4096 ./node_modules/.bin/jest --logHeapUsage --maxWorkers=8 --testMatch \"**/examples/**/*.test.ts\" --ci --verbose",
"clean": "rm -rf ./dist",
"lint": "eslint .",
"postversion": "yarn run build && yarn run build:copy",
Expand Down Expand Up @@ -55,9 +55,9 @@
"cross-fetch": "^3.1.5",
"h3": "^1.0.2",
"hash.js": "^1.1.7",
"json-stringify-deterministic": "^1.0.8",
"ms": "^2.1.3",
"serialize-error-cjs": "^0.1.3",
"sigmund": "^1.0.1",
"type-fest": "^3.5.1",
"zod": "^3.19.1"
},
Expand All @@ -69,7 +69,6 @@
"@types/jest": "^27.4.1",
"@types/ms": "^0.7.31",
"@types/sha.js": "^2.4.0",
"@types/sigmund": "^1.0.0",
"@typescript-eslint/eslint-plugin": "^5.47.0",
"@typescript-eslint/parser": "^5.47.0",
"concurrently": "^7.4.0",
Expand Down
1 change: 1 addition & 0 deletions src/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export const serve: ServeHandler = (nameOrInngest, fns, opts) => {
if (req.method === "POST") {
return {
fnId: url.searchParams.get(queryKeys.FnId) as string,
stepId: url.searchParams.get(queryKeys.StepId) as string,
data: (await req.json()) as Record<string, any>,
env,
isProduction,
Expand Down
Loading

0 comments on commit 4ff191a

Please sign in to comment.