Skip to content

Commit

Permalink
feat(cu): aop6 boot loader #730
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Aug 21, 2024
1 parent 8a686f2 commit 3fae9a1
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 62 deletions.
9 changes: 8 additions & 1 deletion servers/cu/src/domain/client/ao-su.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,14 @@ export const loadProcessWith = ({ fetch, logger }) => {
* SU is currently sending back timestamp in milliseconds,
*/
timestamp: path(['timestamp'])
})
}),
/**
* These were added for the aop6 Boot Loader change so that
* the Process can be used properly downstream.
*/
processId: always(processId),
timestamp: path(['timestamp']),
nonce: always(0)
}))
}
}
Expand Down
6 changes: 5 additions & 1 deletion servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ export const loadProcessSchema = z.function()
suUrl: z.string().url(),
processId: z.string().min(1)
}))
.returns(z.promise(processSchema.omit({ id: true })))
.returns(z.promise(processSchema.omit({ id: true }).extend({
timestamp: z.number().min(1),
nonce: z.number().min(0),
processId: z.string().min(1)
})))

export const loadTimestampSchema = z.function()
.args(z.object({
Expand Down
34 changes: 28 additions & 6 deletions servers/cu/src/domain/lib/loadMessageMeta.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Resolved, fromPromise, of } from 'hyper-async'
import { z } from 'zod'

import { findProcessSchema, loadMessageMetaSchema, locateProcessSchema } from '../dal.js'
import { findProcessSchema, loadMessageMetaSchema, locateProcessSchema, loadProcessSchema } from '../dal.js'
import { findRawTag, trimSlash } from '../utils.js'

/**
Expand Down Expand Up @@ -67,16 +67,38 @@ export function loadMessageMetaWith (env) {
env = { ...env, logger }

const loadMessageMeta = fromPromise(loadMessageMetaSchema.implement(env.loadMessageMeta))
const loadProcess = fromPromise(loadProcessSchema.implement(env.loadProcess))

const maybeCached = maybeCachedWith(env)

return (ctx) => {
return maybeCached(ctx.processId)
.chain(({ url }) => loadMessageMeta({
suUrl: trimSlash(url),
processId: ctx.processId,
messageTxId: ctx.messageTxId
}))
.chain(({ url }) => {
/**
* This condition handles the aop6 Boot Loader functionality
* It is here so that this function can be called with a process id
* as a message id and the cu will evaluate the Process with the Process
* itself as the first Message without there necessarily being any more
* Messages on the Process.
*/
if (ctx.processId === ctx.messageTxId) {
return loadProcess({
suUrl: trimSlash(url),
processId: ctx.processId,
messageTxId: ctx.messageTxId
})
}

/**
* Otherwise, this is just being called with a Message id
* so there is no need to fetch the Process here
*/
return loadMessageMeta({
suUrl: trimSlash(url),
processId: ctx.processId,
messageTxId: ctx.messageTxId
})
})
.map(ctxSchema.parse)
// .map(logger.tap('Loaded message process and timestamp and appended to ctx %j'))
}
Expand Down
24 changes: 24 additions & 0 deletions servers/cu/src/domain/lib/loadMessageMeta.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ describe('loadMessageMeta', () => {
})
return { processId: 'process-123', timestamp: 1697574792000, nonce: 1 }
},
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags: [
{ name: 'Module', value: 'foobar' },
{ name: 'Data-Protocol', value: 'ao' },
{ name: 'Type', value: 'Process' }
],
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})

Expand Down Expand Up @@ -61,6 +73,18 @@ describe('loadMessageMeta', () => {
})
return { processId: 'process-123', timestamp: 1697574792000, nonce: 1 }
},
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags: [
{ name: 'Module', value: 'foobar' },
{ name: 'Data-Protocol', value: 'ao' },
{ name: 'Type', value: 'Process' }
],
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})

Expand Down
122 changes: 80 additions & 42 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { z } from 'zod'
import ms from 'ms'

import { streamSchema } from '../model.js'
import { mapFrom } from '../utils.js'
import { mapFrom, parseTags } from '../utils.js'
import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js'

export const toSeconds = (millis) => Math.floor(millis / 1000)
Expand Down Expand Up @@ -610,60 +610,98 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
const isColdStart = !ctx.from

