Skip to content

Commit

Permalink
Merge branch 'main' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
ipeleg committed Oct 26, 2023
2 parents 4d26c38 + 2e14e16 commit c6da8df
Show file tree
Hide file tree
Showing 19 changed files with 559 additions and 19 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/continuous-delivery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,24 @@ jobs:
./prod/platform/relayer-zora-testnet.yaml \
${{ github.sha }} \
relayer-zora-testnet
./utils/version_update.sh \
./prod/platform/relayer-bsc.yaml \
${{ github.sha }} \
relayer-bsc
./utils/version_update.sh \
./prod/platform/relayer-linea.yaml \
${{ github.sha }} \
relayer-linea
./utils/version_update.sh \
./prod/platform/relayer-polygon-zkevm.yaml \
${{ github.sha }} \
relayer-polygon-zkevm
./utils/version_update.sh \
./prod/platform/relayer-zksync.yaml \
${{ github.sha }} \
relayer-zksync
./utils/version_update.sh \
./prod/platform/relayer-arbitrum-nova.yaml \
${{ github.sha }} \
relayer-arbitrum-nova
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"dependencies": {
"@bull-board/express": "^3.9.0",
"@opensea/stream-js": "^0.0.20",
"@reservoir0x/sdk": "^0.0.338",
"@reservoir0x/sdk": "^0.0.342",
"@types/date-fns": "^2.6.0",
"@types/express": "^4.17.13",
"@types/ioredis": "^4.28.7",
Expand Down
13 changes: 12 additions & 1 deletion src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import {
} from "../jobs/seaport-sync/backfill-queue";
import { addToSyncTokenQueue } from "../jobs/sync-token";
import { addToX2Y2BackfillQueue } from "../jobs/x2y2-sync/queues/backfill-queue";
import { addToCoinbaseBackfillQueue } from "../jobs/coinbase-sync/backfill-queue";
import { addToElementBackfillQueue } from "../jobs/element-sync/queues/backfill-queue";
import { addToCoinbaseBackfillQueue } from "../jobs/coinbase-sync/backfill-queue";
import { addToOkxBackfillQueue } from "../jobs/okx-sync/queues/backfill-queue-listings";

export const start = async () => {
const app = express();
Expand Down Expand Up @@ -97,6 +98,16 @@ export const start = async () => {
})
);

app.post(
"/backfill/okx",
asyncHandler(async (req, res) => {
res.status(202).json({ message: "Request accepted" });

const runId = String(req.body.runId);
await addToOkxBackfillQueue(runId);
})
);

app.post(
"/backfill/coinbase",
asyncHandler(async (req, res) => {
Expand Down
12 changes: 12 additions & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ const log = (level: "debug" | "error" | "info" | "warn") => {
case 11155111:
network = "sepolia";
break;

case 59144:
network = "linea";
break;

case 1101:
network = "polygon-zkevm";
break;

case 324:
network = "zksync";
break;
}

const service = `relayer-${network}`;
Expand Down
12 changes: 12 additions & 0 deletions src/common/tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ if (process.env.DATADOG_AGENT_URL) {
case 11155111:
network = "sepolia";
break;

case 59144:
network = "linea";
break;

case 1101:
network = "polygon-zkevm";
break;

case 324:
network = "zksync";
break;
}

const service = `relayer-${network}`;
Expand Down
5 changes: 4 additions & 1 deletion src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ export const config = {
elementApiKey: String(process.env.ELEMENT_API_KEY),
coinbaseApiKey: String(process.env.COINBASE_API_KEY),
blurApiKey: String(process.env.BLUR_API_KEY),
okxApiKey: String(process.env.OKX_API_KEY),
okxSecretKey: String(process.env.OKX_SECRET_KEY),
okxPassphrase: String(process.env.OKX_PASSPHRASE),

blurUrl: String(process.env.BLUR_URL),
openseaApiUrl: String(process.env.OPENSEA_API_URL),

doBackgroundWork: Boolean(Number(process.env.DO_BACKGROUND_WORK)),
doBackfillWork: Boolean(Number(process.env.DO_BACKFILL_WORK)),
doRealtimeWork: Boolean(Number(process.env.DO_REALTIME_WORK)),
doLiveWork: Boolean(Number(process.env.DO_LIVE_WORK)),

doOpenseaWork: Boolean(Number(process.env.DO_OPENSEA_WORK)),

databaseUrl: String(process.env.DATABASE_URL),
Expand Down
27 changes: 22 additions & 5 deletions src/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ import "./coinbase-sync";
import "./rarible-sync";
import "./manifold-sync";
import "./looksrare-v2-sync";
import "./okx-sync";

import * as relayOrders from "./relay-orders";
import * as syncToken from "./sync-token";

import * as looksRareV2SyncRealtime from "./looksrare-v2-sync/realtime-queue";
import * as looksRareV2SyncSeaportRealtime from "./looksrare-v2-sync/realtime-queue-seaport";
import * as relayOrders from "./relay-orders";

import * as seaportSyncListingsRealtime from "./seaport-sync/realtime-queue";
import * as seaportSyncOffersRealtime from "./seaport-sync/realtime-queue-offers";
import * as seaportSyncCollectionOffersRealtime from "./seaport-sync/realtime-queue-collection-offers";

import * as seaportSyncBackfill from "./seaport-sync/backfill-queue";
import * as syncToken from "./sync-token";

import * as x2y2SyncListingsRealtime from "./x2y2-sync/queues/realtime-queue";
import * as x2y2SyncOffersRealtime from "./x2y2-sync/queues/realtime-queue-offers";
import * as x2y2SyncListingsBackfill from "./x2y2-sync/queues/backfill-queue";
import * as x2y2SyncOffersBackfill from "./x2y2-sync/queues/backfill-queue-offers";

import * as raribleSyncRealtime from "./rarible-sync/queues/realtime-queue";
import * as raribleSyncBackfill from "./rarible-sync/queues/backfill-queue";

Expand All @@ -35,27 +39,40 @@ import * as coinbaseSyncOffersRealtime from "./coinbase-sync/realtime-queue-offe

import * as manifoldSyncListingsRealtime from "./manifold-sync/realtime-queue";

import * as okxSyncListingsRealtime from "./okx-sync/queues/realtime-queue-listings";
import * as okxSyncListingsBackfill from "./okx-sync/queues/backfill-queue-listings";

export const allQueues = [
relayOrders.queue,
syncToken.queue,

looksRareV2SyncRealtime.realtimeQueue,
looksRareV2SyncSeaportRealtime.realtimeQueue,
relayOrders.queue,

seaportSyncListingsRealtime.realtimeQueue,
seaportSyncOffersRealtime.realtimeQueue,
seaportSyncCollectionOffersRealtime.realtimeQueue,
seaportSyncBackfill.backfillQueue,
syncToken.queue,

x2y2SyncListingsRealtime.realtimeQueue,
x2y2SyncOffersRealtime.realtimeQueue,
x2y2SyncListingsBackfill.backfillQueue,
x2y2SyncOffersBackfill.backfillQueue,

elementSyncListingsRealtime.realtimeQueue,
elementSyncOffersRealtime.realtimeQueue,
elementSyncListingsBackfill.backfillQueue,
elementSyncOffersBackfill.backfillQueue,

coinbaseSyncListingsRealtime.realtimeQueue,
coinbaseSyncListingsBackfill.backfillQueue,
coinbaseSyncOffersRealtime.realtimeQueue,

raribleSyncRealtime.realtimeQueue,
raribleSyncBackfill.backfillQueue,

manifoldSyncListingsRealtime.realtimeQueue,

okxSyncListingsRealtime.realtimeQueue,
okxSyncListingsBackfill.backfillQueue,
];
18 changes: 18 additions & 0 deletions src/jobs/okx-sync/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import _ from "lodash";
import cron from "node-cron";

import * as realtimeQueueListings from "./queues/realtime-queue-listings";
import { acquireLock } from "../../common/redis";
import { config } from "../../config";
import { Okx } from "../../utils/okx";

if (config.doRealtimeWork) {
if (new Okx().getChainName()) {
cron.schedule("*/20 * * * * *", async () => {
const lockAcquired = await acquireLock(realtimeQueueListings.getLockKey(), 30);
if (lockAcquired) {
await realtimeQueueListings.addToRealtimeQueue();
}
});
}
}
77 changes: 77 additions & 0 deletions src/jobs/okx-sync/queues/backfill-queue-listings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Job, Queue, QueueScheduler, Worker } from "bullmq";

import { logger } from "../../../common/logger";
import { redis } from "../../../common/redis";
import { config } from "../../../config";
import { fetchOrders } from "../utils";

const BACKFILL_QUEUE_NAME = "backfill-okx-listings-sync";

// Queue for backfilling OKX sell listings: each job pages backwards in time
// from a per-run cursor ("create before" timestamp) persisted in redis.
export const backfillQueue = new Queue(BACKFILL_QUEUE_NAME, {
  connection: redis.duplicate(),
  defaultJobOptions: {
    // NOTE(review): with `attempts: 1` the `backoff` settings below never
    // take effect; kept for consistency with the realtime queue config.
    attempts: 1,
    backoff: {
      type: "fixed",
      delay: 3,
    },
    timeout: 60000,
    removeOnComplete: 100,
    removeOnFail: 1000,
  },
});
new QueueScheduler(BACKFILL_QUEUE_NAME, { connection: redis.duplicate() });

if (config.doBackfillWork) {
  // Fix: this worker services the BACKFILL queue — it was misleadingly named
  // `realtimeWorker` (copy-paste from the realtime queue module).
  const backfillWorker = new Worker(
    BACKFILL_QUEUE_NAME,
    async (job: Job) => {
      const { runId } = job.data;

      try {
        // Resume from the cursor persisted for this run; a fresh run starts
        // from "now" (unix seconds).
        const createBefore = await redis
          .get(getCreateBeforeKey(runId))
          .then((c) => (c ? c : Math.floor(Date.now() / 1000)));

        logger.info(
          BACKFILL_QUEUE_NAME,
          `Start syncing OKX listings (runId=${runId} createBefore=${createBefore})`
        );

        // Fetch up to 10 pages of sell orders created before the cursor.
        const { minTimestamp } = await fetchOrders({
          side: "sell",
          createBefore: Number(createBefore),
          maxIterations: 10,
        });
        // If we made progress (cursor would actually move), persist the new
        // cursor and re-enqueue to continue paging; otherwise the run is done.
        if (minTimestamp && minTimestamp + 1 !== Number(createBefore)) {
          await redis.set(getCreateBeforeKey(runId), minTimestamp + 1);
          await addToOkxBackfillQueue(runId);
        }
      } catch (error) {
        logger.error(
          BACKFILL_QUEUE_NAME,
          JSON.stringify({
            message: `OKX listings sync failed (runId=${runId})`,
            error,
            stack: (error as any).stack,
            attempts: job.attemptsMade,
            syncSource: "OKX",
          })
        );
        throw error;
      }
    },
    { connection: redis.duplicate(), concurrency: 1 }
  );

  backfillWorker.on("error", async (error) => {
    logger.error(BACKFILL_QUEUE_NAME, `Worker errored: ${error}`);
  });
}

// Enqueue a backfill job for the given run id, optionally delayed.
export const addToOkxBackfillQueue = async (runId = "", delayMs = 0) => {
  await backfillQueue.add(BACKFILL_QUEUE_NAME, { runId }, { delay: delayMs });
};

// Per-run redis key holding the "create before" paging cursor.
const getCreateBeforeKey = (runId: string) => `${BACKFILL_QUEUE_NAME}-${runId}-create-before`;
// NOTE(review): exported but not used in this module — presumably for callers
// coordinating backfill runs; confirm before removing.
export const getLockKey = (runId: string) => `${BACKFILL_QUEUE_NAME}-${runId}-lock`;
83 changes: 83 additions & 0 deletions src/jobs/okx-sync/queues/realtime-queue-listings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Job, Queue, QueueScheduler, Worker } from "bullmq";

import { logger } from "../../../common/logger";
import { redis, releaseLock } from "../../../common/redis";
import { config } from "../../../config";
import { fetchOrders } from "../utils";

const REALTIME_QUEUE_NAME = "realtime-okx-listings-sync";

// Queue holding the periodic "sync recent OKX listings" jobs that the cron
// scheduler in ../index.ts enqueues.
export const realtimeQueue = new Queue(REALTIME_QUEUE_NAME, {
  connection: redis.duplicate(),
  defaultJobOptions: {
    attempts: 1,
    backoff: {
      type: "fixed",
      delay: 3,
    },
    timeout: 60000,
    removeOnComplete: 100,
    removeOnFail: 1000,
  },
});
new QueueScheduler(REALTIME_QUEUE_NAME, { connection: redis.duplicate() });

if (config.doRealtimeWork) {
  const worker = new Worker(
    REALTIME_QUEUE_NAME,
    async (job: Job) => {
      try {
        // Resume from the persisted cursor; on a cold start look 30s back.
        const storedCursor = await redis.get(getCreateAfterKey());
        const createAfter = storedCursor || Math.floor(Date.now() / 1000 - 30);

        logger.info(REALTIME_QUEUE_NAME, `Start syncing OKX listings (createAfter=${createAfter})`);

        // Pull sell orders created after the cursor.
        const result = await fetchOrders({
          side: "sell",
          createAfter: Number(createAfter),
        });
        // Advance the cursor to just below the newest order seen, so the next
        // run overlaps by one second rather than risking a gap.
        if (result.maxTimestamp) {
          await redis.set(getCreateAfterKey(), result.maxTimestamp - 1);
        }
      } catch (error) {
        logger.error(
          REALTIME_QUEUE_NAME,
          JSON.stringify({
            message: "OKX listings sync failed",
            error,
            stack: (error as any).stack,
            attempts: job.attemptsMade,
            syncSource: "OKX",
          })
        );
        throw error;
      }
    },
    { connection: redis.duplicate(), concurrency: 2 }
  );

  // Release the scheduler's lock once a job finishes so the next cron tick
  // can enqueue again.
  worker.on("completed", async (job) => {
    await releaseLock(getLockKey(), false);

    if (job.attemptsMade > 0) {
      logger.info(
        REALTIME_QUEUE_NAME,
        `OKX listings sync recovered (attempts=${job.attemptsMade})`
      );
    }
  });

  worker.on("error", async (error) => {
    await releaseLock(getLockKey(), false);

    logger.error(REALTIME_QUEUE_NAME, `Worker errored: ${error}`);
  });
}

// Enqueue one realtime sync job, optionally delayed.
export const addToRealtimeQueue = async (delayMs: number = 0) => {
  await realtimeQueue.add(REALTIME_QUEUE_NAME, {}, { delay: delayMs });
};

// Redis key holding the "create after" cursor shared across runs.
const getCreateAfterKey = () => `${REALTIME_QUEUE_NAME}-create-after`;
export const getLockKey = () => `${REALTIME_QUEUE_NAME}-lock`;
Loading

0 comments on commit c6da8df

Please sign in to comment.