Skip to content

Commit

Permalink
feat: add Idempotency to Postgres (#7750)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbaker6 authored Jan 2, 2022
1 parent 5e363ea commit 0c3feaa
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 26 deletions.
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,26 @@ let api = new ParseServer({
| `idempotencyOptions.paths` | yes | `Array<String>` | `[]` | `.*` (all paths, includes the examples below), <br>`functions/.*` (all functions), <br>`jobs/.*` (all jobs), <br>`classes/.*` (all classes), <br>`functions/.*` (all functions), <br>`users` (user creation / update), <br>`installations` (installation creation / update) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_PATHS | An array of path patterns that have to match the request path for request deduplication to be enabled. The mount path must not be included, for example to match the request path `/parse/functions/myFunction` specify the path pattern `functions/myFunction`. A trailing slash of the request path is ignored, for example the path pattern `functions/myFunction` matches both `/parse/functions/myFunction` and `/parse/functions/myFunction/`. |
| `idempotencyOptions.ttl` | yes | `Integer` | `300` | `60` (60 seconds) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_TTL | The duration in seconds after which a request record is discarded from the database. Duplicate requests due to network issues can be expected to arrive within milliseconds up to several seconds. This value must be greater than `0`. |
### Notes <!-- omit in toc -->
### Postgres <!-- omit in toc -->
To use this feature in Postgres, you will need to create a cron job using [pgAdmin](https://www.pgadmin.org/docs/pgadmin4/development/pgagent_jobs.html) or similar to call the Postgres function `idempotency_delete_expired_records()` that deletes expired idempotency records. You can find an example script below. Make sure the script has the same privileges to log into Postgres as Parse Server.
```bash
#!/bin/bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
SELECT idempotency_delete_expired_records();
EOSQL
- This feature is currently only available for MongoDB and not for Postgres.
exec "$@"
```
Assuming the script above is named, `parse_idempotency_delete_expired_records.sh`, a cron job that runs the script every 2 minutes may look like:
```bash
2 * * * * /root/parse_idempotency_delete_expired_records.sh >/dev/null 2>&1
```
## Localization
Expand Down
35 changes: 31 additions & 4 deletions spec/Idempotency.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ const rest = require('../lib/rest');
const auth = require('../lib/Auth');
const uuid = require('uuid');

describe_only_db('mongo')('Idempotency', () => {
describe('Idempotency', () => {
// Parameters
/** Enable TTL expiration simulated by removing entry instead of waiting for MongoDB TTL monitor which
runs only every 60s, so it can take up to 119s until entry removal - ain't nobody got time for that */
const SIMULATE_TTL = true;
const ttl = 2;
const maxTimeOut = 4000;

// Helpers
async function deleteRequestEntry(reqId) {
const config = Config.get(Parse.applicationId);
Expand Down Expand Up @@ -38,9 +41,10 @@ describe_only_db('mongo')('Idempotency', () => {
}
await setup({
paths: ['functions/.*', 'jobs/.*', 'classes/.*', 'users', 'installations'],
ttl: 30,
ttl: ttl,
});
});

// Tests
it('should enforce idempotency for cloud code function', async () => {
let counter = 0;
Expand All @@ -56,7 +60,7 @@ describe_only_db('mongo')('Idempotency', () => {
'X-Parse-Request-Id': 'abc-123',
},
};
expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(30);
expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(ttl);
await request(params);
await request(params).then(fail, e => {
expect(e.status).toEqual(400);
Expand All @@ -83,12 +87,35 @@ describe_only_db('mongo')('Idempotency', () => {
if (SIMULATE_TTL) {
await deleteRequestEntry('abc-123');
} else {
await new Promise(resolve => setTimeout(resolve, 130000));
await new Promise(resolve => setTimeout(resolve, maxTimeOut));
}
await expectAsync(request(params)).toBeResolved();
expect(counter).toBe(2);
});

it_only_db('postgres')('should delete request entry when postgress ttl function is called', async () => {
const client = Config.get(Parse.applicationId).database.adapter._client;
let counter = 0;
Parse.Cloud.define('myFunction', () => {
counter++;
});
const params = {
method: 'POST',
url: 'http://localhost:8378/1/functions/myFunction',
headers: {
'X-Parse-Application-Id': Parse.applicationId,
'X-Parse-Master-Key': Parse.masterKey,
'X-Parse-Request-Id': 'abc-123',
},
};
await expectAsync(request(params)).toBeResolved();
await expectAsync(request(params)).toBeRejected();
await new Promise(resolve => setTimeout(resolve, maxTimeOut));
await client.one('SELECT idempotency_delete_expired_records()');
await expectAsync(request(params)).toBeResolved();
expect(counter).toBe(2);
});

it('should enforce idempotency for cloud code jobs', async () => {
let counter = 0;
Parse.Cloud.job('myJob', () => {
Expand Down
11 changes: 11 additions & 0 deletions spec/PostgresStorageAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,17 @@ describe_only_db('postgres')('PostgresStorageAdapter', () => {
await new Promise(resolve => setTimeout(resolve, 2000));
expect(adapter._onchange).toHaveBeenCalled();
});

it('Idempotency class should have function', async () => {
await reconfigureServer();
const adapter = Config.get('test').database.adapter;
const client = adapter._client;
const qs = "SELECT format('%I.%I(%s)', ns.nspname, p.proname, oidvectortypes(p.proargtypes)) FROM pg_proc p INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid) WHERE p.proname = 'idempotency_delete_expired_records'";
const foundFunction = await client.one(qs);
expect(foundFunction.format).toBe("public.idempotency_delete_expired_records()");
await adapter.deleteIdempotencyFunction();
await client.none(qs);
});
});

describe_only_db('postgres')('PostgresStorageAdapter shutdown', () => {
Expand Down
52 changes: 49 additions & 3 deletions src/Adapters/Storage/Postgres/PostgresStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2440,9 +2440,55 @@ export class PostgresStorageAdapter implements StorageAdapter {
? fieldNames.map((fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`)
: fieldNames.map((fieldName, index) => `$${index + 3}:name`);
const qs = `CREATE INDEX IF NOT EXISTS $1:name ON $2:name (${constraintPatterns.join()})`;
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]).catch(error => {
throw error;
});
const setIdempotencyFunction = options.setIdempotencyFunction !== undefined ? options.setIdempotencyFunction : false;
if (setIdempotencyFunction) {
await this.ensureIdempotencyFunctionExists(options);
}
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames])
.catch(error => {
if (
error.code === PostgresDuplicateRelationError &&
error.message.includes(indexNameOptions.name)
) {
// Index already exists. Ignore error.
} else if (
error.code === PostgresUniqueIndexViolationError &&
error.message.includes(indexNameOptions.name)
) {
// Cast the error into the proper parse error
throw new Parse.Error(
Parse.Error.DUPLICATE_VALUE,
'A duplicate value for a field with unique values was provided'
);
} else {
throw error;
}
});
}
async deleteIdempotencyFunction(
options?: Object = {}
): Promise<any> {
const conn = options.conn !== undefined ? options.conn : this._client;
const qs = 'DROP FUNCTION IF EXISTS idempotency_delete_expired_records()';
return conn
.none(qs)
.catch(error => {
throw error;
});
}
async ensureIdempotencyFunctionExists(
options?: Object = {}
): Promise<any> {
const conn = options.conn !== undefined ? options.conn : this._client;
const ttlOptions = options.ttl !== undefined ? `${options.ttl} seconds` : '60 seconds';
const qs = 'CREATE OR REPLACE FUNCTION idempotency_delete_expired_records() RETURNS void LANGUAGE plpgsql AS $$ BEGIN DELETE FROM "_Idempotency" WHERE expire < NOW() - INTERVAL $1; END; $$;';
return conn
.none(qs, [ttlOptions])
.catch(error => {
throw error;
});
}
}
Expand Down
39 changes: 25 additions & 14 deletions src/Controllers/DatabaseController.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import logger from '../logger';
import * as SchemaController from './SchemaController';
import { StorageAdapter } from '../Adapters/Storage/StorageAdapter';
import MongoStorageAdapter from '../Adapters/Storage/Mongo/MongoStorageAdapter';
import PostgresStorageAdapter from '../Adapters/Storage/Postgres/PostgresStorageAdapter';
import SchemaCache from '../Adapters/Cache/SchemaCache';
import type { LoadSchemaOptions } from './types';
import type { QueryOptions, FullQueryOptions } from '../Adapters/Storage/StorageAdapter';
Expand Down Expand Up @@ -394,12 +395,14 @@ const relationSchema = {

class DatabaseController {
adapter: StorageAdapter;
idempotencyOptions: any;
schemaCache: any;
schemaPromise: ?Promise<SchemaController.SchemaController>;
_transactionalSession: ?any;

constructor(adapter: StorageAdapter) {
constructor(adapter: StorageAdapter, idempotencyOptions?: Object = {}) {
this.adapter = adapter;
this.idempotencyOptions = idempotencyOptions;
// We don't want a mutable this.schema, because then you could have
// one request that uses different schemas for different parts of
// it. Instead, use loadSchema to get a schema.
Expand Down Expand Up @@ -1713,9 +1716,7 @@ class DatabaseController {
};
await this.loadSchema().then(schema => schema.enforceClassExists('_User'));
await this.loadSchema().then(schema => schema.enforceClassExists('_Role'));
if (this.adapter instanceof MongoStorageAdapter) {
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));
}
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));

await this.adapter.ensureUniqueness('_User', requiredUserFields, ['username']).catch(error => {
logger.warn('Unable to ensure uniqueness for usernames: ', error);
Expand Down Expand Up @@ -1751,18 +1752,28 @@ class DatabaseController {
logger.warn('Unable to ensure uniqueness for role name: ', error);
throw error;
});
if (this.adapter instanceof MongoStorageAdapter) {
await this.adapter
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
.catch(error => {
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
throw error;
});

await this.adapter
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, {
await this.adapter
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
.catch(error => {
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
throw error;
});

const isMongoAdapter = this.adapter instanceof MongoStorageAdapter;
const isPostgresAdapter = this.adapter instanceof PostgresStorageAdapter;
if (isMongoAdapter || isPostgresAdapter) {
let options = {};
if (isMongoAdapter) {
options = {
ttl: 0,
})
};
} else if (isPostgresAdapter) {
options = this.idempotencyOptions;
options.setIdempotencyFunction = true;
}
await this.adapter
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, options)
.catch(error => {
logger.warn('Unable to create TTL index for idempotency expire date: ', error);
throw error;
Expand Down
4 changes: 2 additions & 2 deletions src/Controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export function getLiveQueryController(options: ParseServerOptions): LiveQueryCo
}

export function getDatabaseController(options: ParseServerOptions): DatabaseController {
const { databaseURI, collectionPrefix, databaseOptions } = options;
const { databaseURI, collectionPrefix, databaseOptions, idempotencyOptions } = options;
let { databaseAdapter } = options;
if (
(databaseOptions ||
Expand All @@ -157,7 +157,7 @@ export function getDatabaseController(options: ParseServerOptions): DatabaseCont
} else {
databaseAdapter = loadAdapter(databaseAdapter);
}
return new DatabaseController(databaseAdapter);
return new DatabaseController(databaseAdapter, idempotencyOptions);
}

export function getHooksController(
Expand Down
3 changes: 2 additions & 1 deletion src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ClientSDK from './ClientSDK';
import defaultLogger from './logger';
import rest from './rest';
import MongoStorageAdapter from './Adapters/Storage/Mongo/MongoStorageAdapter';
import PostgresStorageAdapter from './Adapters/Storage/Postgres/PostgresStorageAdapter';

export const DEFAULT_ALLOWED_HEADERS =
'X-Parse-Master-Key, X-Parse-REST-API-Key, X-Parse-Javascript-Key, X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token, X-Requested-With, X-Parse-Revocable-Session, X-Parse-Request-Id, Content-Type, Pragma, Cache-Control';
Expand Down Expand Up @@ -431,7 +432,7 @@ export function promiseEnforceMasterKeyAccess(request) {
*/
export function promiseEnsureIdempotency(req) {
// Enable feature only for MongoDB
if (!(req.config.database.adapter instanceof MongoStorageAdapter)) {
if (!((req.config.database.adapter instanceof MongoStorageAdapter) || (req.config.database.adapter instanceof PostgresStorageAdapter))) {
return Promise.resolve();
}
// Get parameters
Expand Down

0 comments on commit 0c3feaa

Please sign in to comment.