Skip to content

Commit

Permalink
Merge branch 'main' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
georgeroman committed Dec 3, 2023
2 parents 079b335 + 1d3fdd1 commit 8db2d90
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 35 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"dependencies": {
"@bull-board/express": "^3.9.0",
"@opensea/stream-js": "^0.0.20",
"@reservoir0x/sdk": "^0.0.342",
"@reservoir0x/sdk": "^0.0.360",
"@types/date-fns": "^2.6.0",
"@types/express": "^4.17.13",
"@types/ioredis": "^4.28.7",
Expand Down
7 changes: 3 additions & 4 deletions src/jobs/element-sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import * as realtimeQueueOffers from "./queues/realtime-queue-offers";
import { logger } from "../../common/logger";
import { acquireLock, redis } from "../../common/redis";
import { config } from "../../config";
import { Element } from "../../utils/element";

if (config.doRealtimeWork) {
cron.schedule("*/5 * * * * *", async () => {
// Element only supports mainnet
if (_.indexOf([1], config.chainId) !== -1) {
if (new Element().getChainName()) {
const lockAcquired = await acquireLock("element-sync-lock", 60 * 5);
if (lockAcquired) {
const cacheKey = "element-sync-cursor";
Expand All @@ -31,8 +31,7 @@ if (config.doRealtimeWork) {
});

cron.schedule("*/5 * * * * *", async () => {
// Element only supports mainnet
if (_.indexOf([1], config.chainId) !== -1) {
if (new Element().getChainName()) {
const lockAcquired = await acquireLock("element-sync-offers-lock", 60 * 5);
if (lockAcquired) {
const cacheKey = "element-sync-offers-cursor";
Expand Down
2 changes: 1 addition & 1 deletion src/jobs/element-sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const fetchOrders = async (side: "sell" | "buy", listedAfter = 0, listedB
let done = false;
while (!done) {
const url = element.buildFetchOrdersURL({
chain: "eth",
chain: element.getChainName()!,
side: side === "sell" ? "1" : "0",
listed_after: listedAfter > 0 ? listedAfter : undefined,
listed_before: listedBefore > 0 ? listedBefore : undefined,
Expand Down
3 changes: 2 additions & 1 deletion src/jobs/rarible-sync/queues/realtime-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ if (config.doRealtimeWork) {
} else {
await redis.set(cacheKey, newTimestamp);
}
} catch (error) {
} catch (error: any) {
logger.error(
REALTIME_QUEUE_NAME,
JSON.stringify({
message: `Rarible sync failed attempts=${job.attemptsMade}, error=${error}`,
error,
stack: error.stack,
attempts: job.attemptsMade,
syncSource: "Rarible",
})
Expand Down
28 changes: 14 additions & 14 deletions src/jobs/rarible-sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,21 @@ export const fetchOrdersByTimestamp = async (
if (orderTimestamp > timestamp) {
newTimestamp = orderTimestamp;
}
}

const orderTarget =
parsed!.params.side === "buy"
? parsed?.params.take.assetType?.contract || ""
: parsed?.params.make.assetType?.contract || "";

values.push({
hash: parsed?.params.hash,
target: orderTarget!.toLowerCase(),
maker: order.maker,
created_at: order.createdAt,
data: order as any,
source: "rarible",
});
const orderTarget =
parsed!.params.side === "buy"
? parsed?.params.take.assetType?.contract || ""
: parsed?.params.make.assetType?.contract || "";

values.push({
hash: parsed?.params.hash,
target: orderTarget!.toLowerCase(),
maker: order.maker,
created_at: order.createdAt,
data: order as any,
source: "rarible",
});
}
};

const plimit = pLimit(20);
Expand Down
4 changes: 2 additions & 2 deletions src/jobs/seaport-sync/backfill-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ if (config.doBackfillWork && config.doOpenseaWork) {
} catch (error: any) {
job.data.newCursor = cursor;

if (error.response?.status === 429) {
if ([429, 503].includes(error.response?.status)) {
// Wait to avoid rate-limiting
job.data.retry = true;
await new Promise((resolve) => setTimeout(resolve, 1000));
}

logger.error(
BACKFILL_QUEUE_NAME,
`SeaPort Sync failed attempts=${job.attemptsMade}, error=${error}`
`SeaPort Sync failed attempts=${job.attemptsMade}, fromTimestamp=${fromTimestamp}, toTimestamp=${toTimestamp}, cursor=${cursor}, error=${error}`
);
}
},
Expand Down
8 changes: 1 addition & 7 deletions src/jobs/seaport-sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ export const fetchOrders = async (
let total = 0;

let done = false;
while (!done && (details?.maxOrders ? total < details.maxOrders : true)) {
logger.info("fetch_orders_seaport", `Seaport fetch orders. side=${side}, cursor=${cursor}`);

while (!done && (details?.maxOrders ? total < details.maxOrders : true)) {
const url = seaport.buildFetchOrdersURL({
contract: details?.contract,
side,
Expand Down Expand Up @@ -134,11 +133,6 @@ export const fetchOrders = async (
if (parsedOrders.length) {
await addToRelayOrdersQueue(parsedOrders, true);
}

logger.info(
"fetch_orders_seaport",
`Seaport - Batch done. side=${side}, cursor=${cursor} Got ${orders.length} orders`
);
} catch (error: any) {
if (error.response?.status === 429 || error.response?.status === 503) {
logger.warn(
Expand Down
25 changes: 24 additions & 1 deletion src/utils/element.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sdk from "@reservoir0x/sdk";
import { config } from "../config";

type FetchOrdersParams = {
chain: "eth" | "bsc" | "polygon" | "avalanche" | "base";
chain: string;
token_ids?: string;
asset_contract_address?: string;
sale_kind?: string;
Expand Down Expand Up @@ -54,6 +54,29 @@ export enum SaleKind {
}

export class Element {
public getChainName() {
switch (config.chainId) {
case 1:
return "eth";
case 56:
return "bsc";
case 137:
return "polygon";
case 324:
return "zksync";
case 8453:
return "base";
case 42161:
return "arbitrum";
case 43114:
return "avalanche";
case 59144:
return "linea";
default:
return undefined;
}
}

// https://api.element.market/openapi/#/
public buildFetchOrdersURL(params: FetchOrdersParams) {
// For now there's no support for testnets
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1920,10 +1920,10 @@
resolved "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz"
integrity sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=

"@reservoir0x/sdk@^0.0.342":
version "0.0.342"
resolved "https://registry.yarnpkg.com/@reservoir0x/sdk/-/sdk-0.0.342.tgz#cd29d0744134c260b7b3bb56c53b4d8850e9a4e0"
integrity sha512-sNAvs9x/UtQorUWZzjSvcK3MpmesidmSi3sDFWbmQUyKddiHcE9bIq7kU8WyqNAIzvHxxjZhSfxx7naoUyavkA==
"@reservoir0x/sdk@^0.0.360":
version "0.0.360"
resolved "https://registry.yarnpkg.com/@reservoir0x/sdk/-/sdk-0.0.360.tgz#b30cd4dd8279ae1510e21b76164cab521b072bf3"
integrity sha512-3VfY5fAvqNMq6bAikZ2kpSb1k5CZ1zudKUne8lg9X5MQf9SZTwvnwL6coCRiNUdjjW1BtfydYR56nNW1d0AO3g==
dependencies:
"@uniswap/smart-order-router" "^3.0.4"
decimal.js "^10.4.3"
Expand Down

0 comments on commit 8db2d90

Please sign in to comment.