Merge pull request #4221 from omnivore-app/fix/content-fetch-error
improve content-fetch
sywhb authored Jul 25, 2024
2 parents ee5e66d + 156feb8 commit bdae03e
Showing 15 changed files with 164 additions and 308 deletions.
1 change: 1 addition & 0 deletions packages/api/src/jobs/bulk_action.ts
@@ -27,6 +27,7 @@ export const bulkAction = async (data: BulkActionData) => {
   for (let offset = 0; offset < count; offset += batchSize) {
     const searchArgs = {
       size: batchSize,
+      includePending: true,
       query: `(${query}) AND updated:*..${now}`, // only process items that have not been updated
     }
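
The `updated:*..${now}` guard in this hunk is what makes re-running batches safe: `now` is stamped once before the loop, and every page of the search excludes anything updated after that point, so items already touched by this run drop out of later pages. A minimal, self-contained sketch of the pattern (the `search` and `apply` callbacks are hypothetical stand-ins for the repo's search and update services):

type Item = { id: string }
type SearchArgs = { size: number; includePending: boolean; query: string }

// Sketch: page through a search in fixed-size batches, excluding items this
// run has already updated. Assumes `apply` bumps each item's `updated`
// timestamp, pushing it past the `now` cutoff.
const bulkProcess = async (
  query: string,
  count: number,
  batchSize: number,
  search: (args: SearchArgs) => Promise<Item[]>,
  apply: (items: Item[]) => Promise<void>
): Promise<void> => {
  const now = new Date().toISOString() // stamped once, before the loop
  for (let offset = 0; offset < count; offset += batchSize) {
    const items = await search({
      size: batchSize,
      includePending: true,
      query: `(${query}) AND updated:*..${now}`, // skip already-updated items
    })
    if (items.length === 0) break
    await apply(items)
  }
}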
2 changes: 1 addition & 1 deletion packages/api/src/repository/library_item.ts
@@ -25,7 +25,7 @@ export const libraryItemRepository = appDataSource
     .andWhere('md5(original_url) = md5(:url)', { url })

   if (forUpdate) {
-    qb.setLock('pessimistic_write')
+    qb.setLock('pessimistic_read')
   }

   return qb.getOne()
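
For context on the `setLock` change: in TypeORM, `pessimistic_write` emits `SELECT ... FOR UPDATE` while `pessimistic_read` emits `SELECT ... FOR SHARE` on Postgres, so concurrent savers of the same URL can hold the row lock together and only block writers. A sketch of the lookup under the new mode (the entity import path is assumed; pessimistic locks must run inside a transaction):

import { DataSource } from 'typeorm'
import { LibraryItem } from '../entity/library_item' // assumed path

// Sketch: look up an item by URL under a shared row lock.
const findByUrl = (ds: DataSource, url: string) =>
  ds.manager.transaction(async (tx) => {
    return tx
      .createQueryBuilder(LibraryItem, 'library_item')
      .where('md5(original_url) = md5(:url)', { url })
      .setLock('pessimistic_read') // SELECT ... FOR SHARE on Postgres
      // 'pessimistic_write' would emit SELECT ... FOR UPDATE instead
      .getOne()
  })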
7 changes: 4 additions & 3 deletions packages/api/src/resolvers/article/index.ts
@@ -760,9 +760,10 @@ export const bulkActionResolver = authorized<
     },
   })

-  const batchSize = 100
+  const batchSize = 20
   const searchArgs = {
     query,
+    includePending: true,
     size: 0,
   }
   const count = await countLibraryItems(searchArgs, uid)
@@ -778,13 +779,13 @@
       action,
       count,
     })
-    // if there are less than 100 items, update them synchronously
+    // if there are less than batchSize items, update them synchronously
     await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args)

     return { success: true }
   }

-  // if there are more than 100 items, update them asynchronously
+  // if there are more than batchSize items, update them asynchronously
  const data = {
    userId: uid,
    action,
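
The resolver's threshold logic now reads consistently: counts below `batchSize` are handled inline, anything larger is deferred to a queue job. A reduced, self-contained sketch of that dispatch decision (the callbacks stand in for `batchUpdateLibraryItems` and the queue producer):

// Sketch: run small bulk actions inline, enqueue large ones.
const dispatchBulkAction = async (
  count: number,
  batchSize: number,
  runInline: () => Promise<void>,
  enqueue: () => Promise<void>
): Promise<'inline' | 'queued'> => {
  if (count < batchSize) {
    await runInline() // caller waits; the response reflects the finished update
    return 'inline'
  }
  await enqueue() // background worker pages through `count` items in batches
  return 'queued'
}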
4 changes: 2 additions & 2 deletions packages/api/src/server.ts
@@ -46,7 +46,7 @@ import { analytics } from './utils/analytics'
 import { corsConfig } from './utils/corsConfig'
 import { getClientFromUserAgent } from './utils/helpers'
 import { buildLogger, buildLoggerTransport, logger } from './utils/logger'
-import { apiLimiter, authLimiter } from './utils/rate_limit'
+import { apiHourLimiter, apiLimiter, authLimiter } from './utils/rate_limit'
 import { shortcutsRouter } from './routers/shortcuts_router'

 const PORT = process.env.PORT || 4000
@@ -68,7 +68,7 @@ export const createApp = (): Express => {
   app.set('trust proxy', env.server.trustProxy)

   // Apply the rate limiting middleware to API calls only
-  app.use('/api/', apiLimiter)
+  app.use('/api/', apiLimiter, apiHourLimiter)

   // set client info in the request context
   app.use(httpContext.middleware)
48 changes: 43 additions & 5 deletions packages/api/src/services/create_page_save_request.ts
@@ -8,11 +8,11 @@ import {
   PageType,
 } from '../generated/graphql'
 import { createPubSubClient, PubsubClient } from '../pubsub'
-import { Merge } from '../util'
+import { redisDataSource } from '../redis_data_source'
 import { enqueueParseRequest } from '../utils/createTask'
 import { cleanUrl, generateSlug } from '../utils/helpers'
 import { logger } from '../utils/logger'
-import { countBySavedAt, createOrUpdateLibraryItem } from './library_item'
+import { createOrUpdateLibraryItem } from './library_item'

 interface PageSaveRequest {
   user: User
@@ -34,13 +34,47 @@ const SAVING_CONTENT = 'Your link is being saved...'

 const isPrivateIP = privateIpLib.default

+const recentSavedItemKey = (userId: string) => `recent-saved-item:${userId}`
+
+const addRecentSavedItem = async (userId: string) => {
+  const redisClient = redisDataSource.redisClient
+
+  if (redisClient) {
+    const key = recentSavedItemKey(userId)
+    try {
+      // add now to the sorted set for rate limiting
+      await redisClient.zadd(key, Date.now(), Date.now())
+    } catch (error) {
+      logger.error('error adding recently saved item in redis', {
+        key,
+        error,
+      })
+    }
+  }
+}
+
 // 5 items saved in the last minute: use low queue
 // default: use normal queue
 const getPriorityByRateLimit = async (
   userId: string
-): Promise<'low' | 'high'> => {
-  const count = await countBySavedAt(userId, new Date(Date.now() - 60 * 1000))
-  return count >= 5 ? 'low' : 'high'
+): Promise<'low' | 'high' | undefined> => {
+  const redisClient = redisDataSource.redisClient
+  if (redisClient) {
+    const oneMinuteAgo = Date.now() - 60 * 1000
+    const key = recentSavedItemKey(userId)
+
+    try {
+      // Remove items older than one minute
+      await redisClient.zremrangebyscore(key, '-inf', oneMinuteAgo)
+
+      // Count items in the last minute
+      const count = await redisClient.zcard(key)
+
+      return count >= 5 ? 'low' : 'high'
+    } catch (error) {
+      logger.error('Failed to get priority by rate limit', { userId, error })
+    }
+  }
+}
 }

 export const validateUrl = (url: string): URL => {
@@ -118,8 +152,12 @@ export const createPageSaveRequest = async ({
     pubsub
   )

+  // add to recent saved item
+  await addRecentSavedItem(userId)
+
   // get priority by checking rate limit if not specified
   priority = priority || (await getPriorityByRateLimit(userId))
+  logger.debug('priority', { priority })

   // enqueue task to parse item
   await enqueueParseRequest({
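
The Redis replacement for `countBySavedAt` is a sliding-window counter on a sorted set: each save records the current timestamp, `zremrangebyscore` trims entries older than the window, and `zcard` counts what remains. A standalone sketch of the pattern with ioredis (the key name and five-per-minute threshold mirror the hunk above):

import Redis from 'ioredis'

// Sliding-window counter: record one event, then return how many events
// fall inside the last `windowMs` milliseconds.
const slidingWindowCount = async (
  redis: Redis,
  key: string,
  windowMs: number
): Promise<number> => {
  const now = Date.now()
  await redis.zadd(key, now, `${now}`) // score and member are both the timestamp
  await redis.zremrangebyscore(key, '-inf', now - windowMs) // drop stale entries
  return redis.zcard(key) // remaining entries = events inside the window
}

// Usage mirroring getPriorityByRateLimit: demote after 5 saves in a minute.
const priorityFor = async (redis: Redis, userId: string) =>
  (await slidingWindowCount(redis, `recent-saved-item:${userId}`, 60_000)) >= 5
    ? 'low'
    : 'high'

One quirk carried over from the hunk: the member is the timestamp itself, so two saves landing in the same millisecond collapse into a single sorted-set entry, and the counter can slightly undercount under bursts.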
6 changes: 3 additions & 3 deletions packages/api/src/services/library_item.ts
@@ -1192,7 +1192,7 @@ export const findLibraryItemsByPrefix = async (
   )
 }

-export const countBySavedAt = async (
+export const countByCreatedAt = async (
   userId: string,
   startDate = new Date(0),
   endDate = new Date()
@@ -1202,7 +1202,7 @@ export const countBySavedAt = async (
     tx
       .createQueryBuilder(LibraryItem, 'library_item')
       .where('library_item.user_id = :userId', { userId })
-      .andWhere('library_item.saved_at between :startDate and :endDate', {
+      .andWhere('library_item.created_at between :startDate and :endDate', {
         startDate,
         endDate,
       })
@@ -1256,7 +1256,7 @@ export const batchUpdateLibraryItems = async (
   const queryBuilder = getQueryBuilder(userId, em)

   if (forUpdate) {
-    queryBuilder.setLock('pessimistic_write')
+    queryBuilder.setLock('pessimistic_read')
   }

   const libraryItems = await queryBuilder
22 changes: 21 additions & 1 deletion packages/api/src/utils/rate_limit.ts
@@ -27,7 +27,7 @@ const configs: Partial<Options> = {
 export const apiLimiter = rateLimit({
   ...configs,
   max: async (req) => {
-    // 100 RPM for an authenticated request, 15 for a non-authenticated request
+    // 60 RPM for authenticated request, 15 for non-authenticated request
     const token = getTokenByRequest(req)
     try {
       const claims = await getClaimsByToken(token)
@@ -43,6 +43,26 @@ export const apiLimiter = rateLimit({
   store: getStore('api-rate-limit'),
 })

+export const apiHourLimiter = rateLimit({
+  ...configs,
+  windowMs: 60 * 60 * 1000, // 1 hour
+  max: async (req) => {
+    // 600 for authenticated request, 150 for non-authenticated request
+    const token = getTokenByRequest(req)
+    try {
+      const claims = await getClaimsByToken(token)
+      return claims ? 600 : 150
+    } catch (e) {
+      console.log('non-authenticated request')
+      return 150
+    }
+  },
+  keyGenerator: (req) => {
+    return getTokenByRequest(req) || req.ip
+  },
+  store: getStore('api-hour-rate-limit'),
+})
+
 // 5 RPM for auth requests
 export const authLimiter = rateLimit({
   ...configs,
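
Mounting both limiters gives a burst cap and a sustained cap: 60 requests per minute and 600 per hour for authenticated clients, each tracked in its own store. A self-contained sketch of the stacking pattern with express-rate-limit (limits mirror the values above; keying by IP here rather than the repo's token-or-IP generator):

import express from 'express'
import { rateLimit } from 'express-rate-limit'

const app = express()

// Burst cap: at most 60 requests per rolling minute per client.
const minuteLimiter = rateLimit({
  windowMs: 60 * 1000,
  max: 60,
  keyGenerator: (req) => req.ip ?? 'unknown',
})

// Sustained cap: at most 600 requests per hour, so a client cannot simply
// run at the per-minute ceiling all hour long (that would allow 3600).
const hourLimiter = rateLimit({
  windowMs: 60 * 60 * 1000,
  max: 600,
  keyGenerator: (req) => req.ip ?? 'unknown',
})

// Middlewares run in order; requests that clear the minute limiter
// are then counted against the hour limiter.
app.use('/api/', minuteLimiter, hourLimiter)

app.listen(4000)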
2 changes: 0 additions & 2 deletions packages/content-fetch/Dockerfile
@@ -23,7 +23,6 @@ COPY .prettierrc .
 COPY .eslintrc .

 COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
-COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
 COPY /packages/content-handler/package.json ./packages/content-handler/package.json
 COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
 COPY /packages/utils/package.json ./packages/utils/package.json
@@ -33,7 +32,6 @@ RUN yarn install --pure-lockfile
 ADD /packages/content-fetch ./packages/content-fetch
 ADD /packages/content-handler ./packages/content-handler
 ADD /packages/puppeteer-parse ./packages/puppeteer-parse
-ADD /packages/readabilityjs ./packages/readabilityjs
 ADD /packages/utils ./packages/utils
 RUN yarn workspace @omnivore/utils build
 RUN yarn workspace @omnivore/content-handler build
2 changes: 0 additions & 2 deletions packages/content-fetch/Dockerfile-gcf
@@ -27,7 +27,6 @@ COPY .prettierrc .
 COPY .eslintrc .

 COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
-COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
 COPY /packages/content-handler/package.json ./packages/content-handler/package.json
 COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
 COPY /packages/utils/package.json ./packages/utils/package.json
@@ -37,7 +36,6 @@ RUN yarn install --pure-lockfile
 ADD /packages/content-handler ./packages/content-handler
 ADD /packages/puppeteer-parse ./packages/puppeteer-parse
 ADD /packages/content-fetch ./packages/content-fetch
-ADD /packages/readabilityjs ./packages/readabilityjs
 ADD /packages/utils ./packages/utils
 RUN yarn workspace @omnivore/utils build
 RUN yarn workspace @omnivore/content-handler build
5 changes: 4 additions & 1 deletion packages/content-fetch/src/job.ts
@@ -77,7 +77,10 @@ export const queueSavePageJob = async (
     data: job.data,
     opts: getOpts(job),
   }))
-  console.log('queue save page jobs:', JSON.stringify(jobs, null, 2))
+  console.log(
+    'queue save page jobs:',
+    jobs.map((job) => job.data.finalUrl)
+  )

   const queue = new Queue(QUEUE_NAME, {
     connection: redisDataSource.queueRedisClient,
64 changes: 62 additions & 2 deletions packages/content-fetch/src/request_handler.ts
@@ -129,6 +129,52 @@ const getCachedFetchResult = async (
   return fetchResult
 }

+const failureRedisKey = (domain: string) => `fetch-failure:${domain}`
+
+const isDomainBlocked = async (
+  redisDataSource: RedisDataSource,
+  domain: string
+) => {
+  const blockedDomains = ['localhost', 'weibo.com']
+  if (blockedDomains.includes(domain)) {
+    return true
+  }
+
+  const key = failureRedisKey(domain)
+  const redisClient = redisDataSource.cacheClient
+  try {
+    const result = await redisClient.get(key)
+    // if the domain has failed to fetch more than certain times, block it
+    const maxFailures = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? '10')
+    if (result && parseInt(result) > maxFailures) {
+      console.info(`domain is blocked: ${domain}`)
+      return true
+    }
+  } catch (error) {
+    console.error('Failed to check domain block status', { domain, error })
+  }
+
+  return false
+}
+
+const incrementContentFetchFailure = async (
+  redisDataSource: RedisDataSource,
+  domain: string
+) => {
+  const redisClient = redisDataSource.cacheClient
+  const key = failureRedisKey(domain)
+  try {
+    const result = await redisClient.incr(key)
+    // expire the key in 1 day
+    await redisClient.expire(key, 24 * 60 * 60)
+
+    return result
+  } catch (error) {
+    console.error('Failed to increment failure in redis', { domain, error })
+    return null
+  }
+}
+
 export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
   const functionStartTime = Date.now()

@@ -200,8 +246,22 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
     url
   )

-  fetchResult = await fetchContent(url, locale, timezone)
-  console.log('content has been fetched')
+  const domain = new URL(url).hostname
+  const isBlocked = await isDomainBlocked(redisDataSource, domain)
+  if (isBlocked) {
+    console.log('domain is blocked', domain)
+
+    return res.sendStatus(200)
+  }
+
+  try {
+    fetchResult = await fetchContent(url, locale, timezone)
+    console.log('content has been fetched')
+  } catch (error) {
+    await incrementContentFetchFailure(redisDataSource, domain)
+
+    throw error
+  }

   if (fetchResult.content && !NO_CACHE_URLS.includes(url)) {
     const cacheResult = await cacheFetchResult(
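
The failure counter amounts to a simple per-domain circuit breaker: every failed fetch increments `fetch-failure:<domain>` and refreshes a one-day TTL, and once the count passes the threshold the handler skips the domain outright. A standalone sketch of the pattern with ioredis (names and the default threshold of 10 mirror the hunk above):

import Redis from 'ioredis'

const MAX_FAILURES = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? '10')
const failureKey = (domain: string) => `fetch-failure:${domain}`

// Record one failure; the TTL is refreshed on every increment, so the
// counter only resets after a full day without failures.
const recordFailure = async (redis: Redis, domain: string): Promise<number> => {
  const count = await redis.incr(failureKey(domain))
  await redis.expire(failureKey(domain), 24 * 60 * 60)
  return count
}

// Skip domains whose recent failure count exceeds the threshold.
const shouldSkipDomain = async (redis: Redis, domain: string): Promise<boolean> => {
  const count = await redis.get(failureKey(domain))
  return count !== null && parseInt(count) > MAX_FAILURES
}

Because the TTL slides with each increment, a persistently failing domain stays blocked until it has been quiet for a full day, not merely for a day after its first failure.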
10 changes: 1 addition & 9 deletions packages/puppeteer-parse/package.json
@@ -9,20 +9,12 @@
   ],
   "dependencies": {
     "@omnivore/content-handler": "1.0.0",
-    "@omnivore/readability": "1.0.0",
     "axios": "^1.4.0",
-    "crypto": "^1.0.1",
-    "dompurify": "^2.4.1",
-    "linkedom": "^0.14.9",
     "puppeteer-core": "^22.12.1",
     "puppeteer-extra": "^3.3.6",
-    "puppeteer-extra-plugin-adblocker": "^2.13.6",
-    "puppeteer-extra-plugin-stealth": "^2.11.2",
-    "urlsafe-base64": "^1.0.0"
+    "puppeteer-extra-plugin-stealth": "^2.11.2"
   },
   "devDependencies": {
-    "@types/dompurify": "^3.0.5",
-    "@types/urlsafe-base64": "^1.0.31",
     "chai": "^4.3.6",
     "mocha": "^10.0.0"
   },
[Diffs for the remaining changed files were not loaded on this page.]
