Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peerstore Fix #128

Merged
merged 11 commits into from
Sep 27, 2014
Merged
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
return nil, err
}

net, err = inet.NewIpfsNetwork(context.TODO(), local, &mux.ProtocolMap{
net, err = inet.NewIpfsNetwork(context.TODO(), local, peerstore, &mux.ProtocolMap{
mux.ProtocolID_Routing: dhtService,
mux.ProtocolID_Exchange: exchangeService,
})
Expand Down
11 changes: 9 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
)

func TestInitialization(t *testing.T) {
id := &config.Identity{
id := config.Identity{
PeerID: "QmNgdzLieYi8tgfo2WfTUzNVH5hQK9oAYGVf6dxN12NrHt",
Address: "/ip4/127.0.0.1/tcp/8000",
PrivKey: "CAASrRIwggkpAgEAAoICAQCwt67GTUQ8nlJhks6CgbLKOx7F5tl1r9zF4m3TUrG3Pe8h64vi+ILDRFd7QJxaJ/n8ux9RUDoxLjzftL4uTdtv5UXl2vaufCc/C0bhCRvDhuWPhVsD75/DZPbwLsepxocwVWTyq7/ZHsCfuWdoh/KNczfy+Gn33gVQbHCnip/uhTVxT7ARTiv8Qa3d7qmmxsR+1zdL/IRO0mic/iojcb3Oc/PRnYBTiAZFbZdUEit/99tnfSjMDg02wRayZaT5ikxa6gBTMZ16Yvienq7RwSELzMQq2jFA4i/TdiGhS9uKywltiN2LrNDBcQJSN02pK12DKoiIy+wuOCRgs2NTQEhU2sXCk091v7giTTOpFX2ij9ghmiRfoSiBFPJA5RGwiH6ansCHtWKY1K8BS5UORM0o3dYk87mTnKbCsdz4bYnGtOWafujYwzueGx8r+IWiys80IPQKDeehnLW6RgoyjszKgL/2XTyP54xMLSW+Qb3BPgDcPaPO0hmop1hW9upStxKsefW2A2d46Ds4HEpJEry7PkS5M4gKL/zCKHuxuXVk14+fZQ1rstMuvKjrekpAC2aVIKMI9VRA3awtnje8HImQMdj+r+bPmv0N8rTTr3eS4J8Yl7k12i95LLfK+fWnmUh22oTNzkRlaiERQrUDyE4XNCtJc0xs1oe1yXGqazCIAQIDAQABAoICAQCk1N/ftahlRmOfAXk//8wNl7FvdJD3le6+YSKBj0uWmN1ZbUSQk64chr12iGCOM2WY180xYjy1LOS44PTXaeW5bEiTSnb3b3SH+HPHaWCNM2EiSogHltYVQjKW+3tfH39vlOdQ9uQ+l9Gh6iTLOqsCRyszpYPqIBwi1NMLY2Ej8PpVU7ftnFWouHZ9YKS7nAEiMoowhTu/7cCIVwZlAy3AySTuKxPMVj9LORqC32PVvBHZaMPJ+X1Xyijqg6aq39WyoztkXg3+Xxx5j5eOrK6vO/Lp6ZUxaQilHDXoJkKEJjgIBDZpluss08UPfOgiWAGkW+L4fgUxY0qDLDAEMhyEBAn6KOKVL1JhGTX6GjhWziI94bddSpHKYOEIDzUy4H8BXnKhtnyQV6ELS65C2hj9D0IMBTj7edCF1poJy0QfdK0cuXgMvxHLeUO5uc2YWfbNosvKxqygB9rToy4b22YvNwsZUXsTY6Jt+p9V2OgXSKfB5VPeRbjTJL6xqvvUJpQytmII/C9JmSDUtCbYceHj6X9jgigLk20VV6nWHqCTj3utXD6NPAjoycVpLKDlnWEgfVELDIk0gobxUqqSm3jTPEKRPJgxkgPxbwxYumtw++1UY2y35w3WRDc2xYPaWKBCQeZy+mL6ByXp9bWlNvxS3Knb6oZp36/ovGnf2pGvdQKCAQEAyKpipz2lIUySDyE0avVWAmQb2tWGKXALPohzj7AwkcfEg2GuwoC6GyVE2sTJD1HRazIjOKn3yQORg2uOPeG7sx7EKHxSxCKDrbPawkvLCq8JYSy9TLvhqKUVVGYPqMBzu2POSLEA81QXas+aYjKOFWA2Zrjq26zV9ey3+6Lc6WULePgRQybU8+RHJc6fdjUCCfUxgOrUO2IQOuTJ+FsDpVnrMUGlokmWn23OjL4qTL9wGDnWGUs2pjSzNbj3qA0d8iqaiMUyHX/D/VS0wpeT1osNBSm8suvSibYBn+7wbIApbwXUxZaxMv2OHGz3empae4ckvNZs7r8wsI9UwFt8mwKCAQEA4XK6gZkv9t+3YCcSPw2ensLvL/xU7i2bkC9tfTGdjnQfzZXIf5KNdVuj/SerOl2S1s45NMs3ysJbADwRb4ahElD/V71nGzV8fpFTitC20ro9fuX4J0+twmBolHqeH9pmeGTjAeL1rvt6vxs4FkeG/yNft7GdXpXTtEGaObn8Mt0tPY+aB3UnKrnCQoQAlPyGHFrVRX0UEcp6wyyNGhJCNKeNOvqCHTFObhbhO+KWpWSN0MkVHnqaIBnIn1Te8FtvP/iTwXGnKc0YXJUG6+LM6LmOguW6tg8ZqiQeYyyR+e9eCFH4csLzkrTl1GxCxwEsoSLIMm7UDcjttW6tYEghkwKCAQEAmeCO5lCPYImnN5Lu71ZTLmI2OgmjaANTnBBnDbi+hgv61gUCToUIMejSdDCTPfwv61P3TmyIZs0luPGxkiKYHTNqmOE9Vspgz8Mr7fLRMNApESuNvloVIY32XVImj/GEzh4rAfM6F15U1sN8T/EUo6+0B/Glp+9R49QzAfRSE2g48/rGwgf1JVHYfVWFUtAzUA+GdqWdOixo5cCsYJbqpNHfWVZN/bUQnBFIYwUwysnC29D+LUdQEQQ4qOm+gFAOtrWU62zMkXJ4iLt8Ify6kbrvsRXgbhQIzzGS7WH9XDarj0eZciuslr15TLMC1Azadf+cXHLR9gMHA13mT9vYIQKCAQA/DjGv8cKCkAvf7s2hqROGYAs6Jp8yhrsN1tYOwAPLRhtnCs+rLrg17M2vDptLlcRuI/vIElamdTmylRpjUQpX7yObzLO73nfVhpwRJVMdGU394iBIDncQ+JoHfUwgqJskbUM40dvZdyjbrqc/Q/4z+hbZb+oN/GXb8sVKBATPzSDMKQ/xqgisYIw+wmDPStnPsHAaIWOtni47zIgilJzD0WEk78/YjmPbUrboYvWziK5JiRRJFA1rkQqV1c0M+OXixIm+/yS8AksgCeaHr0WUieGcJtjT9uE8vyFop5ykhRiNxy9wGaq6i7IEecsrkd6DqxDHWkwhFuO1bSE83q/VAoIBAEA+RX1i/SUi08p71ggUi9WFMqXmzELp1L3hiEjOc2AklHk2rPxsaTh9+G95BvjhP7fRa/Yga+yDtYuyjO99nedStdNNSg03aPXILl9gs3r2dPiQKUEXZJ3FrH6tkils/8BlpOIRfbkszrdZIKTO9GCdLWQ30dQITDACs8zV/1GFGrHFrqnnMe/NpIFHWNZJ0/WZMi8wgWO6Ik8jHEpQtVXRiXLqy7U6hk170pa4GHOzvftfPElOZZjy9qn7KjdAQqy6spIrAE94OEL+fBgbHQZGLpuTlj6w6YGbMtPU8uo7sXKoc6WOCb68JWft3tejGLDa1946HAWqVM9B/UcneNc=",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

such a large string!

}

Expand All @@ -19,6 +18,10 @@ func TestInitialization(t *testing.T) {
Datastore: config.Datastore{
Type: "memory",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},

&config.Config{
Expand All @@ -27,6 +30,10 @@ func TestInitialization(t *testing.T) {
Type: "leveldb",
Path: ".testdb",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},
}

Expand Down
74 changes: 64 additions & 10 deletions crypto/spipe/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,18 @@ func (s *SecurePipe) handshake() error {
return err
}

s.remote.PubKey, err = ci.UnmarshalPublicKey(proposeResp.GetPubkey())
// get remote identity
remotePubKey, err := ci.UnmarshalPublicKey(proposeResp.GetPubkey())
if err != nil {
return err
}

remoteID, err := IDFromPubKey(s.remote.PubKey)
// get or construct peer
s.remote, err = getOrConstructPeer(s.peers, remotePubKey)
if err != nil {
return err
}

if s.remote.ID != nil && !remoteID.Equal(s.remote.ID) {
e := "Expected pubkey does not match sent pubkey: %v - %v"
return fmt.Errorf(e, s.remote.ID.Pretty(), remoteID.Pretty())
} else if s.remote.ID == nil {
s.remote.ID = remoteID
}
// u.POut("Remote Peer Identified as %s\n", s.remote.ID.Pretty())
u.DOut("[%s] Remote Peer Identified as %s\n", s.local.ID.Pretty(), s.remote.ID.Pretty())

exchange, err := selectBest(SupportedExchanges, proposeResp.GetExchanges())
if err != nil {
Expand Down Expand Up @@ -340,3 +335,62 @@ func selectBest(myPrefs, theirPrefs string) (string, error) {

return "", errors.New("No algorithms in common!")
}

// getOrConstructPeer attempts to fetch a peer from a peerstore.
// if succeeds, verify ID and PubKey match.
// else, construct it.
func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error) {

rid, err := IDFromPubKey(rpk)
if err != nil {
return nil, err
}

npeer, err := peers.Get(rid)
if err != nil {
if err != peer.ErrNotFound {
return nil, err // unexpected error happened.
}

// dont have peer, so construct it + add it to peerstore.
npeer = &peer.Peer{ID: rid, PubKey: rpk}
if err := peers.Put(npeer); err != nil {
return nil, err
}

// done, return the newly constructed peer.
return npeer, nil
}

// did have it locally.

// let's verify ID
if !npeer.ID.Equal(rid) {
e := "Expected peer.ID does not match sent pubkey's hash: %v - %v"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be an interesting scenario that this could fail in..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I added that test case because I could be Dialing to a peer1 and then i receive back peer2's pubkey, and thus am like wat!? (note that if this is an incoming connection, peer is ErrNotFound above, and returns beforehand. Did prompt me to find a bug though (if peers.Get returns nil, nil, we'd panic).

return nil, fmt.Errorf(e, npeer.ID.Pretty(), rid.Pretty())
}

if npeer.PubKey == nil {
// didn't have a pubkey, just set it.
npeer.PubKey = rpk
return npeer, nil
}

// did have pubkey, let's verify it's really the same.
// this shouldn't ever happen, given we hashed, etc, but it could mean
// expected code (or protocol) invariants violated.

lb, err1 := npeer.PubKey.Bytes()
if err1 != nil {
return nil, err1
}
rb, err2 := rpk.Bytes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like: PubKey.Equals(otherpubkey) might be nice

if err2 != nil {
return nil, err2
}

if !bytes.Equal(lb, rb) {
return nil, fmt.Errorf("WARNING: PubKey mismatch: %v", npeer.ID.Pretty())
}
return npeer, nil
}
19 changes: 15 additions & 4 deletions crypto/spipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type SecurePipe struct {

local *peer.Peer
remote *peer.Peer
peers peer.Peerstore

params params

Expand All @@ -32,16 +33,16 @@ type params struct {
}

// NewSecurePipe constructs a pipe with channels of a given buffer size.
func NewSecurePipe(ctx context.Context, bufsize int, local,
remote *peer.Peer) (*SecurePipe, error) {
func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer,
peers peer.Peerstore) (*SecurePipe, error) {

sp := &SecurePipe{
Duplex: Duplex{
In: make(chan []byte, bufsize),
Out: make(chan []byte, bufsize),
},
local: local,
remote: remote,
local: local,
peers: peers,
}
return sp, nil
}
Expand All @@ -63,6 +64,16 @@ func (s *SecurePipe) Wrap(ctx context.Context, insecure Duplex) error {
return nil
}

// LocalPeer retrieves the local peer.
func (s *SecurePipe) LocalPeer() *peer.Peer {
return s.local
}

// RemotePeer retrieves the local peer.
func (s *SecurePipe) RemotePeer() *peer.Peer {
return s.remote
}

// Close closes the secure pipe
func (s *SecurePipe) Close() error {
if s.cancel == nil {
Expand Down
11 changes: 9 additions & 2 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ func TestInitializeDaemonListener(t *testing.T) {
privKey := base64.StdEncoding.EncodeToString(prbytes)
pID := ident.Pretty()

id := &config.Identity{
id := config.Identity{
PeerID: pID,
Address: "/ip4/127.0.0.1/tcp/8000",
PrivKey: privKey,
}

Expand All @@ -38,6 +37,10 @@ func TestInitializeDaemonListener(t *testing.T) {
Datastore: config.Datastore{
Type: "memory",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},

&config.Config{
Expand All @@ -46,6 +49,10 @@ func TestInitializeDaemonListener(t *testing.T) {
Type: "leveldb",
Path: ".testdb",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},
}

Expand Down
17 changes: 6 additions & 11 deletions net/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@ func NewConn(peer *peer.Peer, addr *ma.Multiaddr, nconn net.Conn) (*Conn, error)
return conn, nil
}

// NewNetConn constructs a new connection with given net.Conn
func NewNetConn(nconn net.Conn) (*Conn, error) {

addr, err := ma.FromNetAddr(nconn.RemoteAddr())
if err != nil {
return nil, err
}

return NewConn(new(peer.Peer), addr, nconn)
}

// Dial connects to a particular peer, over a given network
// Example: Dial("udp", peer)
func Dial(network string, peer *peer.Peer) (*Conn, error) {
Expand Down Expand Up @@ -112,3 +101,9 @@ func (c *Conn) Close() error {
c.Closed <- true
return err
}

// NetConnMultiaddr returns the net.Conn's address, recast as a multiaddr.
// (consider moving this directly into the multiaddr package)
func NetConnMultiaddr(nconn net.Conn) (*ma.Multiaddr, error) {
return ma.FromNetAddr(nconn.RemoteAddr())
}
44 changes: 30 additions & 14 deletions net/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mux

import (
"errors"
"sync"

msg "github.com/jbenet/go-ipfs/net/message"
u "github.com/jbenet/go-ipfs/util"
Expand Down Expand Up @@ -30,6 +31,8 @@ type Muxer struct {

// cancel is the function to stop the Muxer
cancel context.CancelFunc
ctx context.Context
wg sync.WaitGroup

*msg.Pipe
}
Expand Down Expand Up @@ -58,20 +61,30 @@ func (m *Muxer) Start(ctx context.Context) error {
}

// make a cancellable context.
ctx, m.cancel = context.WithCancel(ctx)
m.ctx, m.cancel = context.WithCancel(ctx)
m.wg = sync.WaitGroup{}

go m.handleIncomingMessages(ctx)
m.wg.Add(1)
go m.handleIncomingMessages()
for pid, proto := range m.Protocols {
go m.handleOutgoingMessages(ctx, pid, proto)
m.wg.Add(1)
go m.handleOutgoingMessages(pid, proto)
}

return nil
}

// Stop stops muxer activity.
func (m *Muxer) Stop() {
if m.cancel == nil {
panic("muxer stopped twice.")
}
// issue cancel, and wipe func.
m.cancel()
m.cancel = context.CancelFunc(nil)

// wait for everything to wind down.
m.wg.Wait()
}

// AddProtocol adds a Protocol with given ProtocolID to the Muxer.
Expand All @@ -86,7 +99,8 @@ func (m *Muxer) AddProtocol(p Protocol, pid ProtocolID) error {

// handleIncoming consumes the messages on the m.Incoming channel and
// routes them appropriately (to the protocols).
func (m *Muxer) handleIncomingMessages(ctx context.Context) {
func (m *Muxer) handleIncomingMessages() {
defer m.wg.Done()

for {
if m == nil {
Expand All @@ -98,16 +112,16 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) {
if !more {
return
}
go m.handleIncomingMessage(ctx, msg)
go m.handleIncomingMessage(msg)

case <-ctx.Done():
case <-m.ctx.Done():
return
}
}
}

// handleIncomingMessage routes message to the appropriate protocol.
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) {
func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {

data, pid, err := unwrapData(m1.Data())
if err != nil {
Expand All @@ -124,31 +138,33 @@ func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) {

select {
case proto.GetPipe().Incoming <- m2:
case <-ctx.Done():
u.PErr("%v\n", ctx.Err())
case <-m.ctx.Done():
u.PErr("%v\n", m.ctx.Err())
return
}
}

// handleOutgoingMessages consumes the messages on the proto.Outgoing channel,
// wraps them and sends them out.
func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, proto Protocol) {
func (m *Muxer) handleOutgoingMessages(pid ProtocolID, proto Protocol) {
defer m.wg.Done()

for {
select {
case msg, more := <-proto.GetPipe().Outgoing:
if !more {
return
}
go m.handleOutgoingMessage(ctx, pid, msg)
go m.handleOutgoingMessage(pid, msg)

case <-ctx.Done():
case <-m.ctx.Done():
return
}
}
}

// handleOutgoingMessage wraps out a message and sends it out the
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) {
func (m *Muxer) handleOutgoingMessage(pid ProtocolID, m1 msg.NetMessage) {
data, err := wrapData(m1.Data(), pid)
if err != nil {
u.PErr("muxer serializing error: %v\n", err)
Expand All @@ -158,7 +174,7 @@ func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 ms
m2 := msg.New(m1.Peer(), data)
select {
case m.GetPipe().Outgoing <- m2:
case <-ctx.Done():
case <-m.ctx.Done():
return
}
}
Expand Down
Loading