Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refac(exchange) move bitswap -> exchange/bitswap #91

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 0 additions & 58 deletions bitswap/strategy/strategy.go

This file was deleted.

6 changes: 3 additions & 3 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
bitswap "github.com/jbenet/go-ipfs/bitswap"
blocks "github.com/jbenet/go-ipfs/blocks"
exchange "github.com/jbenet/go-ipfs/exchange"
u "github.com/jbenet/go-ipfs/util"

mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
Expand All @@ -16,11 +16,11 @@ import (
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
Datastore ds.Datastore
Remote bitswap.Exchange
Remote exchange.Interface
}

// NewBlockService creates a BlockService with given datastore instance.
func NewBlockService(d ds.Datastore, rem bitswap.Exchange) (*BlockService, error) {
func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) {
if d == nil {
return nil, fmt.Errorf("BlockService requires valid datastore")
}
Expand Down
9 changes: 5 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

bitswap "github.com/jbenet/go-ipfs/bitswap"
bserv "github.com/jbenet/go-ipfs/blockservice"
config "github.com/jbenet/go-ipfs/config"
ci "github.com/jbenet/go-ipfs/crypto"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
merkledag "github.com/jbenet/go-ipfs/merkledag"
inet "github.com/jbenet/go-ipfs/net"
mux "github.com/jbenet/go-ipfs/net/mux"
Expand Down Expand Up @@ -47,7 +48,7 @@ type IpfsNode struct {
Routing routing.IpfsRouting

// the block exchange + strategy (bitswap)
BitSwap bitswap.Exchange
Exchange exchange.Interface

// the block service, get/add blocks.
Blocks *bserv.BlockService
Expand Down Expand Up @@ -88,7 +89,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
net inet.Network
// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
route *dht.IpfsDHT
exchangeSession bitswap.Exchange
exchangeSession exchange.Interface
)

if online {
Expand Down Expand Up @@ -140,7 +141,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
Blocks: bs,
DAG: dag,
Resolver: &path.Resolver{DAG: dag},
BitSwap: exchangeSession,
Exchange: exchangeSession,
Identity: local,
Routing: route,
}, nil
Expand Down
45 changes: 28 additions & 17 deletions bitswap/bitswap.go → exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,27 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/bitswap/network"
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/bitswap/strategy"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)

// TODO rename -> Router?
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
// TODO replace with timeout with context
FindProvidersAsync(u.Key, int, time.Duration) <-chan *peer.Peer

// Provide provides the key to the network
Provide(key u.Key) error
}

// TODO(brian): ensure messages are being received

// PartnerWantListMax is the bound for the number of keys we'll store per
Expand All @@ -37,25 +48,25 @@ type bitswap struct {
blockstore blockstore.Blockstore

// routing interface for communication
routing Directory
routing Routing

notifications notifications.PubSub

// strategist listens to network traffic and makes decisions about how to
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategist's state to the datastore
strategist strategy.Strategist
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy
}

// NewSession initializes a bitswap session.
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Directory) Exchange {
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory Routing) exchange.Interface {

// FIXME(brian): instantiate a concrete Strategist
receiver := bsnet.Forwarder{}
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategist: strategy.New(d),
strategy: strategy.New(),
peer: p,
routing: directory,
sender: bsnet.NewNetworkAdapter(s, &receiver),
Expand Down Expand Up @@ -112,7 +123,7 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
// that accounting is _always_ performed when SendMessage and
// ReceiveMessage are called
bs.sender.SendMessage(ctx, p, message)
bs.strategist.MessageSent(p, message)
bs.strategy.MessageSent(p, message)

block, ok := <-blockChannel
if !ok {
Expand All @@ -122,9 +133,9 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
}

func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
for _, p := range bs.strategist.Peers() {
if bs.strategist.IsWantedByPeer(block.Key(), p) {
if bs.strategist.ShouldSendToPeer(block.Key(), p) {
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
go bs.send(p, block)
}
}
Expand All @@ -144,15 +155,15 @@ func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
message.AppendBlock(b)
// FIXME(brian): pass ctx
bs.sender.SendMessage(context.Background(), p, message)
bs.strategist.MessageSent(p, message)
bs.strategy.MessageSent(p, message)
}

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {

bs.strategist.MessageReceived(sender, incoming)
bs.strategy.MessageReceived(sender, incoming)

if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() {
Expand All @@ -163,7 +174,7 @@ func (bs *bitswap) ReceiveMessage(

if incoming.Wantlist() != nil {
for _, key := range incoming.Wantlist() {
if bs.strategist.ShouldSendToPeer(key, sender) {
if bs.strategy.ShouldSendBlockToPeer(key, sender) {
block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil {
// TODO(brian): log/return the error
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package network

import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
netservice "github.com/jbenet/go-ipfs/net/service"

bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)
Expand Down
3 changes: 2 additions & 1 deletion bitswap/offline.go → exchange/bitswap/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"time"

blocks "github.com/jbenet/go-ipfs/blocks"
exchange "github.com/jbenet/go-ipfs/exchange"
u "github.com/jbenet/go-ipfs/util"
)

func NewOfflineExchange() Exchange {
func NewOfflineExchange() exchange.Interface {
return &offlineExchange{}
}

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
package strategy

import (
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)

type Strategist interface {
Accountant

type Strategy interface {
// Returns a slice of Peers that
Peers() []*peer.Peer

// WantList returns the WantList for the given Peer
IsWantedByPeer(u.Key, *peer.Peer) bool
BlockIsWantedByPeer(u.Key, *peer.Peer) bool

// ShouldSendTo(Peer) decides whether to send data to this Peer
ShouldSendToPeer(u.Key, *peer.Peer) bool
ShouldSendBlockToPeer(u.Key, *peer.Peer) bool

// Seed initializes the decider to a deterministic state
Seed(int64)
}

type Accountant interface {
// MessageReceived records receipt of message for accounting purposes
MessageReceived(*peer.Peer, bsmsg.BitSwapMessage) error

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
// access/lookups.
type keySet map[u.Key]struct{}

func newLedger(p *peer.Peer, strategy strategyFunc) *ledger {
return &ledger{
wantList: keySet{},
Strategy: strategy,
Partner: p,
}
}

// ledger stores the data exchange relationship between two peers.
type ledger struct {
lock sync.RWMutex
Expand All @@ -37,9 +45,6 @@ type ledger struct {
Strategy strategyFunc
}

// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[u.Key]*ledger

func (l *ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()
Expand Down
File renamed without changes.
File renamed without changes.
Loading