Skip to content

Commit

Permalink
Spill partitioner data to disk during Rel table copies (kuzudb#4188)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4ba09ea)
  • Loading branch information
benjaminwinger authored and wangqiang committed Oct 8, 2024
1 parent 88b0b84 commit 15b09b6
Show file tree
Hide file tree
Showing 37 changed files with 516 additions and 101 deletions.
4 changes: 2 additions & 2 deletions src/common/in_mem_overflow_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ BufferBlock::BufferBlock(std::unique_ptr<storage::MemoryBuffer> block)
BufferBlock::~BufferBlock() = default;

uint64_t BufferBlock::size() const {
return block->buffer.size();
return block->getBuffer().size();
}

uint8_t* BufferBlock::data() const {
return block->buffer.data();
return block->getBuffer().data();
}

uint8_t* InMemOverflowBuffer::allocateSpace(uint64_t size) {
Expand Down
6 changes: 3 additions & 3 deletions src/function/gds/gds_frontier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PathLengths::PathLengths(std::unordered_map<common::table_id_t, uint64_t> nodeTa
nodeTableIDAndNumNodesMap[tableID] = numNodes;
auto memBuffer = mm->allocateBuffer(false, numNodes * sizeof(std::atomic<uint16_t>));
std::atomic<uint16_t>* memBufferPtr =
reinterpret_cast<std::atomic<uint16_t>*>(memBuffer.get()->buffer.data());
reinterpret_cast<std::atomic<uint16_t>*>(memBuffer.get()->getData());
for (uint64_t i = 0; i < numNodes; ++i) {
memBufferPtr[i].store(UNVISITED, std::memory_order_relaxed);
}
Expand All @@ -60,7 +60,7 @@ void PathLengths::fixCurFrontierNodeTable(common::table_id_t tableID) {
KU_ASSERT(masks.contains(tableID));
curTableID.store(tableID, std::memory_order_relaxed);
curFrontierFixedMask.store(
reinterpret_cast<std::atomic<uint16_t>*>(masks.at(tableID).get()->buffer.data()),
reinterpret_cast<std::atomic<uint16_t>*>(masks.at(tableID).get()->getData()),
std::memory_order_relaxed);
maxNodesInCurFrontierFixedMask.store(
nodeTableIDAndNumNodesMap[curTableID.load(std::memory_order_relaxed)],
Expand All @@ -70,7 +70,7 @@ void PathLengths::fixCurFrontierNodeTable(common::table_id_t tableID) {
void PathLengths::fixNextFrontierNodeTable(common::table_id_t tableID) {
KU_ASSERT(masks.contains(tableID));
nextFrontierFixedMask.store(
reinterpret_cast<std::atomic<uint16_t>*>(masks.at(tableID).get()->buffer.data()),
reinterpret_cast<std::atomic<uint16_t>*>(masks.at(tableID).get()->getData()),
std::memory_order_relaxed);
}

Expand Down
4 changes: 4 additions & 0 deletions src/include/common/file_system/virtual_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ namespace main {
class Database;
}

namespace storage {
class BufferManager;
};
namespace common {

class KUZU_API VirtualFileSystem final : public FileSystem {
friend class storage::BufferManager;

public:
VirtualFileSystem();
Expand Down
4 changes: 2 additions & 2 deletions src/include/function/gds/gds_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ObjectBlock {
}

private:
T* getData() const { return reinterpret_cast<T*>(block->buffer.data()); }
T* getData() const { return reinterpret_cast<T*>(block->getData()); }

private:
std::unique_ptr<storage::MemoryBuffer> block;
Expand All @@ -53,7 +53,7 @@ class ObjectArraysMap {

T* getData(common::table_id_t tableID) const {
KU_ASSERT(bufferPerTable.contains(tableID));
return reinterpret_cast<T*>(bufferPerTable.at(tableID)->buffer.data());
return reinterpret_cast<T*>(bufferPerTable.at(tableID)->getData());
}

private:
Expand Down
2 changes: 2 additions & 0 deletions src/include/main/db_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <optional>
#include <string>

#include "common/types/value/value.h"
Expand Down Expand Up @@ -60,6 +61,7 @@ struct DBConfig {
bool autoCheckpoint;
uint64_t checkpointThreshold;
bool forceCheckpointOnClose;
std::optional<std::string> spillToDiskTmpFile;

explicit DBConfig(const SystemConfig& systemConfig);

Expand Down
12 changes: 12 additions & 0 deletions src/include/main/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,17 @@ struct ForceCheckpointClosingDBSetting {
}
};

// Database setting "spill_to_disk_tmp_file": a STRING option stored in
// DBConfig::spillToDiskTmpFile.
struct SpillToDiskFileSetting {
    static constexpr auto name = "spill_to_disk_tmp_file";
    static constexpr auto inputType = common::LogicalTypeID::STRING;

    // Checks that `parameter` has the expected input type, then records its string value in
    // the database config reachable from `context`.
    static void setContext(ClientContext* context, const common::Value& parameter) {
        parameter.validateType(inputType);
        // Extract the value first, then look up the config to write into.
        const std::string tmpFilePath = parameter.getValue<std::string>();
        context->getDBConfigUnsafe()->spillToDiskTmpFile = tmpFilePath;
    }

    // Returns the currently configured value wrapped in a common::Value.
    static common::Value getSetting(const ClientContext* context) {
        auto& configuredTmpFile = context->getDBConfig()->spillToDiskTmpFile;
        return common::Value::createValue(configuredTmpFile);
    }
};

} // namespace main
} // namespace kuzu
30 changes: 22 additions & 8 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ struct PartitionerSharedState {
storage::MemoryManager& mm;

explicit PartitionerSharedState(storage::MemoryManager& mm)
: mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, relTable(nullptr), mm{mm} {}
: mtx{}, srcNodeTable{nullptr}, dstNodeTable{nullptr}, relTable(nullptr), mm{mm},
maxNodeOffsets{0, 0}, numPartitions{0, 0}, nextPartitionIdx{0} {}

static constexpr size_t DIRECTIONS = 2;
// FIXME(Guodong): we should not maintain maxNodeOffsets.
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
std::vector<common::partition_idx_t> numPartitions; // num of partitions in each direction.
std::array<common::offset_t, DIRECTIONS> maxNodeOffsets; // max node offset in each direction.
std::array<common::partition_idx_t, DIRECTIONS>
numPartitions; // num of partitions in each direction.
std::vector<std::unique_ptr<PartitioningBuffer>> partitioningBuffers;
common::partition_idx_t nextPartitionIdx = 0;
std::atomic<common::partition_idx_t> nextPartitionIdx;
// In copy rdf, we need to access num nodes before it is available in statistics.
std::vector<std::shared_ptr<BatchInsertSharedState>> nodeBatchInsertSharedStates;

Expand All @@ -64,11 +67,21 @@ struct PartitionerSharedState {
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

storage::InMemChunkedNodeGroupCollection& getPartitionBuffer(common::idx_t partitioningIdx,
common::partition_idx_t partitionIdx) const {
// Must only be called once for any given parameters.
// The data gets moved out of the shared state since some of it may be spilled to disk and will
// need to be freed after it's processed.
std::unique_ptr<storage::InMemChunkedNodeGroupCollection> getPartitionBuffer(
common::idx_t partitioningIdx, common::partition_idx_t partitionIdx) const {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return *partitioningBuffers[partitioningIdx]->partitions[partitionIdx];

KU_ASSERT(partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get());
auto partitioningBuffer =
std::move(partitioningBuffers[partitioningIdx]->partitions[partitionIdx]);
// This may still run out of memory if there isn't enough space for one partitioningBuffer
// per thread
partitioningBuffer->loadFromDisk(mm);
return partitioningBuffer;
}
};

Expand Down Expand Up @@ -162,7 +175,8 @@ class Partitioner final : public Sink {

static void initializePartitioningStates(const PartitionerDataInfo& dataInfo,
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
const std::vector<common::partition_idx_t>& numPartitions);
const std::array<common::partition_idx_t, PartitionerSharedState::DIRECTIONS>&
numPartitions);

private:
common::DataChunk constructDataChunk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct RelBatchInsertPrintInfo final : OPPrintInfo {
};

struct RelBatchInsertProgressSharedState {
uint64_t partitionsDone;
std::atomic<uint64_t> partitionsDone;
uint64_t partitionsTotal;

RelBatchInsertProgressSharedState() : partitionsDone{0}, partitionsTotal{0} {};
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ class DataBlock {
block = mm->allocateBuffer(true /* initializeToZero */, size);
}

uint8_t* getData() const { return block->buffer.data(); }
uint8_t* getWritableData() const { return block->buffer.last(freeSize).data(); }
uint8_t* getData() const { return block->getBuffer().data(); }
uint8_t* getWritableData() const { return block->getBuffer().last(freeSize).data(); }
void resetNumTuplesAndFreeSize() {
freeSize = block->buffer.size();
freeSize = block->getBuffer().size();
numTuples = 0;
}
void resetToZero() { memset(block->buffer.data(), 0, block->buffer.size()); }
void resetToZero() { memset(block->getBuffer().data(), 0, block->getBuffer().size()); }

static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
Expand Down
30 changes: 23 additions & 7 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@
#include "storage/file_handle.h"

namespace kuzu {
namespace common {
class VirtualFileSystem;
};
namespace testing {
class EmptyBufferManagerTest;
};
namespace storage {
class ChunkedNodeGroup;
class Spiller;

// This class keeps state info for pages that can potentially be evicted.
// The page state of a candidate is set to MARKED when it is first enqueued. After enqueued, if the
Expand Down Expand Up @@ -165,14 +173,14 @@ class EvictionQueue {
* Umbra's design in his CS 848 course project:
* https://github.com/fabubaker/kuzu/blob/umbra-bm/final_project_report.pdf.
*/

class BufferManager {
friend class FileHandle;
friend class MemoryManager;

public:
BufferManager(uint64_t bufferPoolSize, uint64_t maxDBSize);
~BufferManager() = default;
BufferManager(const std::string& databasePath, const std::string& spillToDiskPath,
uint64_t bufferPoolSize, uint64_t maxDBSize, common::VirtualFileSystem* vfs, bool readOnly);
~BufferManager();

// Currently, these functions are specifically used only for WAL files.
void removeFilePagesFromFrames(FileHandle& fileHandle);
Expand All @@ -190,6 +198,12 @@ class BufferManager {

uint64_t getUsedMemory() const { return usedMemory; }

// Runs `func` on the spiller if one exists; when there is no spiller the call is a no-op.
void getSpillerOrSkip(std::function<void(Spiller&)> func) {
    if (!spiller) {
        // No spiller available: skip the callback entirely.
        return;
    }
    func(*spiller);
}

private:
uint8_t* pin(FileHandle& fileHandle, common::page_idx_t pageIdx,
PageReadPolicy pageReadPolicy = PageReadPolicy::READ_PAGE);
Expand Down Expand Up @@ -219,10 +233,7 @@ class BufferManager {
PageReadPolicy pageReadPolicy);
void removePageFromFrame(FileHandle& fileHandle, common::page_idx_t pageIdx, bool shouldFlush);

uint64_t freeUsedMemory(uint64_t size) {
KU_ASSERT(usedMemory.load() >= size);
return usedMemory.fetch_sub(size);
}
uint64_t freeUsedMemory(uint64_t size);

void releaseFrameForPage(FileHandle& fileHandle, common::page_idx_t pageIdx) {
vmRegions[fileHandle.getPageSizeClass()]->releaseFrame(fileHandle.getFrameIdx(pageIdx));
Expand All @@ -233,11 +244,16 @@ class BufferManager {
private:
std::atomic<uint64_t> bufferPoolSize;
EvictionQueue evictionQueue;
// Total memory used
std::atomic<uint64_t> usedMemory;
// Amount of memory used which cannot be evicted
std::atomic<uint64_t> nonEvictableMemory;
// Each VMRegion corresponds to a virtual memory region of a specific page size. Currently, we
// hold two sizes of REGULAR_PAGE and TEMP_PAGE.
std::vector<std::unique_ptr<VMRegion>> vmRegions;
std::vector<std::unique_ptr<FileHandle>> fileHandles;
std::unique_ptr<Spiller> spiller;
common::VirtualFileSystem* vfs;
};

} // namespace storage
Expand Down
38 changes: 27 additions & 11 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
#include <span>

namespace kuzu {
namespace main {
class ClientContext;
}

namespace common {
class VirtualFileSystem;
Expand All @@ -23,20 +20,38 @@ namespace storage {
class MemoryManager;
class FileHandle;
class BufferManager;
class ChunkedNodeGroup;

class MemoryBuffer {
friend class Spiller;

class KUZU_API MemoryBuffer {
public:
MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
KUZU_API MemoryBuffer(MemoryManager* mm, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::TEMP_PAGE_SIZE);
~MemoryBuffer();
KUZU_API ~MemoryBuffer();
DELETE_COPY_AND_MOVE(MemoryBuffer);

uint8_t* getData() const { return buffer.data(); }
std::span<uint8_t> getBuffer() const {
KU_ASSERT(!evicted);
return buffer;
}
uint8_t* getData() const { return getBuffer().data(); }

public:
MemoryManager* getMemoryManager() const { return mm; }

private:
// Can be called multiple times safely
void prepareLoadFromDisk();

// Must only be called once before loading from disk
void setSpilledToDisk(uint64_t filePosition);

private:
std::span<uint8_t> buffer;
common::page_idx_t pageIdx;
uint64_t filePosition = UINT64_MAX;
MemoryManager* mm;
common::page_idx_t pageIdx;
bool evicted;
};

/*
Expand All @@ -58,9 +73,9 @@ class KUZU_API MemoryManager {
friend class MemoryBuffer;

public:
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs, main::ClientContext* context);
MemoryManager(BufferManager* bm, common::VirtualFileSystem* vfs);

~MemoryManager();
~MemoryManager() = default;

std::unique_ptr<MemoryBuffer> mallocBuffer(bool initializeToZero, uint64_t size);
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
Expand All @@ -71,6 +86,7 @@ class KUZU_API MemoryManager {

private:
void freeBlock(common::page_idx_t pageIdx, std::span<uint8_t> buffer);
std::span<uint8_t> mallocBufferInternal(bool initializeToZero, uint64_t size);

private:
FileHandle* fh;
Expand Down
39 changes: 39 additions & 0 deletions src/include/storage/buffer_manager/spiller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <cstdint>

#include "storage/file_handle.h"

namespace kuzu {
namespace common {
class VirtualFileSystem;
};
namespace storage {
class ChunkedNodeGroup;

class BufferManager;
class ColumnChunkData;
// Moves column chunk data between memory and a temporary file on disk, and keeps track of a
// set of ChunkedNodeGroups ("full partitioner groups") from which memory can be reclaimed.
// NOTE(review): only the declarations are visible in this header; the notes below that are
// marked as assumptions should be confirmed against the definitions.
class Spiller {
public:
// tmpFilePath: presumably the path of the temporary file backing spilled data — confirm.
// NOTE(review): how bufferManager and vfs are used is not visible from this header.
Spiller(const std::string& tmpFilePath, BufferManager& bufferManager,
common::VirtualFileSystem* vfs);
// Presumably inserts nodeGroup into the set of groups that may be claimed by
// claimNextGroup() — confirm against the definition.
void addUnusedChunk(ChunkedNodeGroup* nodeGroup);
// Presumably removes nodeGroup from that set — confirm against the definition.
void clearUnusedChunk(ChunkedNodeGroup* nodeGroup);
// Spills the given chunk to disk.
// NOTE(review): the meaning of the returned uint64_t is not visible here (likely the amount
// of memory released) — confirm.
uint64_t spillToDisk(ColumnChunkData& chunk) const;
// Loads a chunk from disk back into memory.
void loadFromDisk(ColumnChunkData& chunk) const;
// Reclaims memory from the next full partitioner group in the set
// and returns the amount of memory reclaimed.
// If the set is empty, returns zero.
uint64_t claimNextGroup();
// Must only be used once all chunks have been loaded from disk.
void clearFile();
~Spiller();

private:
// Presumably guards fullPartitionerGroups — confirm against the definitions.
std::mutex partitionerGroupsMtx;
// The set of groups referred to by claimNextGroup(); stored as raw pointers.
std::unordered_set<ChunkedNodeGroup*> fullPartitionerGroups;
// Handle to the spill data file. NOTE(review): raw pointer; ownership is not visible from
// this header — confirm who owns and frees it.
FileHandle* dataFH;
};

} // namespace storage
} // namespace kuzu
7 changes: 4 additions & 3 deletions src/include/storage/file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ class FileHandle {
common::page_idx_t getNumPages() const { return numPages; }
common::FileInfo* getFileInfo() const { return fileInfo.get(); }

// Returns the page size of this file: TEMP_PAGE_SIZE for large-paged files, PAGE_SIZE
// otherwise.
uint64_t getPageSize() const {
    if (isLargePaged()) {
        return common::TEMP_PAGE_SIZE;
    }
    return common::PAGE_SIZE;
}

private:
bool isLargePaged() const { return flags & isLargePagedMask; }
bool isNewTmpFile() const { return flags & isNewInMemoryTmpFileMask; }
bool isReadOnlyFile() const { return flags & isReadOnlyMask; }
bool createFileIfNotExists() const { return flags & createIfNotExistsMask; }
uint64_t getPageSize() const {
return isLargePaged() ? common::TEMP_PAGE_SIZE : common::PAGE_SIZE;
}

common::page_idx_t addNewPageWithoutLock();
void constructExistingFileHandle(const std::string& path, common::VirtualFileSystem* vfs,
Expand Down
Loading

0 comments on commit 15b09b6

Please sign in to comment.