-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Direct IO #133
Conversation
Change-type: patch
Change-type: patch
Change-type: major
Change-type: major
Change-type: patch
Change-type: major
this.bytesWritten += chunk.buffer.length; | ||
const unlock = isAlignedLockableBuffer(chunk.buffer) | ||
? await chunk.buffer.rlock() | ||
: noop; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could also make this
: noop; | |
: undefined; |
and use unlock?.()
below if you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
this.push({ position: chunk.position, buffer }); | ||
this.bytesWritten += chunk.buffer.length; | ||
this.position += chunk.buffer.length; | ||
callback(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This callback isn't guaranteed to be called if await buffer.lock()
fails - is that a case that should be handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it should be handled, updated
if (this.state.hasher !== undefined) { | ||
this.state.hasher.update(buffer); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be
if (this.state.hasher !== undefined) { | |
this.state.hasher.update(buffer); | |
} | |
this.state.hasher?.update(buffer); |
if you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
this.alignedReadableState !== undefined | ||
? this.alignedReadableState.getCurrentBuffer().slice(0, length) | ||
: Buffer.allocUnsafe(length); | ||
const unlock = isAlignedLockableBuffer(buffer) ? await buffer.lock() : noop; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also be
const unlock = isAlignedLockableBuffer(buffer) ? await buffer.lock() : noop; | |
const unlock = isAlignedLockableBuffer(buffer) ? await buffer.lock() : undefined; |
and unlock?.()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
public async createSparseWriteStream(): Promise<SparseWritable> { | ||
return await this.createStream('createSparseWriteStream'); | ||
public async createSparseWriteStream( | ||
...args: any[] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this can be properly typed using
...args: any[] | |
...args: Parameters<SourceDestination['createSparseWriteStream']> |
public async createWriteStream(): Promise<NodeJS.WritableStream> { | ||
return await this.createStream('createWriteStream'); | ||
public async createWriteStream( | ||
...args: any[] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this can be properly typed using
...args: any[] | |
...args: Parameters<SourceDestination['createWriteStream']> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, thanks
lib/block-transform-stream.ts
Outdated
public _flush(callback: (error?: Error) => void) { | ||
this.writeBuffers(true); | ||
callback(); | ||
public async _flush(callback: (error?: Error) => void): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This returns an immediately-resolved promise, is that intended or should this be
public async _flush(callback: (error?: Error) => void): Promise<void> { | |
public _flush(callback: (error?: Error) => void): void { |
or have an await?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, fixed
lib/block-transform-stream.ts
Outdated
chunk: Buffer, | ||
_encoding: string, | ||
callback: (error?: Error) => void, | ||
) { | ||
): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also an immediately-resolved promise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
lib/block-read-stream.ts
Outdated
const { bytesRead } = await this.tryRead(buffer); | ||
if (bytesRead === 0) { | ||
unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be in a finally
block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, added
lib/sparse-stream/shared.ts
Outdated
@@ -55,7 +55,7 @@ export interface SparseWritable extends NodeJS.WritableStream { | |||
_write( | |||
chunk: SparseStreamChunk, | |||
encoding: string, | |||
callback: (err?: Error | void) => void, | |||
callback: (err?: Error | void | null) => void, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be
callback: (err?: Error | void | null) => void, | |
callback: (err?: Error | null) => void, |
as void doesn't really make sense as a parameter value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
lib/utils.ts
Outdated
@@ -84,7 +84,7 @@ export function difference<T>(setA: Set<T>, setB: Set<T>): Set<T> { | |||
|
|||
export async function asCallback<T>( | |||
promise: Promise<T>, | |||
callback: (error: Error | void, value?: T) => void, | |||
callback: (error: Error | void | null, value?: T) => void, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another place to drop void
callback: (error: Error | void | null, value?: T) => void, | |
callback: (error: Error | null, value?: T) => void, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dropped
Change-type: major
lib/utils.ts
Outdated
const unlock = await chunk.buffer.rlock(); | ||
const data = Buffer.allocUnsafe(chunk.buffer.length); | ||
chunk.buffer.copy(data); | ||
unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be in a try/finally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, added
lib/utils.ts
Outdated
stream.on('data', chunks.push.bind(chunks)); | ||
stream.on('data', async (chunk: SparseStreamChunk) => { | ||
if (isAlignedLockableBuffer(chunk.buffer)) { | ||
const unlock = await chunk.buffer.rlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this lock
fails (can it fail?) then the chunk will be silently lost
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, this function is only used in tests, fixed it anyway.
lib/utils.ts
Outdated
const unlock = await chunk.rlock(); | ||
data = Buffer.allocUnsafe(chunk.length); | ||
chunk.copy(data); | ||
unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same concerns here about try/catch/finally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
private async createStream( | ||
methodName: 'createWriteStream' | 'createSparseWriteStream', | ||
...args: any[] | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want 100% typing you can do:
private async createStream( | |
methodName: 'createWriteStream' | 'createSparseWriteStream', | |
...args: any[] | |
) { | |
private async createStream( | |
methodName: 'createWriteStream', | |
...args: Parameters<SourceDestination['createWriteStream']> | |
) { | |
private async createStream( | |
methodName: 'createSparseWriteStream', | |
...args: Parameters<SourceDestination['createSparseWriteStream']> | |
) { | |
private async createStream( | |
methodName: 'createWriteStream' | 'createSparseWriteStream', | |
...args: Parameters<SourceDestination['createWriteStream']> | Parameters<SourceDestination['createSparseWriteStream']> | |
) { |
although I'm also OK with skipping it as it's a private method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added, thanks
let block = Buffer.concat(this._buffers); | ||
private async writeBuffers(flush = false): Promise<void> { | ||
if (flush || this.inputBytes >= this.chunkSize) { | ||
// TODO optimize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw node 12.16.2 is slated to include buffer.concat performance improvements: nodejs/node#32313 / nodejs/node#31522
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can probably completely avoid Buffer.concat
, but that'll make it more complex to read.
lib/block-read-stream.ts
Outdated
this.push(null); | ||
return; | ||
const unlock = await buffer.lock(); | ||
let bytesRead: number; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typescript should be able to infer the type if you want:
let bytesRead: number; | |
let bytesRead; |
but I'm not fussed either way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Change-type: patch
Change-type: major
Change-type: major
Change-type: major
Change-type: patch
Change-type: patch
Change-type: patch
Change-type: patch
Change-type: patch
Change-type: patch
Change-type: patch
Major change
Only use aligned buffers to write to block devices.
It means all read streams read into aligned buffers when the target is a block device.
When we don't control the read stream, use a transform that will copy its buffers to aligned buffers.
Each read stream uses a pool of 1 MiB buffers to minimize allocations.
This should improve speed and reduce cpu usage.