Skip to content

Commit

Permalink
[Scheduler] Store Tasks on a Min Binary Heap
Browse files Browse the repository at this point in the history
Switches Scheduler's priority queue implementation (for both tasks and
timers) to an array-based min binary heap.

This replaces the naive linked-list implementation that was left over
from the queue we once used to schedule React roots. A list was arguably
fine when it was only used for roots, since the total number of roots is
usually small, and is only 1 in the common case of a single-page app.

Since Scheduler is now used for many types of JavaScript tasks (e.g.
including timers), the total number of tasks can be much larger.

Binary heaps are the standard way to implement priority queues.
Insertion is O(1) in the average case (append to the end) and O(log n)
in the worst. Deletion is O(log n). Peek is O(1).
  • Loading branch information
acdlite committed Jul 29, 2019
1 parent 75ab53b commit 4760cbc
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 196 deletions.
274 changes: 78 additions & 196 deletions packages/scheduler/src/Scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
forceFrameRate,
requestPaint,
} from './SchedulerHostConfig';
import {push, pop, peek} from './SchedulerMinHeap';

// TODO: Use symbols?
var ImmediatePriority = 1;
Expand All @@ -40,9 +41,12 @@ var LOW_PRIORITY_TIMEOUT = 10000;
// Never times out
var IDLE_PRIORITY = maxSigned31BitInt;

// Tasks are stored as a circular, doubly linked list.
var firstTask = null;
var firstDelayedTask = null;
// Tasks are stored on a min heap
var taskQueue = [];
var timerQueue = [];

// Incrementing id counter. Used to maintain insertion order.
var taskIdCounter = 0;

// Pausing the scheduler is useful for debugging.
var isSchedulerPaused = false;
Expand Down Expand Up @@ -73,25 +77,13 @@ function scheduler_flushTaskAtPriority_Idle(callback, didTimeout) {
}

function flushTask(task, currentTime) {
// Remove the task from the list before calling the callback. That way the
// list is in a consistent state even if the callback throws.
const next = task.next;
if (next === task) {
// This is the only scheduled task. Clear the list.
firstTask = null;
} else {
// Remove the task from its position in the list.
if (task === firstTask) {
firstTask = next;
}
const previous = task.previous;
previous.next = next;
next.previous = previous;
}
task.next = task.previous = null;

// Now it's safe to execute the task.
var callback = task.callback;
if (callback === null) {
// The task was canceled.
return;
}
// Clearing the callback marks it as ready for removal from the task queue.
task.callback = null;
var previousPriorityLevel = currentPriorityLevel;
var previousTask = currentTask;
currentPriorityLevel = task.priorityLevel;
Expand Down Expand Up @@ -133,76 +125,34 @@ function flushTask(task, currentTime) {
);
break;
}
} catch (error) {
throw error;
} finally {
currentPriorityLevel = previousPriorityLevel;
currentTask = previousTask;
}

// A callback may return a continuation. The continuation should be scheduled
// with the same priority and expiration as the just-finished callback.
// A callback may return a continuation.
if (typeof continuationCallback === 'function') {
var expirationTime = task.expirationTime;
var continuationTask = task;
continuationTask.callback = continuationCallback;

// Insert the new callback into the list, sorted by its timeout. This is
// almost the same as the code in `scheduleCallback`, except the callback
// is inserted into the list *before* callbacks of equal timeout instead
// of after.
if (firstTask === null) {
// This is the first callback in the list.
firstTask = continuationTask.next = continuationTask.previous = continuationTask;
} else {
var nextAfterContinuation = null;
var t = firstTask;
do {
if (expirationTime <= t.expirationTime) {
// This task times out at or after the continuation. We will insert
// the continuation *before* this task.
nextAfterContinuation = t;
break;
}
t = t.next;
} while (t !== firstTask);
if (nextAfterContinuation === null) {
// No equal or lower priority task was found, which means the new task
// is the lowest priority task in the list.
nextAfterContinuation = firstTask;
} else if (nextAfterContinuation === firstTask) {
// The new task is the highest priority task in the list.
firstTask = continuationTask;
}

const previous = nextAfterContinuation.previous;
previous.next = nextAfterContinuation.previous = continuationTask;
continuationTask.next = nextAfterContinuation;
continuationTask.previous = previous;
}
task.callback = continuationCallback;
}
}