/**
* Generate and emit a message that represents the process itself.
* Generate and emit a message that represents the process itself
* if the Process was started before the aop6 Boot Loader change
*
* It will be the first message evaluated by the module
*/

const messages = $messages[Symbol.asyncIterator]()

/**
* { value: any, done: boolean }
*/
let message = await messages.next()

/**
* If this is a cold start
* if (message) then check tags and emit generated if type !== 'Process'
* else convert the Process into a Message and emit it
*/
if (isColdStart) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '^',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
const { value, done } = message
/**
* This condition is to handle 2 cases. Before aop6 ECHO Boot loader,
* The first Message in a stream will be an actual Message. But after
* aop6 the first Message is now the process itself, shaped like a Message
*
* As a result, old Processes that were started before the boot loader
* change, can either have no Messages, or have the first Message with a tag
* of type Message, as opposed to Process. In both these cases on a cold start
* we need to inject the Process as the first Message in the stream, as was
* done prior to the Boot Loader change.
*/
if (done || parseTags(value.Tags).Type !== 'Process') {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)
yield {
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
noSave: true,
ordinate: '^',
name: `Process Message ${ctx.id}`,
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* Since a process may be spawned from another process,
* the owner may not always be an "end user" wallet,
* but the MU wallet that signed and pushed the spawn.
*
* The MU sets From-Process on any data item it pushes
* on behalf of a process, including spawns.
*
* So we can set From here using the Process tags
* and owner, just like we do for any other message
*/
From: mapFrom({ tags: ctx.tags, owner: ctx.owner }),
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags },
Module: { Id: ctx.moduleId, Owner: ctx.moduleOwner, Tags: ctx.moduleTags }
}
}
}
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
for await (const message of $messages) yield message
while (true) {
const { value, done } = message
/**
* We're done, so break the loop
*/
if (done) break

yield value
const next = await messages.next()
message = next
}
})
)
})
Expand Down
41 changes: 29 additions & 12 deletions servers/cu/src/domain/lib/loadProcessMeta.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ describe('loadProcess', () => {
signature: 'sig-123',
anchor: null,
data: 'data-123',
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}
},
logger
Expand Down Expand Up @@ -116,10 +119,13 @@ describe('loadProcess', () => {
return PROCESS
},
locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }),
loadProcess: async (id) => ({
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags,
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})
Expand All @@ -139,10 +145,13 @@ describe('loadProcess', () => {
findProcess: async () => { throw { status: 404 } },
saveProcess: async () => { throw { status: 409 } },
locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }),
loadProcess: async (id) => ({
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags,
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})
Expand All @@ -161,14 +170,16 @@ describe('loadProcess', () => {
findProcess: async () => { throw { status: 404 } },
saveProcess: async () => PROCESS,
locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }),
loadProcess: async (id) => ({
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags: [
{ name: 'Not_Module', value: 'foobar' },
{ name: 'Data-Protocol', value: 'ao' },
{ name: 'Type', value: 'Process' }
],
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})
Expand All @@ -184,14 +195,17 @@ describe('loadProcess', () => {
findProcess: async () => { throw { status: 404 } },
saveProcess: async () => PROCESS,
locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }),
loadProcess: async (id) => ({
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags: [
{ name: 'Module', value: 'foobar' },
{ name: 'Data-Protocol', value: 'not_ao' },
{ name: 'Type', value: 'Process' }
],
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})
Expand All @@ -207,14 +221,17 @@ describe('loadProcess', () => {
findProcess: async () => { throw { status: 404 } },
saveProcess: async () => PROCESS,
locateProcess: async ({ processId: id }) => ({ url: 'https://foo.bar' }),
loadProcess: async (id) => ({
loadProcess: async ({ processId }) => ({
owner: { address: 'woohoo', key: 'key-123' },
tags: [
{ name: 'Module', value: 'foobar' },
{ name: 'Data-Protocol', value: 'ao' },
{ name: 'Type', value: 'Not_process' }
],
block: { height: 123, timestamp: 1697574792000 }
block: { height: 123, timestamp: 1697574792000 },
timestamp: 1697574792000,
nonce: 0,
processId
}),
logger
})
Expand Down

0 comments on commit 3fae9a1

Please sign in to comment.