From cd6e9639d0468f22e830a3bb7821d41a2d067e50 Mon Sep 17 00:00:00 2001 From: Yan Ivan Evdokimov Date: Fri, 25 Oct 2024 15:07:49 +0300 Subject: [PATCH] Fix MySqlDestination not emitting loadingBatchError --- package.json | 2 +- src/destinations/mysql-destination.ts | 15 ++++---- tests/mysql-destination.spec.ts | 53 +++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/package.json b/package.json index 3623b5f..792c8bf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bellboy", - "version": "8.7.0", + "version": "8.7.1", "description": "Highly performant JavaScript data stream ETL engine.", "main": "lib/index", "types": "lib/index", diff --git a/src/destinations/mysql-destination.ts b/src/destinations/mysql-destination.ts index 8837271..51af7f6 100644 --- a/src/destinations/mysql-destination.ts +++ b/src/destinations/mysql-destination.ts @@ -31,25 +31,24 @@ export class MySqlDestination extends DatabaseDestination { } const insertQuery = `INSERT INTO ${this.table} (${columnNames.join(', ')}) VALUES ?`; await dbConnection.beginTransaction(); + let result; try { const insertData = data.map(row => columnNames.map(column => row[column.replace(/`/g, '')])); await dbConnection.query(insertQuery, [insertData]); await dbConnection.commit(); + if (this.postLoadQuery) { + result = await dbConnection.query(this.postLoadQuery); + } } catch (err) { await dbConnection.rollback(); throw err; } finally { - let result; try { - if (this.postLoadQuery) { - result = await dbConnection.query(this.postLoadQuery); - } - } catch (postLoadError) { - throw postLoadError; - } finally { await pool.end(); + } catch (poolCloseError) { + throw poolCloseError; } - return result; } + return result; } } \ No newline at end of file diff --git a/tests/mysql-destination.spec.ts b/tests/mysql-destination.spec.ts index 31c1bd8..7e04191 100644 --- a/tests/mysql-destination.spec.ts +++ b/tests/mysql-destination.spec.ts @@ -164,4 +164,57 @@ it('loadedBatch event should include postLoadQuery result', async () => { }); await job.run(); expect(lastInsertId).toEqual(10); +}); + +it('handles loadingBatchError event', async () => { + await db.query(`CREATE TABLE test + ( + id integer PRIMARY KEY, + text text NOT NULL + )`); + const processor = new DynamicProcessor({ + generator: async function* () { + yield { + id: 1, + text2: 'something', + } + }, + }); + const destination = new MySqlDestination({ + connection, + table: 'test', + batchSize: 1, + }); + const job = new Job(processor, [destination]); + job.on('loadingBatchError', async (destinationIndex, rows, error) => { + expect(error.message).toEqual(`Column 'text' cannot be null`); + }); + await job.run(); +}); + +it('handles loadingBatchError event when postLoadQuery fails', async () => { + await db.query(`CREATE TABLE test + ( + id integer PRIMARY KEY, + text text + )`); + const processor = new DynamicProcessor({ + generator: async function* () { + yield { + id: 1, + text: 'something', + } + }, + }); + const destination = new MySqlDestination({ + connection, + table: 'test', + batchSize: 1, + postLoadQuery: `SELECT * FROM non_existent_table`, + }); + const job = new Job(processor, [destination]); + job.on('loadingBatchError', async (destinationIndex, rows, error) => { + expect(error.message).toEqual(`Table 'test.non_existent_table' doesn't exist`); + }); + await job.run(); }); \ No newline at end of file