Skip to content

Commit

Permalink
[core][object spillin] Fix bugs in admission control (#13781)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang authored and Alex committed Feb 11, 2021
1 parent 7779637 commit 311cf74
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class ObjectInfoAccessor {
/// \return Status
virtual Status AsyncAddSpilledUrl(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id,
const NodeID &spilled_node_id, size_t object_size,
const StatusCallback &callback) = 0;

/// Remove location of object from GCS asynchronously.
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,14 +1102,15 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i

Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
const ObjectID &object_id, const std::string &spilled_url,
const NodeID &spilled_node_id, const StatusCallback &callback) {
const NodeID &spilled_node_id, size_t object_size, const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
<< ", spilled_url = " << spilled_url
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_spilled_url(spilled_url);
request.set_spilled_node_id(spilled_node_id.Binary());
request.set_size(object_size);

auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().AddObjectLocation(
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
size_t object_size, const StatusCallback &callback) override;

Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &node_id,
const NodeID &node_id, size_t object_size,
const StatusCallback &callback) override;

Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
eviction_policy_.BeginObjectAccess(object_id);
num_bytes_in_use_ += entry->data_size + entry->metadata_size;
}
// Increase reference count.
entry->ref_count++;
Expand Down Expand Up @@ -537,6 +538,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,
// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
if (entry->ref_count == 0) {
num_bytes_in_use_ -= entry->data_size + entry->metadata_size;
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id;
if (deletion_cache_.count(object_id) == 0) {
// Tell the eviction policy that this object is no longer being used.
Expand Down
7 changes: 5 additions & 2 deletions src/ray/object_manager/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ class PlasmaStore {
void ProcessCreateRequests();

void GetAvailableMemory(std::function<void(size_t)> callback) const {
size_t available =
PlasmaAllocator::GetFootprintLimit() - eviction_policy_.GetPinnedMemoryBytes();
int64_t num_bytes_in_use = static_cast<int64_t>(num_bytes_in_use_);
RAY_CHECK(PlasmaAllocator::GetFootprintLimit() >= num_bytes_in_use);
size_t available = PlasmaAllocator::GetFootprintLimit() - num_bytes_in_use;
callback(available);
}

Expand Down Expand Up @@ -313,6 +314,8 @@ class PlasmaStore {
/// interface that node manager or object manager can access the plasma store with this
/// mutex if it is not absolutely necessary.
std::recursive_mutex mutex_;

size_t num_bytes_in_use_ = 0;
};

} // namespace plasma
10 changes: 8 additions & 2 deletions src/ray/object_manager/pull_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,17 @@ void PullManager::OnLocationChange(const ObjectID &object_id,
it->second.spilled_url = spilled_url;
it->second.spilled_node_id = spilled_node_id;
if (!it->second.object_size_set) {
RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size
<< ", num bytes being pulled is now " << num_bytes_being_pulled_;
it->second.object_size = object_size;
it->second.object_size_set = true;
UpdatePullsBasedOnAvailableMemory(num_bytes_available_);
RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size
<< ", num bytes being pulled is now " << num_bytes_being_pulled_;
if (it->second.object_size == 0) {
RAY_LOG(WARNING) << "Size of object " << object_id
<< " stored in object store is zero. This may be a bug since "
"objects in the object store should be large, and can result "
"in too many objects being fetched to this node";
}
}
RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients "
<< client_ids.size();
Expand Down
6 changes: 5 additions & 1 deletion src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,15 @@ void LocalObjectManager::AddSpilledUrls(
// don't need to report where this object is spilled.
const auto node_id_object_spilled =
is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil();

auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());

// Write to object directory. Wait for the write to finish before
// releasing the object to make sure that the spilled object can
// be retrieved by other raylets.
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
object_id, object_url, node_id_object_spilled,
object_id, object_url, node_id_object_spilled, it->second->GetSize(),
[this, object_id, object_url, callback, num_remaining](Status status) {
RAY_CHECK_OK(status);
// Unpin the object.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor {
size_t object_size, const gcs::StatusCallback &callback));

Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &spilled_node_id,
const NodeID &spilled_node_id, size_t object_size,
const gcs::StatusCallback &callback) {
object_urls[object_id] = spilled_url;
callbacks.push_back(callback);
Expand Down

0 comments on commit 311cf74

Please sign in to comment.