Skip to content

Commit

Permalink
Implemented live-edge walker
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Jun 26, 2017
1 parent 34f7977 commit 216ba14
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 10 deletions.
1 change: 1 addition & 0 deletions Tribler/Core/Config/config.spec
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ exitnode_enabled = boolean(default=False)
[trustchain]
enabled = boolean(default=True)
ec_keypair_filename = string(default='')
live_edges_enabled = boolean(default=True)

[metadata]
enabled = boolean(default=True)
Expand Down
6 changes: 6 additions & 0 deletions Tribler/Core/Config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ def get_trustchain_permid_keypair_filename(self):
self.set_trustchain_permid_keypair_filename(file_name)
return file_name

def set_trustchain_live_edges_enabled(self, value):
self.config['trustchain']['live_edges_enabled'] = value

def get_trustchain_live_edges_enabled(self):
return self.config['trustchain']['live_edges_enabled']

def set_megacache_enabled(self, value):
self.config['general']['megacache'] = value

Expand Down
45 changes: 39 additions & 6 deletions Tribler/Test/Community/Triblerchain/test_community.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import deferLater
from twisted.internet.threads import blockingCallFromThread

from Tribler.Test.Community.Trustchain.test_community import BaseTestTrustChainCommunity
from Tribler.Test.Community.Trustchain.test_trustchain_utilities import TrustChainTestCase
from Tribler.community.triblerchain.block import TriblerChainBlock
Expand All @@ -7,9 +12,6 @@
from Tribler.dispersy.requestcache import IntroductionRequestCache
from Tribler.dispersy.tests.dispersytestclass import DispersyTestFunc
from Tribler.dispersy.util import blocking_call_on_reactor_thread
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import deferLater


class TestPendingBytes(TrustChainTestCase):
Expand Down Expand Up @@ -307,9 +309,11 @@ def test_crawler_on_introduction_received(self):
# and we don't actually want to send the crawl request since the counter party is fake, just count if it is run
counter = [0]

def replacement(cand, pk):
counter[0] += 1
crawler._community.send_crawl_request = replacement
def on_crawl_request(cand, pk, sequence_number=None):
# Ignore live edge request
if sequence_number != -1:
counter[0] += 1
crawler.community.send_crawl_request = on_crawl_request

# Act
crawler.call(crawler.community.on_introduction_response, [intro_response])
Expand Down Expand Up @@ -353,3 +357,32 @@ def test_get_statistics_for_not_self(self):
statistics = node.community.get_statistics(public_key=other.community.my_member.public_key)
assert isinstance(statistics, dict), type(statistics)
assert len(statistics) > 0

def test_get_trust(self):
"""
Test that the trust nodes have for each other is the upload + the download total of all blocks.
"""
# Arrange
node, other = self.create_nodes(2)
transaction = {'up': 10, 'down': 5, 'total_up': 10, 'total_down': 5}
TestTriblerChainCommunity.create_block(node, other, self._create_target(node, other), transaction)
TestTriblerChainCommunity.create_block(other, node, self._create_target(other, node), transaction)

# Get statistics
node_trust = blockingCallFromThread(reactor, node.community.get_trust, other.community.my_member)
other_trust = blockingCallFromThread(reactor, other.community.get_trust, node.community.my_member)
self.assertEqual(node_trust, 15)
self.assertEqual(other_trust, 15)

def test_get_default_trust(self):
"""
Test that the trust between nodes without blocks is 1.
"""
# Arrange
node, other = self.create_nodes(2)

# Get statistics
node_trust = blockingCallFromThread(reactor, node.community.get_trust, other.community.my_member)
other_trust = blockingCallFromThread(reactor, other.community.get_trust, node.community.my_member)
self.assertEqual(node_trust, 1)
self.assertEqual(other_trust, 1)
190 changes: 189 additions & 1 deletion Tribler/Test/Community/Trustchain/test_community.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
"""
This file contains the tests for the community.py for TrustChain community.
"""
import time

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import blockingCallFromThread

from Tribler.Test.Community.Trustchain.test_trustchain_utilities import TrustChainTestCase
from Tribler.Test.test_as_server import AbstractServer
from Tribler.community.trustchain.block import GENESIS_SEQ
from Tribler.community.trustchain.community import (TrustChainCommunity, HALF_BLOCK, CRAWL)
from Tribler.dispersy.candidate import Candidate
from Tribler.dispersy.message import DelayPacketByMissingMember
from Tribler.dispersy.requestcache import IntroductionRequestCache
from Tribler.dispersy.tests.debugcommunity.node import DebugNode
from Tribler.dispersy.tests.dispersytestclass import DispersyTestFunc
from Tribler.dispersy.util import blocking_call_on_reactor_thread
from twisted.internet.defer import inlineCallbacks, returnValue


class BaseTestTrustChainCommunity(TrustChainTestCase, DispersyTestFunc):
Expand Down Expand Up @@ -42,6 +49,10 @@ def assertBlocksAreEqual(self, node, other):
node.community.persistence.crawl(other.community.my_member.public_key, 0),
other.community.persistence.crawl(other.community.my_member.public_key, 0))

@blocking_call_on_reactor_thread
def get_node_sq_from_db(self, node, sq_owner_node, sequence_number):
return node.community.persistence.get(sq_owner_node.community.my_member.public_key, sequence_number)

@staticmethod
def create_block(req, resp, target_resp, transaction):
req.call(req.community.sign_block, target_resp, resp.my_member.public_key, transaction)
Expand Down Expand Up @@ -266,6 +277,32 @@ def test_crawl_block_specified_sequence_number(self):
self.assertBlocksInDatabase(crawler, 2)
self.assertBlocksAreEqual(node, crawler)

def test_crawl_blocks_negative_sequence_number(self):
"""
Test the crawler to fetch blocks starting from a negative sequence number.
"""
# Arrange
node, other, crawler = self.create_nodes(3)

# Act
TestTrustChainCommunity.create_block(node, other, self._create_target(node, other), {}) # sq 1
TestTrustChainCommunity.create_block(node, other, self._create_target(node, other), {}) # sq 2
TestTrustChainCommunity.create_block(node, other, self._create_target(node, other), {}) # sq 3

self.clean_database(crawler)
self.assertBlocksInDatabase(crawler, 0)
TestTrustChainCommunity.crawl_node(crawler, node, self._create_target(crawler, node), -2)

# Assert
self.assertBlocksInDatabase(node, 6)
self.assertBlocksInDatabase(crawler, 4)
self.assertIsNone(self.get_node_sq_from_db(crawler, node, 1))
self.assertIsNone(self.get_node_sq_from_db(crawler, other, 1))
self.assertIsNotNone(self.get_node_sq_from_db(crawler, node, 2))
self.assertIsNotNone(self.get_node_sq_from_db(crawler, other, 2))
self.assertIsNotNone(self.get_node_sq_from_db(crawler, node, 3))
self.assertIsNotNone(self.get_node_sq_from_db(crawler, other, 3))

def test_crawl_no_block(self):
"""
Test crawl without a block.
Expand Down Expand Up @@ -320,3 +357,154 @@ def test_crawl_batch(self):
self.assertBlocksAreEqual(node, other)
self.assertBlocksAreEqual(node, crawler)
self.assertBlocksAreEqual(other, crawler)

def test_get_trust(self):
"""
Test that the trust nodes have for each other is the sum of the length of both chains.
"""
# Arrange
node, other = self.create_nodes(2)
transaction = {}
TestTrustChainCommunity.create_block(node, other, self._create_target(node, other), transaction)
TestTrustChainCommunity.create_block(other, node, self._create_target(other, node), transaction)

# Get statistics
node_trust = blockingCallFromThread(reactor, node.community.get_trust, other.community.my_member)
other_trust = blockingCallFromThread(reactor, other.community.get_trust, node.community.my_member)
self.assertEqual(node_trust, 2)
self.assertEqual(other_trust, 2)

def test_get_default_trust(self):
"""
Test that the trust between nodes without blocks is 1.
"""
# Arrange
node, other = self.create_nodes(2)

# Get statistics
node_trust = blockingCallFromThread(reactor, node.community.get_trust, other.community.my_member)
other_trust = blockingCallFromThread(reactor, other.community.get_trust, node.community.my_member)
self.assertEqual(node_trust, 1)
self.assertEqual(other_trust, 1)

def test_live_edge_bootstrapping(self):
"""
A node without trust for anyone should still find a candidate.
"""
# Arrange
node, other = self.create_nodes(2)
candidate = node.community.create_or_update_walkcandidate(other.my_candidate.sock_addr,
other.my_candidate.sock_addr,
('0.0.0.0', 0),
other.my_candidate.tunnel,
u"unknown")
candidate.associate(other.community.my_member)
candidate.walk_response(time.time())

# Assert
intro = blockingCallFromThread(reactor, node.community.dispersy_get_introduce_candidate,
node.my_candidate)
self.assertIsNotNone(intro)
self.assertIsInstance(intro, Candidate)
self.assertEqual(intro, candidate)

def test_live_edge_recommend_valid(self):
"""
Live edges should never include invalid/old candidates.
"""
# Arrange
node, other, another = self.create_nodes(3)

# Stop the community from walking/crawling once it gets reactor control
node.community.cancel_all_pending_tasks()
node.community.reset_live_edges()
node.community.candidates.clear()

candidate = node.community.create_or_update_walkcandidate(other.my_candidate.sock_addr,
other.my_candidate.sock_addr,
('0.0.0.0', 0),
other.my_candidate.tunnel,
u"unknown")
candidate.associate(other.community.my_member)
candidate.walk_response(time.time())

node.community.create_or_update_walkcandidate(another.my_candidate.sock_addr,
another.my_candidate.sock_addr,
('0.0.0.0', 0),
another.my_candidate.tunnel,
u"unknown")

# Assert
intro = blockingCallFromThread(reactor, node.community.dispersy_get_introduce_candidate,
node.my_candidate)
self.assertIsNotNone(intro)
self.assertIsInstance(intro, Candidate)
self.assertEqual(intro, candidate)

def test_live_edge_callback_no_candidates(self):
"""
Test live edges start with my member.
"""
# Arrange
node, = self.create_nodes(1)

called = [False]

def check_live_edge(edge_id, candidates):
self.assertEqual(1, edge_id)
self.assertEqual(node.my_member.mid, candidates[0].get_member().mid)
called[0] = True

node.community.set_live_edge_callback(check_live_edge)

# Stop the community from walking/crawling once it gets reactor control
node.community.cancel_all_pending_tasks()
node.community.reset_live_edges()

# Act
node.community.take_step()

# Assert
self.assertTrue(called)

def test_live_edge_callback(self):
"""
Test creation and handling of a new live edge.
"""
# Arrange
node, other = self.create_nodes(2)

# Create a cache, so our introduction response is expected by the node
cache = object.__new__(IntroductionRequestCache)
blockingCallFromThread(reactor, IntroductionRequestCache.__init__, cache,
node.community, other.my_candidate.sock_addr)
cache = blockingCallFromThread(reactor, node.community.request_cache.add, cache)

# Create the actual response message
response = other.create_introduction_response(node.my_candidate,
other.my_candidate.sock_addr,
other.my_candidate.sock_addr,
("0.0.0.0", 0),
("0.0.0.0", 0),
u"unknown",
False,
cache.number)
response._candidate = other.my_candidate # Fake its arrival from other

called = [False]

def check_live_edge(edge_id, candidates):
# We have no more valid candidates, increment id
self.assertEqual(1, edge_id)
# Start with our member
self.assertEqual(node.my_member.mid, candidates[0].get_member().mid)
# End with new member
self.assertEqual(other.my_member.mid, candidates[1].get_member().mid)
called[0] = True
node.community.set_live_edge_callback(check_live_edge)

# Act
blockingCallFromThread(reactor, node.community.on_introduction_response, [response])

# Assert
self.assertTrue(called[0])
18 changes: 17 additions & 1 deletion Tribler/community/triblerchain/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_master_members(cls, dispersy):
return [dispersy.get_member(public_key=master_key.decode("HEX"))]

def initialize(self, tribler_session=None):
super(TriblerChainCommunity, self).initialize()
super(TriblerChainCommunity, self).initialize(tribler_session)
if tribler_session:
self.notifier = tribler_session.notifier
self.notifier.add_observer(self.on_tunnel_remove, NTFY_TUNNEL, [NTFY_REMOVE])
Expand Down Expand Up @@ -163,6 +163,22 @@ def unload_community(self):
self.pending_bytes[pk].clean.reset(0)
yield super(TriblerChainCommunity, self).unload_community()

def get_trust(self, member):
"""
Get the trust for another member.
Currently this is just the amount of MBs downloaded from them.
:param member: the member we interacted with
:type member: dispersy.member.Member
:return: the trust value for this member
:rtype: int
"""
block = self.persistence.get_latest(member.public_key)
if block:
return block.transaction['total_up'] + block.transaction['total_down']
else:
return 1


class TriblerChainCommunityCrawler(TriblerChainCommunity):
"""
Expand Down
Loading

0 comments on commit 216ba14

Please sign in to comment.