function advanceTimers(currentTime) {
// Check for tasks that are no longer delayed and add them to the queue.
if (firstDelayedTask !== null && firstDelayedTask.startTime <= currentTime) {
do {
const task = firstDelayedTask;
const next = task.next;
if (task === next) {
firstDelayedTask = null;
} else {
firstDelayedTask = next;
const previous = task.previous;
previous.next = next;
next.previous = previous;
}
task.next = task.previous = null;
insertScheduledTask(task, task.expirationTime);
} while (
firstDelayedTask !== null &&
firstDelayedTask.startTime <= currentTime
);
let timer = peek(timerQueue);
while (timer !== null) {
if (timer.callback === null) {
// Timer was cancelled.
pop(timerQueue);
} else if (timer.startTime <= currentTime) {
// Timer fired. Transfer to the task queue.
pop(timerQueue);
timer.sortIndex = timer.expirationTime;
push(taskQueue, timer);
} else {
// Remaining timers are pending.
return;
}
timer = peek(timerQueue);
}
}

Expand All @@ -211,14 +161,14 @@ function handleTimeout(currentTime) {
advanceTimers(currentTime);

if (!isHostCallbackScheduled) {
if (firstTask !== null) {
if (peek(taskQueue) !== null) {
isHostCallbackScheduled = true;
requestHostCallback(flushWork);
} else if (firstDelayedTask !== null) {
requestHostTimeout(
handleTimeout,
firstDelayedTask.startTime - currentTime,
);
} else {
const firstTimer = peek(timerQueue);
if (firstTimer !== null) {
requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
}
}
}
}
Expand Down Expand Up @@ -246,38 +196,53 @@ function flushWork(hasTimeRemaining, initialTime) {
// Flush all the expired callbacks without yielding.
// TODO: Split flushWork into two separate functions instead of using
// a boolean argument?
let task = peek(taskQueue);
while (
firstTask !== null &&
firstTask.expirationTime <= currentTime &&
task !== null &&
task.expirationTime <= currentTime &&
!(enableSchedulerDebugging && isSchedulerPaused)
) {
flushTask(firstTask, currentTime);
flushTask(task, currentTime);
// If the task completed, remove it from the queue. Need to confirm
// that it's still the first task in the queue, in case additional
// tasks were scheduled.
if (task === peek(taskQueue) && task.callback === null) {
pop(taskQueue);
}
currentTime = getCurrentTime();
advanceTimers(currentTime);
task = peek(taskQueue);
}
} else {
// Keep flushing callbacks until we run out of time in the frame.
if (firstTask !== null) {
let task = peek(taskQueue);
if (task !== null) {
do {
flushTask(firstTask, currentTime);
flushTask(task, currentTime);
// If the task completed, remove it from the queue. Need to confirm
// that it's still the first task in the queue, in case additional
// tasks were scheduled.
if (task === peek(taskQueue) && task.callback === null) {
pop(taskQueue);
}
currentTime = getCurrentTime();
advanceTimers(currentTime);
task = peek(taskQueue);
} while (
firstTask !== null &&
task !== null &&
!shouldYieldToHost() &&
!(enableSchedulerDebugging && isSchedulerPaused)
);
}
}
// Return whether there's additional work
let firstTask = peek(taskQueue);
if (firstTask !== null) {
return true;
} else {
if (firstDelayedTask !== null) {
requestHostTimeout(
handleTimeout,
firstDelayedTask.startTime - currentTime,
);
let firstTimer = peek(timerQueue);
if (firstTimer !== null) {
requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
}
return false;
}
Expand Down Expand Up @@ -388,18 +353,19 @@ function unstable_scheduleCallback(priorityLevel, callback, options) {
var expirationTime = startTime + timeout;

var newTask = {
id: taskIdCounter++,
callback,
priorityLevel,
startTime,
expirationTime,
next: null,
previous: null,
sortIndex: -1,
};

if (startTime > currentTime) {
// This is a delayed task.
insertDelayedTask(newTask, startTime);
if (firstTask === null && firstDelayedTask === newTask) {
newTask.sortIndex = startTime;
push(timerQueue, newTask);
if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
// All tasks are delayed, and this is the task with the earliest delay.
if (isHostTimeoutScheduled) {
// Cancel an existing timeout.
Expand All @@ -411,7 +377,8 @@ function unstable_scheduleCallback(priorityLevel, callback, options) {
requestHostTimeout(handleTimeout, startTime - currentTime);
}
} else {
insertScheduledTask(newTask, expirationTime);
newTask.sortIndex = expirationTime;
push(taskQueue, newTask);
// Schedule a host callback, if needed. If we're already performing work,
// wait until the next time we yield.
if (!isHostCallbackScheduled && !isPerformingWork) {
Expand All @@ -423,74 +390,6 @@ function unstable_scheduleCallback(priorityLevel, callback, options) {
return newTask;
}

function insertScheduledTask(newTask, expirationTime) {
// Insert the new task into the list, ordered first by its timeout, then by
// insertion. So the new task is inserted after any other task the
// same timeout
if (firstTask === null) {
// This is the first task in the list.
firstTask = newTask.next = newTask.previous = newTask;
} else {
var next = null;
var task = firstTask;
do {
if (expirationTime < task.expirationTime) {
// The new task times out before this one.
next = task;
break;
}
task = task.next;
} while (task !== firstTask);

if (next === null) {
// No task with a later timeout was found, which means the new task has
// the latest timeout in the list.
next = firstTask;
} else if (next === firstTask) {
// The new task has the earliest expiration in the entire list.
firstTask = newTask;
}

var previous = next.previous;
previous.next = next.previous = newTask;
newTask.next = next;
newTask.previous = previous;
}
}

function insertDelayedTask(newTask, startTime) {
// Insert the new task into the list, ordered by its start time.
if (firstDelayedTask === null) {
// This is the first task in the list.
firstDelayedTask = newTask.next = newTask.previous = newTask;
} else {
var next = null;
var task = firstDelayedTask;
do {
if (startTime < task.startTime) {
// The new task times out before this one.
next = task;
break;
}
task = task.next;
} while (task !== firstDelayedTask);

if (next === null) {
// No task with a later timeout was found, which means the new task has
// the latest timeout in the list.
next = firstDelayedTask;
} else if (next === firstDelayedTask) {
// The new task has the earliest expiration in the entire list.
firstDelayedTask = newTask;
}

var previous = next.previous;
previous.next = next.previous = newTask;
newTask.next = next;
newTask.previous = previous;
}
}

function unstable_pauseExecution() {
isSchedulerPaused = true;
}
Expand All @@ -504,34 +403,14 @@ function unstable_continueExecution() {
}

function unstable_getFirstCallbackNode() {
return firstTask;
return peek(taskQueue);
}

function unstable_cancelCallback(task) {
var next = task.next;
if (next === null) {
// Already cancelled.
return;
}

if (task === next) {
if (task === firstTask) {
firstTask = null;
} else if (task === firstDelayedTask) {
firstDelayedTask = null;
}
} else {
if (task === firstTask) {
firstTask = next;
} else if (task === firstDelayedTask) {
firstDelayedTask = next;
}
var previous = task.previous;
previous.next = next;
next.previous = previous;
}

task.next = task.previous = null;
// Null out the callback to indicate the task has been canceled. (Can't remove
// from the queue because you can't remove arbitrary nodes from an array based
// heap, only the first one.)
task.callback = null;
}

function unstable_getCurrentPriorityLevel() {
Expand All @@ -541,9 +420,12 @@ function unstable_getCurrentPriorityLevel() {
function unstable_shouldYield() {
const currentTime = getCurrentTime();
advanceTimers(currentTime);
const firstTask = peek(taskQueue);
return (
(currentTask !== null &&
(firstTask !== currentTask &&
currentTask !== null &&
firstTask !== null &&
firstTask.callback !== null &&
firstTask.startTime <= currentTime &&
firstTask.expirationTime < currentTask.expirationTime) ||
shouldYieldToHost()
Expand Down
Loading

0 comments on commit 4760cbc

Please sign in to comment.