Skip to content

Commit

Permalink
Merge branch 'howe/kcore' of https://github.com/kuzudb/kuzu into howe…
Browse files Browse the repository at this point in the history
…/kcore
  • Loading branch information
WWW0030 committed Dec 14, 2024
2 parents 87c411b + fcc41b6 commit 7ce64b5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
36 changes: 21 additions & 15 deletions src/function/gds/K_Core_Decomposition.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <iostream>

#include "binder/binder.h"
#include "common/types/types.h"
#include "function/gds/gds_frontier.h"
Expand Down Expand Up @@ -32,9 +34,7 @@ class KUZU_API KCoreFrontierPair : public FrontierPair {
}
}

void initRJFromSource(common::nodeID_t source) override {
setActiveNodesForNextIter();
};
void initRJFromSource(common::nodeID_t source) override { setActiveNodesForNextIter(); };

void pinCurrFrontier(common::table_id_t tableID) override {
FrontierPair::pinCurrFrontier(tableID);
Expand All @@ -60,17 +60,20 @@ class KUZU_API KCoreFrontierPair : public FrontierPair {
updateSmallestDegree();
curDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
nextDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
}
}

uint64_t addToVertexDegree(common::nodeID_t nodeID, uint64_t degreeToAdd) {
return curVertexValues.getData(nodeID.tableID)[nodeID.offset].fetch_add(degreeToAdd, std::memory_order_relaxed);
return curVertexValues.getData(nodeID.tableID)[nodeID.offset].fetch_add(degreeToAdd,
std::memory_order_relaxed);
}

uint64_t removeFromVertex(common::nodeID_t nodeID) {
int curSmallest = curSmallestDegree.load(std::memory_order_relaxed);
int curVertexDegree = curVertexValues.getData(nodeID.tableID)[nodeID.offset].load(std::memory_order_relaxed);
int curVertexDegree =
curVertexValues.getData(nodeID.tableID)[nodeID.offset].load(std::memory_order_relaxed);
if (curVertexDegree > curSmallest) {
return curVertexValues.getData(nodeID.tableID)[nodeID.offset].fetch_sub(1, std::memory_order_relaxed);
return curVertexValues.getData(nodeID.tableID)[nodeID.offset].fetch_sub(1,
std::memory_order_relaxed);
}
return curVertexDegree;
}
Expand Down Expand Up @@ -121,7 +124,6 @@ class KUZU_API KCoreFrontierPair : public FrontierPair {
return vertexValues[offset].load(std::memory_order_relaxed);
}


private:
bool updateDegreeFlag = false;
bool updated = false;
Expand Down Expand Up @@ -176,10 +178,11 @@ struct KCoreEdgeCompute : public EdgeCompute {
}
};

class KCoreOutputWriter : GDSOutputWriter{
class KCoreOutputWriter : GDSOutputWriter {
public:
explicit KCoreOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask, KCoreFrontierPair* frontierPair)
: GDSOutputWriter{context, outputNodeMask}, frontierPair{frontierPair} {
explicit KCoreOutputWriter(main::ClientContext* context,
processor::NodeOffsetMaskMap* outputNodeMask, KCoreFrontierPair* frontierPair)
: GDSOutputWriter{context, outputNodeMask}, frontierPair{frontierPair} {
nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager());
kValueVector = createVector(LogicalType::UINT64(), context->getMemoryManager());
}
Expand Down Expand Up @@ -211,17 +214,20 @@ class KCoreOutputWriter : GDSOutputWriter{

class KCoreVertexCompute : public VertexCompute {
public:
KCoreVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState, std::unique_ptr<KCoreOutputWriter> outputWriter)
KCoreVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState,
std::unique_ptr<KCoreOutputWriter> outputWriter)
: mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} {
localFT = sharedState->claimLocalTable(mm);
}
~KCoreVertexCompute() override { sharedState->returnLocalTable(localFT); }

bool beginOnTable(common::table_id_t tableID) override {
bool beginOnTable(common::table_id_t tableID) override {
outputWriter->pinTableID(tableID);
return true; }
return true;
}

void vertexCompute(common::offset_t startOffset, common::offset_t endOffset, common::table_id_t tableID) override {
void vertexCompute(common::offset_t startOffset, common::offset_t endOffset,
common::table_id_t tableID) override {
outputWriter->materialize(startOffset, endOffset, tableID, *localFT);
}

Expand Down
1 change: 1 addition & 0 deletions src/function/gds/gds_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "function/gds/gds_utils.h"

#include <iostream>
#include <optional>

#include "common/task_system/task_scheduler.h"
#include "function/gds/gds_task.h"
Expand Down

0 comments on commit 7ce64b5

Please sign in to comment.