Skip to content

Commit

Permalink
Fix MySqlDestination not emitting loadingBatchError
Browse files Browse the repository at this point in the history
  • Loading branch information
jansivans committed Oct 25, 2024
1 parent 7daa5f0 commit cd6e963
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bellboy",
"version": "8.7.0",
"version": "8.7.1",
"description": "Highly performant JavaScript data stream ETL engine.",
"main": "lib/index",
"types": "lib/index",
Expand Down
15 changes: 7 additions & 8 deletions src/destinations/mysql-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,24 @@ export class MySqlDestination extends DatabaseDestination {
}
const insertQuery = `INSERT INTO ${this.table} (${columnNames.join(', ')}) VALUES ?`;
await dbConnection.beginTransaction();
let result;
try {
const insertData = data.map(row => columnNames.map(column => row[column.replace(/`/g, '')]));
await dbConnection.query(insertQuery, [insertData]);
await dbConnection.commit();
if (this.postLoadQuery) {
result = await dbConnection.query(this.postLoadQuery);
}
} catch (err) {
await dbConnection.rollback();
throw err;
} finally {
let result;
try {
if (this.postLoadQuery) {
result = await dbConnection.query(this.postLoadQuery);
}
} catch (postLoadError) {
throw postLoadError;
} finally {
await pool.end();
} catch (poolCloseError) {
throw poolCloseError;
}
return result;
}
return result;
}
}
53 changes: 53 additions & 0 deletions tests/mysql-destination.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,57 @@ it('loadedBatch event should include postLoadQuery result', async () => {
});
await job.run();
expect(lastInsertId).toEqual(10);
});

it('handles loadingBatchError event', async () => {
await db.query(`CREATE TABLE test
(
id integer PRIMARY KEY,
text text NOT NULL
)`);
const processor = new DynamicProcessor({
generator: async function* () {
yield {
id: 1,
text2: 'something',
}
},
});
const destination = new MySqlDestination({
connection,
table: 'test',
batchSize: 1,
});
const job = new Job(processor, [destination]);
job.on('loadingBatchError', async (destinationIndex, rows, error) => {
expect(error.message).toEqual(`Column 'text' cannot be null`);
});
await job.run();
});

it('handles loadingBatchError event when postLoadQuery fails', async () => {
await db.query(`CREATE TABLE test
(
id integer PRIMARY KEY,
text text
)`);
const processor = new DynamicProcessor({
generator: async function* () {
yield {
id: 1,
text: 'something',
}
},
});
const destination = new MySqlDestination({
connection,
table: 'test',
batchSize: 1,
postLoadQuery: `SELECT * FROM non_existent_table`,
});
const job = new Job(processor, [destination]);
job.on('loadingBatchError', async (destinationIndex, rows, error) => {
expect(error.message).toEqual(`Table 'test.non_existent_table' doesn't exist`);
});
await job.run();
});

0 comments on commit cd6e963

Please sign in to comment.