Skip to content

Commit

Permalink
Add tools.sendEvent to step tooling to ensure events aren't duplica…
Browse files Browse the repository at this point in the history
…ted (#100)

## Description

Cherry pick's the core commit from #75 to clean up history and
conflicts.

---------

Co-authored-by: Jack Williams <[email protected]>
  • Loading branch information
djfarrelly and jpwilliams authored Feb 14, 2023
1 parent a64decf commit e8fa158
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 144 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ jobs:
working-directory: examples/${{ matrix.repo }}

- name: Run the Inngest dev server
run: npx inngest-cli@latest dev &
run: npx inngest-cli@latest dev > cli.log 2>&1 &
working-directory: examples/${{ matrix.repo }}
env:
DO_NOT_TRACK: 1
Expand Down Expand Up @@ -202,9 +202,6 @@ jobs:
with:
url: "http://localhost:3000/api/inngest"
timeout: "60"
- if: ${{ always() }}
run: cat dev.log
working-directory: examples/${{ matrix.repo }}

# Give the dev server 5 seconds to register with the example
# TODO Check logs instead of sleeping
Expand All @@ -215,3 +212,11 @@ jobs:
- name: Run integration test suite
run: yarn test:examples
working-directory: sdk

- if: ${{ always() }}
run: cat dev.log
working-directory: examples/${{ matrix.repo }}

- if: ${{ always() }}
run: cat cli.log
working-directory: examples/${{ matrix.repo }}
3 changes: 2 additions & 1 deletion src/components/Inngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ export class Inngest<Events extends Record<string, EventPayload>> {
NameOrOpts extends FunctionOptions ? NameOrOpts : never
>
): InngestFunction<Events> {
return new InngestFunction<Events>(
return new InngestFunction(
this,
typeof nameOrOpts === "string" ? { name: nameOrOpts } : nameOrOpts,
typeof trigger === "string" ? { event: trigger } : trigger,
fn
Expand Down
7 changes: 5 additions & 2 deletions src/components/InngestFunction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ describe("#generateID", () => {
it("Returns a correct name", () => {
const fn = () =>
new InngestFunction(
new Inngest({ name: "test" }),
{ name: "HELLO 👋 there mr Wolf 🥳!" },
{ event: "test/event.name" },
() => undefined
Expand Down Expand Up @@ -61,7 +62,8 @@ describe("runFn", () => {
let ret: Awaited<ReturnType<typeof fn["runFn"]>>;

beforeAll(async () => {
fn = new InngestFunction<TestEvents>(
fn = new InngestFunction(
new Inngest<TestEvents>({ name: "test" }),
{ name: "Foo" },
{ event: "foo" },
flowFn
Expand Down Expand Up @@ -89,7 +91,8 @@ describe("runFn", () => {
let fn: InngestFunction<TestEvents>;

beforeAll(() => {
fn = new InngestFunction<TestEvents>(
fn = new InngestFunction(
new Inngest<TestEvents>({ name: "test" }),
{ name: "Foo" },
{ event: "foo" },
badFlowFn
Expand Down
7 changes: 6 additions & 1 deletion src/components/InngestFunction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
OutgoingOp,
StepOpCode,
} from "../types";
import { Inngest } from "./Inngest";
import { createStepTools, TickOp } from "./InngestStepTools";

/**
Expand All @@ -32,6 +33,7 @@ export class InngestFunction<Events extends Record<string, EventPayload>> {
readonly #opts: FunctionOptions;
readonly #trigger: FunctionTrigger<keyof Events>;
readonly #fn: (...args: any[]) => any;
readonly #client: Inngest<Events>;

/**
* A stateless Inngest function, wrapping up function configuration and any
Expand All @@ -41,13 +43,16 @@ export class InngestFunction<Events extends Record<string, EventPayload>> {
* trigger remotely.
*/
constructor(
client: Inngest<Events>,

/**
* Options
*/
opts: FunctionOptions,
trigger: FunctionTrigger<keyof Events>,
fn: (...args: any[]) => any
) {
this.#client = client;
this.#opts = opts;
this.#trigger = trigger;
this.#fn = fn;
Expand Down Expand Up @@ -169,7 +174,7 @@ export class InngestFunction<Events extends Record<string, EventPayload>> {
* user's function has run, we can check the mutated state of these to see
* if an op has been submitted or not.
*/
const [tools, state] = createStepTools();
const [tools, state] = createStepTools(this.#client);

/**
* Create args to pass in to our function. We blindly pass in the data and
Expand Down
39 changes: 35 additions & 4 deletions src/components/InngestStepTools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import ms from "ms";
import { assertType } from "type-plus";
import { StepOpCode } from "../types";
import { Inngest } from "./Inngest";
import { createStepTools, TickOp } from "./InngestStepTools";

describe("waitForEvent", () => {
const client = new Inngest({ name: "test" });
let waitForEvent: ReturnType<typeof createStepTools>[0]["waitForEvent"];
let state: ReturnType<typeof createStepTools>[1];
let getOp: () => TickOp | undefined;

beforeEach(() => {
[{ waitForEvent }, state] = createStepTools();
[{ waitForEvent }, state] = createStepTools(client);
getOp = () => Object.values(state.tickOps)[0];
});

Expand Down Expand Up @@ -87,12 +89,13 @@ describe("waitForEvent", () => {
});

describe("run", () => {
const client = new Inngest({ name: "test" });
let run: ReturnType<typeof createStepTools>[0]["run"];
let state: ReturnType<typeof createStepTools>[1];
let getOp: () => TickOp | undefined;

beforeEach(() => {
[{ run }, state] = createStepTools();
[{ run }, state] = createStepTools(client);
getOp = () => Object.values(state.tickOps)[0];
});

Expand Down Expand Up @@ -155,12 +158,13 @@ describe("run", () => {
});

describe("sleep", () => {
const client = new Inngest({ name: "test" });
let sleep: ReturnType<typeof createStepTools>[0]["sleep"];
let state: ReturnType<typeof createStepTools>[1];
let getOp: () => TickOp | undefined;

beforeEach(() => {
[{ sleep }, state] = createStepTools();
[{ sleep }, state] = createStepTools(client);
getOp = () => Object.values(state.tickOps)[0];
});

Expand All @@ -180,12 +184,13 @@ describe("sleep", () => {
});

describe("sleepUntil", () => {
const client = new Inngest({ name: "test" });
let sleepUntil: ReturnType<typeof createStepTools>[0]["sleepUntil"];
let state: ReturnType<typeof createStepTools>[1];
let getOp: () => TickOp | undefined;

beforeEach(() => {
[{ sleepUntil }, state] = createStepTools();
[{ sleepUntil }, state] = createStepTools(client);
getOp = () => Object.values(state.tickOps)[0];
});

Expand Down Expand Up @@ -233,3 +238,29 @@ describe("sleepUntil", () => {
);
});
});

describe("sendEvent", () => {
const client = new Inngest({ name: "test" });
let sendEvent: ReturnType<typeof createStepTools>[0]["sendEvent"];
let state: ReturnType<typeof createStepTools>[1];
let getOp: () => TickOp | undefined;

beforeEach(() => {
[{ sendEvent }, state] = createStepTools(client);
getOp = () => Object.values(state.tickOps)[0];
});

test("return Step step op code", () => {
void sendEvent("step", { data: "foo" });
expect(getOp()).toMatchObject({
op: StepOpCode.StepPlanned,
});
});

test("return step name as name", () => {
void sendEvent("step", { data: "foo" });
expect(getOp()).toMatchObject({
name: "step",
});
});
});
38 changes: 36 additions & 2 deletions src/components/InngestStepTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { sha1 } from "hash.js";
import stringify from "json-stringify-deterministic";
import { Jsonify } from "type-fest";
import { timeStr } from "../helpers/strings";
import type { ObjectPaths } from "../helpers/types";
import type { ObjectPaths, PartialK, SingleOrArray } from "../helpers/types";
import { EventPayload, HashedOp, Op, StepOpCode } from "../types";
import { Inngest } from "./Inngest";

export interface TickOp extends HashedOp {
fn?: (...args: any[]) => any;
Expand All @@ -23,7 +24,9 @@ export interface TickOp extends HashedOp {
export const createStepTools = <
Events extends Record<string, EventPayload>,
TriggeringEvent extends keyof Events
>() => {
>(
client: Inngest<Events>
) => {
const state: {
/**
* The tree of all found ops in the entire invocation.
Expand Down Expand Up @@ -187,6 +190,37 @@ export const createStepTools = <
* a generic type for that function as it will appear in the user's code.
*/
const tools = {
/**
* Send one or many events to Inngest. Should always be used in place of
* `inngest.send()` to ensure that the event send is successfully retried
* and not sent multiple times due to memoisation.
*
* @example
* ```ts
* await step.sendEvent("app/user.created", { data: { id: 123 } });
* ```
*
* Returns a promise that will resolve once the event has been sent.
*/
sendEvent: createTool<
<Event extends keyof Events & string>(
name: Event,
payload: SingleOrArray<
PartialK<Omit<Events[Event], "name" | "v">, "ts">
>
) => Promise<void>
>(
(name, _payload) => {
return {
op: StepOpCode.StepPlanned,
name,
};
},
(name, payload) => {
return client.send(name, payload);
}
),

/**
* Wait for a particular event to be received before continuing. When the
* event is received, it will be returned.
Expand Down
5 changes: 4 additions & 1 deletion src/examples/client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { Inngest } from "inngest";

export const inngest = new Inngest({ name: "Example App" });
export const inngest = new Inngest({
name: "Example App",
eventKey: "test-key-123",
});
2 changes: 2 additions & 0 deletions src/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import parallelWork from "./parallel-work";
import polling from "./polling";
import promiseAll from "./promise-all";
import promiseRace from "./promise-race";
import sendEvent from "./send-event";

export default [
helloWorld,
Expand All @@ -12,4 +13,5 @@ export default [
parallelWork,
parallelReduce,
polling,
sendEvent,
];
12 changes: 12 additions & 0 deletions src/examples/send-event/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Sending an Event Example

This examples demonstrates sending an event within a step function.

It is triggered by a `demo/send.event` event, and runs a single step to reliably send an event to Inngest.

```mermaid
graph TD
Inngest -->|demo/send.event| Function
Function --> step1["step.sendEvent('app/my.event.happened')"]
step1 --> ret[Complete]
```
73 changes: 73 additions & 0 deletions src/examples/send-event/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import fetch from "cross-fetch";
import {
eventRunWithName,
introspectionSchema,
receivedEventWithName,
runHasTimeline,
sendEvent,
} from "../../test/helpers";

describe("introspection", () => {
const specs = [
{ label: "SDK UI", url: "http://127.0.0.1:3000/api/inngest?introspect" },
{ label: "Dev server UI", url: "http://localhost:8288/dev" },
];

specs.forEach(({ label, url }) => {
test(`should show registered functions in ${label}`, async () => {
const res = await fetch(url);
const data = introspectionSchema.parse(await res.json());

expect(data.functions).toContainEqual({
name: "Send event",
id: expect.stringMatching(/^.*-send-event$/),
triggers: [{ event: "demo/send.event" }],
steps: {
step: {
id: "step",
name: "step",
runtime: {
type: "http",
url: expect.stringMatching(
/^http.+\?fnId=.+-send-event&stepId=step$/
),
},
},
},
});
});
});
});

describe("run", () => {
let eventId: string;
let runId: string;

beforeAll(async () => {
eventId = await sendEvent("demo/send.event");
});

test("runs in response to 'demo/send.event'", async () => {
runId = await eventRunWithName(eventId, "Send event");
expect(runId).toEqual(expect.any(String));
});

test("ran Step 'app/my.event.happened'", async () => {
await expect(
runHasTimeline(runId, {
__typename: "StepEvent",
stepType: "COMPLETED",
name: "app/my.event.happened",
})
).resolves.toBeDefined();
});

test("sent event 'app/my.event.happened'", async () => {
const event = await receivedEventWithName("app/my.event.happened");
expect(event).toBeDefined();
expect(JSON.parse(event?.payload ?? {})).toMatchObject({ foo: "bar" });
});
});
9 changes: 9 additions & 0 deletions src/examples/send-event/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { inngest } from "../client";

export default inngest.createFunction(
{ name: "Send event" },
"demo/send.event",
async ({ step }) => {
await step.sendEvent("app/my.event.happened", { data: { foo: "bar" } });
}
);
2 changes: 2 additions & 0 deletions src/express.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Inngest } from "./components/Inngest";
import { InngestCommHandler } from "./components/InngestCommHandler";
import { InngestFunction } from "./components/InngestFunction";
import * as ExpressHandler from "./express";
Expand All @@ -17,6 +18,7 @@ describe("InngestCommHandler", () => {
describe("registerBody", () => {
it("Includes correct base URL for functions", () => {
const fn = new InngestFunction(
new Inngest({ name: "test" }),
{ name: "Test Express Function" },
{ event: "test/event.name" },
() => undefined
Expand Down
Loading

0 comments on commit e8fa158

Please sign in to comment.