Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove split2, refactor Rows for better performance #108

Closed
wants to merge 5 commits into from
Closed

Conversation

slvrtrn
Copy link
Contributor

@slvrtrn slvrtrn commented Oct 2, 2022

After checking out https://github.com/go-faster/ch-bench and writing a simple test for our client as such:

import { createClient } from '@clickhouse/client'
;(async () => {
  const client = createClient()
  const rows = await client.query({
    query: 'SELECT number FROM system.numbers_mt LIMIT 500000000',
    format: 'TabSeparated',
  })
  const start = +new Date()
  for await (const _ of rows.stream()) {
    //
  }
  const end = +new Date()
  console.info(`Execution time: ${end - start} ms`)
})()

I ended up with an "amazing" performance of around ~280 seconds for executing this bit of code (Ryzen 9 5950X).

So, I did the following:

  • Replaced split2 with a simple self-written solution
  • Replaced a Row class created on every iteration with just a slim interface
  • Replaced return type of stream() method with AsyncGenerator<Row, void>, and now we have a proper row type here:
  for await (const row of rows.stream()) {
    await row.text() // <-- `row` is actually of type `Row` instead of `any`
  }

Before: 285 seconds
After: 96 seconds

Still far from perfect (I will investigate more), but that's a start.

@slvrtrn slvrtrn requested a review from mshustov October 2, 2022 23:25
}
expect(last).toBe('9999')
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we don't need this anymore. It can be paused (or not consumed) just on the application level.

})
} else {
expect(await selectPromise).toEqual(undefined)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprisingly enough, this is no longer the case for Node.js 18.x.

¯\_(ツ)_/¯

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed destroy(), which caused the error. Maybe that's the reason?

}
expect(last).toBe('9999')
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not required anymore with async iterators. It can be paused (not consumed) on the application level, as it is a lazy evaluation.

callback()
},
objectMode: true,
})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the best implementation for this (haven't tested how it behaves with large files), but at least it works without split2.

if (!line.length) {
return
} else {
const json = JSON.parse(line)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should support already serialized data somehow. It looks strange to parse it here and then stringify it again in the library.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's insert data in CSV but query in JSONCompactEachRow? It doesn't change the logic much but simplifies the example

@slvrtrn
Copy link
Contributor Author

slvrtrn commented Oct 3, 2022

After some digging in Node.js issues I found this, and indeed, for await const of is about 30% slower than .on('data', callback) approach on my machine.

Using current release version, with the old stream implementation with split2, this code

import { createClient } from '@clickhouse/client';
(async () => {
  const client = createClient()
  const rows = await client.query({
    query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
    format: 'TabSeparated'
  })
  const start = +new Date()
  const stream = rows.stream()
  stream.on('data', (_) => {
    //
  })
  await new Promise((resolve) => {
    stream.on('end', () => {resolve()})
  })
  const end = +new Date()
  console.info(`Execution time: ${end - start} ms`)
  await client.close()
})()

executes in 20 seconds, while this

import { createClient } from '@clickhouse/client';
(async () => {
  const client = createClient()
  const rows = await client.query({
    query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
    format: 'TabSeparated'
  })
  const start = +new Date()
  for await (const _ of rows.stream()) {
    //
  }
  const end = +new Date()
  console.info(`Execution time: ${end - start} ms`)
  await client.close()
})()

takes 30 seconds to finish.

So it might be beneficial to return Stream.Readable instead of an AsyncGenerator.
Only need to figure out how to make it faster :)

@slvrtrn
Copy link
Contributor Author

slvrtrn commented Oct 3, 2022

So, with just the allocations removed, keeping the Stream in place

return Stream.pipeline(
  this._stream,
  split((row: string) => ({ // <- no `new Row` here
    text: row, // <- this is not a function anymore
    json<T>() {
      return decode(row, 'JSON')
    },
  })),
  function pipelineCb(err) {
    if (err) {
      console.error(err)
    }
  }
)

using this code (50M numbers)

import { createClient } from '../src'
void (async () => {
  const client = createClient({
    compression: {
      request: false,
      response: false,
    },
  })
  const rows = await client.query({
    query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
    format: 'TabSeparated',
  })
  const start = +new Date()
  const stream = rows.stream()
  stream.on('data', (_) => {
    //
  })
  await new Promise((resolve) => {
    stream.on('end', () => {
      resolve(0)
    })
  })
  const end = +new Date()
  console.info(`Execution time: ${end - start} ms`)
})()

we can get as fast as ~3.5-4 seconds on my machine.
The current release takes ~18 seconds to execute the same code.

On 500M records instead of 50M, that's ~37-38 seconds vs ~280-300 seconds.

let decodedChunk = ''
for await (const chunk of this._stream) {
decodedChunk += textDecoder.decode(chunk, { stream: true })
let idx = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit it's not changed. let's move lower:

const idx = decodedChunk.indexOf('\n')

},
}
} else {
break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a special case at the top to remove a nesting level.

if (idx === -1) break;
const line = decodedChunk.slice(0, idx);
...

yield {
/**
* Returns a string representation of a row.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments belong to Row interface declared below. https://github.com/ClickHouse/clickhouse-js/pull/108/files#diff-b13826a37e4f93783b49eaca9c60dc1d124ee9d6b331be22244f41cc7bb09d39R94-R96
IDE doesn't hint at the method signature.

}
}
)
}
textDecoder.decode() // flush
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be consumed? The method typings shows it as

    /**
     * Returns the result of running encoding's decoder. The method can be invoked zero or more times with options's stream set to true, and then once without options's stream (or set to false), to process a fragmented input. If the invocation without options's stream (or set to false) has no input, it's clearest to omit both arguments.
     *
     * ```
     * var string = "", decoder = new TextDecoder(encoding), buffer;
     * while(buffer = next_chunk()) {
     *   string += decoder.decode(buffer, {stream:true});
     * }
     * string += decoder.decode(); // end-of-queue
     * ```
     *
     * If the error mode is "fatal" and encoding's decoder returns error, throws a TypeError.
     */
    decode(input?: BufferSource, options?: TextDecodeOptions): string;

if (!line.length) {
return
} else {
const json = JSON.parse(line)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's insert data in CSV but query in JSONCompactEachRow? It doesn't change the logic much but simplifies the example

})
} else {
expect(await selectPromise).toEqual(undefined)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed destroy(), which caused the error. Maybe that's the reason?

/**
* Returns a string representation of a row.
*/
text(): string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's more lightweight than a Row class instance?
The method emits an object literal with 2 methods vs. a class with 2 methods in a prototype

class Row {
    text(){}
}
(new Row()).hasOwnProperty('text') // false
const objectLiteral = { 
    text(){}
}
objectLiteral.hasOwnProperty('text') // true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split((row: string) => new Row(row, 'JSON')),
ts-node --transpile-only --project tsconfig.dev.json examples/many_numbers.ts
Execution time: 1957 ms
split((row: string) => ({
  text: row,
  json<T>() {
    return decode(row, 'JSON')
  }
})),
ts-node --transpile-only --project tsconfig.dev.json examples/many_numbers.ts
Execution time: 394 ms

@slvrtrn
Copy link
Contributor Author

slvrtrn commented Oct 3, 2022

Will create a new one with an updated implementation.

@slvrtrn slvrtrn closed this Oct 3, 2022
@slvrtrn slvrtrn deleted the performance branch October 4, 2022 19:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants