Kademlia: Somewhat complete the records implementation. #1189
Conversation
)));
return;

/// The result of this operation is delivered in [`KademliaEvent::GetRecordResult`].
pub fn get_record(&mut self, key: &Multihash, quorum: Quorum) {
I think that we shouldn't be handing out the `Multihash` type to users in the API, since it would make a lot of code in Substrate dependent on `Multihash`, and different places in the code could hash things differently, and so on. What if the API took bytes and did the hashing?
Choosing multihashes as record keys was done in your original PR and didn't change here. I think it would be fine (even good) to further generalise over the record key type, but that should be done in another PR - this one is big enough as it is.
> What if the api took bytes and did the hashing?

Not sure what you mean by that. Whatever the key is, it is always hashed into the Kademlia keyspace, see protocols/kad/kbucket/key.rs.
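For context, a rough sketch of that idea (the type and helper below are simplified assumptions using the `sha2` crate, not the crate's actual API): whatever the record key is, it is hashed into a fixed-size identifier, and XOR distance over those identifiers defines the Kademlia keyspace metric.

```rust
use sha2::{Digest, Sha256};

/// Simplified stand-in for the keyspace key: any preimage (a multihash,
/// raw bytes, a peer ID, ...) is hashed into a 256-bit identifier.
#[derive(Clone, PartialEq, Eq)]
struct KeyspaceKey {
    hash: [u8; 32],
}

impl KeyspaceKey {
    /// Hash an arbitrary preimage into the keyspace.
    fn new(preimage: &[u8]) -> Self {
        KeyspaceKey { hash: Sha256::digest(preimage).into() }
    }

    /// XOR distance between two keys, the metric used to determine the
    /// k closest nodes to a key.
    fn distance(&self, other: &KeyspaceKey) -> [u8; 32] {
        let mut d = [0u8; 32];
        for i in 0..32 {
            d[i] = self.hash[i] ^ other.hash[i];
        }
        d
    }
}
```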
For what it's worth, I have a branch here that sits on top of this PR and generalises record keys. I may open a PR for that once / if this PR is merged.
Force-pushed from a1f3f71 to 28aee24.
This commit relates to [libp2p-146] and [libp2p-1089].

* All records expire (by default, configurable).
* Provider records are also stored in the RecordStore, and the RecordStore API extended.
* Background jobs for periodic (re-)replication and (re-)publication of records. Regular (value-)records are subject to re-replication and re-publication as per standard Kademlia. Provider records are only subject to re-publication.
* For standard Kademlia value lookups (quorum = 1), the record is cached at the closest peer to the key that did not return the value, as per standard Kademlia.
* Expiration times of regular (value-)records are computed exponentially inversely proportional to the number of nodes between the local node and the closest node known to the key (beyond the k closest), as per standard Kademlia.

The protobuf messages are extended with two fields, `ttl` and `publisher`, in order to implement the different semantics of re-replication (by any of the k closest peers to the key, not affecting expiry) and re-publication (by the original publisher, resetting the expiry). This is not done yet in other libp2p Kademlia implementations, see e.g. [libp2p-go-323]. The new protobuf fields have been given somewhat unique identifiers to prevent future collision.

Similarly, periodic re-publication of provider records does not seem to be done yet in other implementations, see e.g. [libp2p-js-98].

[libp2p-146]: libp2p#146
[libp2p-1089]: libp2p#1089
[libp2p-go-323]: libp2p/go-libp2p-kad-dht#323
[libp2p-js-98]: libp2p/js-libp2p-kad-dht#98
To ensure task notification, since `NotReady` is returned right after.
In order for a user to easily distinguish the result of e.g. a `put_record` operation from the result of a later republication, different event constructors are used. Furthermore, re-replication and "caching" of records (at the closest peer to the key that did not return a value during a successful lookup) do not yield events for now, as they are less interesting.
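As an aside, a hypothetical sketch of that distinction (the event type and variant names below are made up for illustration and are not the PR's actual `KademliaEvent` constructors):

```rust
/// Hypothetical event type; the variant names are illustrative only.
#[derive(Debug)]
enum RecordEvent {
    /// Result of a `put_record` call made by the local user.
    PutRecordResult(Result<(), String>),
    /// Result of a periodic re-publication run by a background job.
    RepublishRecordResult(Result<(), String>),
}

fn on_event(event: RecordEvent) {
    // Distinct constructors make the origin of a result unambiguous.
    match event {
        RecordEvent::PutRecordResult(res) =>
            println!("user-initiated put_record finished: {:?}", res),
        RecordEvent::RepublishRecordResult(res) =>
            println!("periodic re-publication finished: {:?}", res),
    }
}
```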
Force-pushed from 28aee24 to 8111417.
I have not finished the review, but left some comments/questions.
#[derive(Debug)]
enum PeriodicJobState<T> {
    Running(T),
    Waiting(Delay)
Why does `PeriodicJobState::Waiting` contain a future instead of a deadline value?
The idea is that when a job is polled (see `PutRecordJob::poll` and `AddProviderJob::poll`) and returns `NotReady`, the current task is woken up when the job is ready to run, since there may be nothing else going on otherwise. This polling of the jobs is done in `Kademlia::poll`. Does that make sense, or did you have something else in mind with your question?
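To make the wake-up mechanism concrete, here is a reduced sketch in the style of the futures 0.1 / tokio-timer APIs in use at the time (the job body is stubbed out, so this is not the PR's actual `PutRecordJob`): polling the `Delay` held in the waiting state registers the current task, which is then notified when the deadline is reached.

```rust
use std::time::Duration;
use futures::{Async, Future, Poll};
use tokio_timer::Delay;

enum PeriodicJobState<T> {
    Running(T),
    Waiting(Delay),
}

struct PeriodicJob<T> {
    interval: Duration,
    state: PeriodicJobState<T>,
}

impl PeriodicJob<Vec<u8>> {
    /// Reduced poll: while waiting, polling the `Delay` registers the current
    /// task to be woken at the deadline, so returning `NotReady` is safe even
    /// if nothing else drives the behaviour in the meantime.
    fn poll(&mut self) -> Poll<(), tokio_timer::Error> {
        let due = match &mut self.state {
            PeriodicJobState::Waiting(delay) => delay.poll()?.is_ready(),
            PeriodicJobState::Running(_) => true,
        };
        if !due {
            return Ok(Async::NotReady);
        }
        // Deadline reached (or already running): do the actual work here, then
        // schedule the next run via
        // `PeriodicJobState::Waiting(Delay::new(Instant::now() + self.interval))`.
        self.state = PeriodicJobState::Running(Vec::new());
        Ok(Async::Ready(()))
    }
}
```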
No, thanks. It is a bit subtle and `is_ready` does not convey that behind the scenes there is some task registration that will eventually cause `poll` to be invoked again.
* Guard a node against overriding records for which it considers itself to be the publisher.
* Document the jobs module more extensively.
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
This deviates somewhat from the paper I think. In there, re-publishing is done once per hour on each node and every 24 hours by the owner, or else the key expires. Replication is not periodic but happens when a node discovers a new node closer to some of its keys. Then, those key-value pairs will be replicated to the new node. I have not found anything about "re-replication" in the paper.
I think that is just a matter of terminology. In order to distinguish the hourly re-publishing done by every node from the 24h re-publishing done by the node that is the "original publisher", I call the former re-replication. The replication of values to a newly joined node can of course be given separate attention, but since that is also covered by the hourly re-republication / re-replication, I didn't want to over-complicate things.
I guess my choice of naming is partly derived from the naming of the interval constants in this design specification.
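To illustrate the invariant described in the doc comment above, a small sketch (the type name and the defaults are illustrative assumptions, loosely following the design specification's values): the replication interval is shorter than the publication interval, which in turn is shorter than the record TTL.

```rust
use std::time::Duration;

/// Illustrative values: replication hourly, publication daily, records
/// expire after 36 hours.
struct JobIntervals {
    replication: Duration,
    publication: Duration,
    record_ttl: Duration,
}

impl JobIntervals {
    fn new(replication: Duration, publication: Duration, record_ttl: Duration) -> Self {
        // Each layer must refresh records before the next one lets them lapse.
        assert!(replication < publication, "replication interval must be shorter than publication interval");
        assert!(publication < record_ttl, "publication interval must be shorter than the record TTL");
        JobIntervals { replication, publication, record_ttl }
    }
}

fn main() {
    let _intervals = JobIntervals::new(
        Duration::from_secs(60 * 60),      // replication: 1 hour
        Duration::from_secs(24 * 60 * 60), // publication: 24 hours
        Duration::from_secs(36 * 60 * 60), // record TTL: 36 hours
    );
}
```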
Mostly nits and questions; the PR in general looks excellent!
@@ -271,6 +270,8 @@ impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {

/// Event to send to the handler.
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
    Reset(KademliaRequestId),
Needs some documentation saying what that does.
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
    match message {
        KademliaHandlerIn::Reset(request_id) => {
Can't we just wait for the requests to be answered or to time out instead?
This is currently used when the node receives a request to store a record but for some reason (e.g. a storage error) is unable to, or refuses to, store it. We can just let the request time out on the remote, which is what was done before, but there was a TODO left for it (by me). I thought it would be preferable to signal errors as quickly as possible so as not to delay queries unnecessarily in the normal case, and since the protocol itself has no explicit error responses, resetting the stream seemed the only option. Does that make sense?
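A reduced sketch of that flow (the types below are illustrative stand-ins for `KademliaRequestId` and `KademliaHandlerIn`, not the PR's actual handler plumbing): when storing an inbound record fails or is refused, the behaviour instructs the handler to reset the stream instead of leaving the remote to run into a timeout.

```rust
/// Illustrative stand-ins; the PR's actual request id, store error type and
/// handler events are more involved.
struct KademliaRequestId(u64);

enum StoreError {
    MaxRecords,
    ValueTooLarge,
}

enum HandlerCommand {
    /// Ask the handler to reset the substream of the inbound request,
    /// signalling failure to the remote immediately.
    Reset(KademliaRequestId),
    /// Acknowledge an inbound PUT_VALUE request that was stored successfully.
    PutRecordRes(KademliaRequestId),
}

/// Decide how to answer an inbound request to store a record.
fn on_inbound_put_record(
    store_result: Result<(), StoreError>,
    request_id: KademliaRequestId,
) -> HandlerCommand {
    match store_result {
        // Stored successfully: answer the request.
        Ok(()) => HandlerCommand::PutRecordRes(request_id),
        // Storage failed or was refused: reset the stream rather than letting
        // the remote wait for a timeout, since the protocol has no explicit
        // error response.
        Err(_) => HandlerCommand::Reset(request_id),
    }
}
```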
Oh I see, I misunderstood the purpose.
fn remove(&'a mut self, k: &Multihash);

/// Gets an iterator over all (value-) records currently stored.
fn records(&'a self) -> Self::RecordsIter;
Is that a good idea? It works for an in-memory record store, but not for a record store that keeps its records on disk.
Why do you think it can only be implemented by an in-memory store? That is certainly not my intention, but every record store must provide a means to iterate through all records - that is simply a requirement of the Kademlia protocol.
My impression is that the minimum requirement would be an iterator over the record keys/ids. My concern is that the iterator also contains tons of metadata as well as the record's content, which might not be desirable to keep in memory.
This is a minor concern though.
I see - since I implemented the optimisation from the paper whereby records are skipped from hourly replication if they have recently been received from another peer (i.e. in most cases effectively only 1 of the `k` peers closest to a key replicates a record hourly), just iterating over the keys may indeed be beneficial in terms of memory use, albeit requiring separate lookups from the storage for every record that is not skipped. We should certainly keep that in mind as a trade-off to (re)consider.
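To make the trade-off concrete, a hedged sketch of the two API shapes (the trait and type names are simplified assumptions, not the PR's actual `RecordStore`): iterating over full records versus iterating over keys only and looking each record up on demand.

```rust
use std::borrow::Cow;

/// Simplified stand-in for the PR's record type.
#[derive(Clone)]
struct Record {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Variant A (roughly what the PR does): iterate over full records.
trait RecordStoreFullIter<'a> {
    type RecordsIter: Iterator<Item = Cow<'a, Record>>;
    fn records(&'a self) -> Self::RecordsIter;
}

/// Variant B (the alternative discussed above): iterate over keys only and
/// look each record up on demand, trading memory for extra storage lookups.
trait RecordStoreKeyIter<'a> {
    type KeysIter: Iterator<Item = Cow<'a, [u8]>>;
    fn keys(&'a self) -> Self::KeysIter;
    fn get(&'a self, key: &[u8]) -> Option<Cow<'a, Record>>;
}
```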
protocols/kad/src/addresses.rs (outdated)
///
/// An address should only be removed if it is determined to be invalid or
/// otherwise unreachable.
pub fn remove(&mut self, addr: &Multiaddr) -> bool {
What about returning `-> Result<(), ()>` instead? IMO it's more explicit that `Ok` means that the removal was successful.
Sounds good to me.
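A small sketch of the suggested signature change (the surrounding type is simplified, and the rule for when removal fails is just one possible reading, not necessarily the PR's final semantics):

```rust
/// `Addr` stands in for `Multiaddr` to keep the sketch self-contained.
type Addr = String;

struct Addresses {
    addrs: Vec<Addr>,
}

impl Addresses {
    /// Before: `-> bool`, where `true` meant the address was removed.
    /// After (as suggested): `-> Result<(), ()>`, where `Ok(())` means the
    /// address was found and removed, and `Err(())` means it was not removed
    /// (here: because it was not present).
    fn remove(&mut self, addr: &Addr) -> Result<(), ()> {
        if let Some(pos) = self.addrs.iter().position(|a| a == addr) {
            self.addrs.remove(pos);
            Ok(())
        } else {
            Err(())
        }
    }
}
```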
let now = Instant::now();

// Calculate the expiration exponentially inversely proportional to the
I didn't even know that was in the paper.
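For illustration, a hedged sketch of such an expiration rule (the halving-per-node formula and the cap are assumptions in the spirit of the paper, not necessarily the PR's exact computation):

```rust
use std::time::{Duration, Instant};

/// Sketch in the spirit of the paper: `nodes_beyond_k_closest` is the number
/// of known nodes between the local node and the key, beyond the k closest.
/// Each such node halves the TTL, i.e. the record expires after TTL / 2^n.
fn record_expiration(now: Instant, base_ttl: Duration, nodes_beyond_k_closest: u32) -> Instant {
    // Cap the exponent so the shift cannot overflow.
    let exp = nodes_beyond_k_closest.min(32);
    let ttl_secs = base_ttl.as_secs() >> exp;
    now + Duration::from_secs(ttl_secs)
}
```

With a base TTL of 36 hours and five such nodes, for example, this yields 36h / 2^5 ≈ 1.1 hours.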
type ProvidedIter: Iterator<Item = Cow<'a, ProviderRecord>>;

/// Gets a record from the store, given its key.
fn get(&'a self, k: &Multihash) -> Option<Cow<Record>>;
Should eventually be asynchronous, but this would probably be extremely hard to implement as long as we don't have async/await, and not worth it.
I thought the same - besides, we have #1159 - but I am myself not sure whether the complications w.r.t. the implementation of the Kademlia behaviour that would come along with such a change would be worth it, especially not before async/await. I think a first step would be to actually allow even `RecordStore` implementations using synchronous I/O without `unwrap`ing, which will require all trait methods to return a `Result`, and `store::Error` would need a constructor for `io::Error`. I left that for later as well.
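A hedged sketch of that direction (the error variants and method set are assumptions, not the PR's actual `RecordStore` trait): every method returns a `Result`, and the error type can be constructed from `io::Error` so that synchronous on-disk implementations need not `unwrap`.

```rust
use std::borrow::Cow;
use std::io;

/// Simplified stand-in for the PR's record type.
#[derive(Clone)]
struct Record {
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Sketch of a store error type with a constructor for `io::Error`.
#[derive(Debug)]
enum StoreError {
    MaxRecords,
    ValueTooLarge,
    Io(io::Error),
}

impl From<io::Error> for StoreError {
    fn from(e: io::Error) -> Self {
        StoreError::Io(e)
    }
}

/// Every method returns a `Result`, so an implementation backed by
/// synchronous (e.g. on-disk) I/O can propagate failures via `?`
/// instead of panicking.
trait RecordStore<'a> {
    fn get(&'a self, key: &[u8]) -> Result<Option<Cow<'a, Record>>, StoreError>;
    fn put(&'a mut self, record: Record) -> Result<(), StoreError>;
    fn remove(&'a mut self, key: &[u8]) -> Result<(), StoreError>;
}
```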
Change the semantics of `Addresses::remove` so that the error case is unambiguous, instead of the success case. Use the `Result` for clearer semantics to that effect.
Looks good to me.
I would normally wait for @twittner's approval, but since he's on holidays I guess we can merge this.
The last feedback I got from him on Friday was that the PR overall looks good. He just had some thoughts around the implementation of the periodic jobs, e.g. for splitting the ...

Thank you both for the review and suggestions.
Sits on top of #1174 and relates to libp2p-146 and libp2p-1089.
The following are the major changes:
* All records expire (by default, configurable).
* Provider records are also stored in the RecordStore, and the RecordStore API extended.
* Background jobs for periodic (re-)replication and (re-)publication of records. Regular (value-)records are subject to re-replication and re-publication as per standard Kademlia. Provider records are only subject to re-publication.
* For standard Kademlia value lookups (quorum = 1), the record is cached at the closest peer to the key that did not return the value, as per standard Kademlia.
* Expiration times of regular (value-)records are computed exponentially inversely proportional to the number of nodes between the local node and the closest node known to the key (beyond the k closest), as per standard Kademlia.

To that end, the protobuf structure for regular (value-)records is extended with the two fields `ttl` and `publisher`, in order to implement the different semantics of re-replication (by any of the k closest peers to the key, not affecting expiry) and re-publication (by the original publisher, resetting the expiry). This is not done yet in other libp2p Kademlia implementations (see e.g. libp2p-go-323). The new protobuf fields have been given somewhat unique identifiers to prevent future collision, should the libp2p spec and protocol be extended.

Similarly, periodic re-publication of provider records does not seem to be done yet in other implementations (see e.g. libp2p-js-98) but was already drafted in the existing Rust implementation, which I thus continued.
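To illustrate the intended semantics of the two new fields (the types below are simplified stand-ins, not the generated protobuf structs, and the actual field numbers are deliberately not reproduced here):

```rust
use std::time::{Duration, Instant};

/// Simplified record carrying the information of the two new protobuf fields,
/// `publisher` and `ttl` (the latter represented here as an absolute expiry).
#[derive(Clone)]
struct Record {
    key: Vec<u8>,
    value: Vec<u8>,
    /// Peer that originally published the record (the new `publisher` field).
    publisher: Option<Vec<u8>>,
    /// Absolute expiry derived from the new `ttl` field.
    expires: Option<Instant>,
}

/// Re-replication: performed by any of the k closest peers to the key.
/// The record is pushed out again as-is; the expiry is NOT extended and the
/// publisher is preserved.
fn re_replicate(record: &Record) -> Record {
    record.clone()
}

/// Re-publication: performed only by the original publisher; the expiry is
/// reset to a full TTL from now.
fn re_publish(record: &Record, ttl: Duration, now: Instant) -> Record {
    let mut r = record.clone();
    r.expires = Some(now + ttl);
    r
}
```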