Skip to content

Commit

Permalink
Improve performance (throughput)
Browse files Browse the repository at this point in the history
It's likely that Deno has the same problem as the following issue that
exists in Node.

nodejs/node#31979

So we tried to avoid this problem by using `AsyncGenerator<T[]>`
instead.
  • Loading branch information
lambdalisue committed Feb 20, 2022
1 parent ebbeaf8 commit 05ff655
Showing 1 changed file with 78 additions and 42 deletions.
120 changes: 78 additions & 42 deletions denops/@ddu-sources/file_rec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { BaseSource, Item } from "https://deno.land/x/[email protected]/types.ts";
import { Denops, fn } from "https://deno.land/x/[email protected]/deps.ts";
import { ActionData } from "https://deno.land/x/[email protected]/file.ts";

import { join, resolve } from "https://deno.land/[email protected]/path/mod.ts";
import { ActionData } from "https://deno.land/x/[email protected]/file.ts";
import { relative } from "https://deno.land/[email protected]/path/mod.ts";
import { deferred } from "https://deno.land/[email protected]/async/mod.ts";

const chunkMinSize = 100;
const chunkMaxSize = 20000;

const aborted = Symbol("aborted");
const chunkSize = 1000;
const enqueueSize1st = 1000;
const enqueueSize2nd = 100000;

type Params = {
ignoredDirectories: string[];
Expand All @@ -35,24 +33,22 @@ export class Source extends BaseSource<Params> {
sourceParams.ignoredDirectories,
abortController.signal,
);
let chunkSize = chunkMinSize;
let chunk: Item<ActionData>[] = [];
let enqueueSize = enqueueSize1st;
let items: Item<ActionData>[] = [];
try {
for await (const item of it) {
chunk.push(item);
if (chunk.length > chunkSize) {
controller.enqueue(chunk);
chunk = [];
if (chunkSize < chunkMaxSize) {
chunkSize += chunkSize;
}
for await (const chunk of it) {
items = items.concat(chunk);
if (items.length >= enqueueSize) {
enqueueSize = enqueueSize2nd;
controller.enqueue(items);
items = [];
}
}
if (chunk.length) {
controller.enqueue(chunk);
if (items.length) {
controller.enqueue(items);
}
} catch (e: unknown) {
if (e === aborted) {
if (e instanceof AbortedError) {
return;
}
console.error(e);
Expand All @@ -79,44 +75,84 @@ async function* walk(
root: string,
ignoredDirectories: string[],
signal: AbortSignal,
): AsyncGenerator<Item<ActionData>> {
const waiter = deferred<never>();
signal.addEventListener("abort", () => waiter.reject(aborted));

const walk = async function* (dir: string): AsyncGenerator<Item<ActionData>> {
const it = Deno.readDir(dir)[Symbol.asyncIterator]();
while (true) {
try {
const { done, value } = await Promise.race([waiter, it.next()]);
if (done || value == undefined) {
return;
}
const entry = value as Deno.DirEntry;
): AsyncGenerator<Item<ActionData>[]> {
const walk = async function* (
dir: string,
): AsyncGenerator<Item<ActionData>[]> {
let chunk: Item<ActionData>[] = [];
try {
for await (const entry of abortable(Deno.readDir(dir), signal)) {
const abspath = join(dir, entry.name);

if (!entry.isDirectory) {
yield {
const n = chunk.push({
word: relative(root, abspath),
action: {
path: abspath,
},
};
});
if (n >= chunkSize) {
yield chunk;
chunk = [];
}
} else {
if (ignoredDirectories.includes(entry.name)) {
continue;
}
yield* walk(abspath);
}
} catch (e: unknown) {
if (e instanceof Deno.errors.PermissionDenied) {
// Ignore this error
// See https://github.com/Shougo/ddu-source-file_rec/issues/2
continue;
}

throw e;
}
if (chunk.length) {
yield chunk;
}
} catch (e: unknown) {
if (e instanceof Deno.errors.PermissionDenied) {
// Ignore this error
// See https://github.com/Shougo/ddu-source-file_rec/issues/2
return;
}
throw e;
}
};
yield* walk(root);
}

// XXX: Replace it with the following
// https://github.com/denoland/deno_std/pull/1939
class AbortedError extends Error {
// This `reason` comes from `AbortSignal` thus must be `any`.
// deno-lint-ignore no-explicit-any
reason?: any;

// This `reason` comes from `AbortSignal` thus must be `any`.
// deno-lint-ignore no-explicit-any
constructor(reason?: any) {
super(reason ? `Aborted: ${reason}` : "Aborted");
this.name = "AbortedError";
this.reason = reason;
}
}

// XXX: Replace it with the following
// https://github.com/denoland/deno_std/pull/1939
async function* abortable<T>(
p: AsyncIterable<T>,
signal: AbortSignal,
): AsyncGenerator<T> {
if (signal.aborted) {
throw new AbortedError(signal.reason);
}
const waiter = deferred<never>();
const abort = () => waiter.reject(new AbortedError(signal.reason));
signal.addEventListener("abort", abort, { once: true });

const it = p[Symbol.asyncIterator]();
while (true) {
const { done, value } = await Promise.race([waiter, it.next()]);
if (done) {
signal.removeEventListener("abort", abort);
return;
}
yield value;
}
}

0 comments on commit 05ff655

Please sign in to comment.