Skip to content

Commit

Permalink
Add dual-stack support to etcd registry
Browse files Browse the repository at this point in the history
Take advantage of the etcd keys now being able to serialize an ipv4 and
ipv6 address to allow dual-stack leases. ipv4 is mandatory, ipv6 is
optional.

The lease lookup in etcd is still done based on the external ipv4
address. Generated dual-stack addresses will use the same offset inside
the address range for v4 and v6, so the coupling is deterministic.

Signed-off-by: Sjoerd Simons <[email protected]>
  • Loading branch information
sjoerdsimons committed Jan 16, 2022
1 parent 0b9d879 commit 7267f7b
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 68 deletions.
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network, wg

wg.Add(1)
go func() {
subnet.WatchLease(ctx, sm, bn.Lease().Subnet, evts)
l := bn.Lease()
subnet.WatchLease(ctx, sm, l.Subnet, &l.IPv6Subnet, evts)
wg.Done()
}()

Expand Down
4 changes: 4 additions & 0 deletions pkg/ip/ip6net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (ip6 *IP6) UnmarshalJSON(j []byte) error {
}
}

func (ip6 *IP6) Cmp(other *IP6) int {
return (*big.Int)(ip6).Cmp((*big.Int)(other))
}

// similar to net.IPNet but has uint based representation
type IP6Net struct {
IP *IP6
Expand Down
108 changes: 74 additions & 34 deletions subnet/etcdv2/local_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ func (m *LocalManager) AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Le
l, err := m.tryAcquireLease(ctx, config, attrs.PublicIP, attrs)
switch err {
case nil:
//TODO only vxlan backend and kube subnet manager support dual stack now.
l.EnableIPv4 = true
l.EnableIPv6 = false
return l, nil
case errTryAgain:
continue
Expand Down Expand Up @@ -146,15 +143,15 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extI
// Try to reuse a subnet if there's one that matches our IP
if l := findLeaseByIP(leases, extIaddr); l != nil {
// Make sure the existing subnet is still within the configured network
if isSubnetConfigCompat(config, l.Subnet) {
log.Infof("Found lease (%v) for current IP (%v), reusing", l.Subnet, extIaddr)
if isSubnetConfigCompat(config, l.Subnet) && isIPv6SubnetConfigCompat(config, l.IPv6Subnet) {
log.Infof("Found lease (ip: %v ipv6: %v) for current IP (%v), reusing", l.Subnet, l.IPv6Subnet, extIaddr)

ttl := time.Duration(0)
if !l.Expiration.IsZero() {
// Not a reservation
ttl = subnetTTL
}
exp, err := m.registry.updateSubnet(ctx, l.Subnet, attrs, ttl, 0)
exp, err := m.registry.updateSubnet(ctx, l.Subnet, &l.IPv6Subnet, attrs, ttl, 0)
if err != nil {
return nil, err
}
Expand All @@ -163,28 +160,29 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extI
l.Expiration = exp
return l, nil
} else {
log.Infof("Found lease (%v) for current IP (%v) but not compatible with current config, deleting", l.Subnet, extIaddr)
if err := m.registry.deleteSubnet(ctx, l.Subnet); err != nil {
log.Infof("Found lease (%+v) for current IP (%v) but not compatible with current config, deleting", l, extIaddr)
if err := m.registry.deleteSubnet(ctx, l.Subnet, &l.IPv6Subnet); err != nil {
return nil, err
}
}
}

// no existing match, check if there was a previous subnet to use
var sn ip.IP4Net
var sn6 *ip.IP6Net
if !m.previousSubnet.Empty() {
// use previous subnet
if l := findLeaseBySubnet(leases, m.previousSubnet); l != nil {
// Make sure the existing subnet is still within the configured network
if isSubnetConfigCompat(config, l.Subnet) {
if isSubnetConfigCompat(config, l.Subnet) && isIPv6SubnetConfigCompat(config, l.IPv6Subnet) {
log.Infof("Found lease (%v) matching previously leased subnet, reusing", l.Subnet)

ttl := time.Duration(0)
if !l.Expiration.IsZero() {
// Not a reservation
ttl = subnetTTL
}
exp, err := m.registry.updateSubnet(ctx, l.Subnet, attrs, ttl, 0)
exp, err := m.registry.updateSubnet(ctx, l.Subnet, &l.IPv6Subnet, attrs, ttl, 0)
if err != nil {
return nil, err
}
Expand All @@ -194,12 +192,13 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extI
return l, nil
} else {
log.Infof("Found lease (%v) matching previously leased subnet but not compatible with current config, deleting", l.Subnet)
if err := m.registry.deleteSubnet(ctx, l.Subnet); err != nil {
if err := m.registry.deleteSubnet(ctx, l.Subnet, &l.IPv6Subnet); err != nil {
return nil, err
}
}
} else {
// Check if the previous subnet is a part of the network and of the right subnet length
// TODO ipv6?
if isSubnetConfigCompat(config, m.previousSubnet) {
log.Infof("Found previously leased subnet (%v), reusing", m.previousSubnet)
sn = m.previousSubnet
Expand All @@ -211,18 +210,25 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extI

if sn.Empty() {
// no existing match, grab a new one
sn, err = m.allocateSubnet(config, leases)
sn, sn6, err = m.allocateSubnet(config, leases)
if err != nil {
return nil, err
}
}

exp, err := m.registry.createSubnet(ctx, sn, attrs, subnetTTL)
exp, err := m.registry.createSubnet(ctx, sn, sn6, attrs, subnetTTL)
switch {
case err == nil:
log.Infof("Allocated lease (%v) to current node (%v) ", sn, extIaddr)
log.Infof("Allocated lease (ip: %v ipv6: %v) to current node (%v) ", sn, sn6, extIaddr)
var lsn6 ip.IP6Net
if sn6 != nil {
lsn6 = *sn6
}
return &Lease{
EnableIPv4: true,
Subnet: sn,
EnableIPv6: !lsn6.Empty(),
IPv6Subnet: lsn6,
Attrs: *attrs,
Expiration: exp,
}, nil
Expand All @@ -233,32 +239,63 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extI
}
}

func (m *LocalManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
func (m *LocalManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, *ip.IP6Net, error) {
log.Infof("Picking subnet in range %s ... %s", config.SubnetMin, config.SubnetMax)
if config.EnableIPv6 {
log.Infof("Picking ipv6 subnet in range %s ... %s", config.IPv6SubnetMin, config.IPv6SubnetMax)
}

type IPs struct {
ip ip.IP4
ip6 *ip.IP6
}
var bag []IPs

var bag []ip.IP4
sn := ip.IP4Net{IP: config.SubnetMin, PrefixLen: config.SubnetLen}
var sn6 ip.IP6Net
if config.EnableIPv6 {
sn6 = ip.IP6Net{IP: config.IPv6SubnetMin, PrefixLen: config.IPv6SubnetLen}
}

OuterLoop:
for ; sn.IP <= config.SubnetMax && len(bag) < 100; sn = sn.Next() {
if !sn6.Empty() {
if sn6.IP.Cmp(config.IPv6SubnetMax) >= 0 {
break OuterLoop
}
}
for _, l := range leases {
if sn.Overlaps(l.Subnet) {
continue OuterLoop
}
if !sn6.Empty() && sn6.Overlaps(l.IPv6Subnet) {
continue OuterLoop
}
}

if !sn6.Empty() {
bag = append(bag, IPs{ip: sn.IP, ip6: sn6.IP})
sn6 = sn6.Next()
} else {
bag = append(bag, IPs{ip: sn.IP, ip6: nil})
}
bag = append(bag, sn.IP)
}

if len(bag) == 0 {
return ip.IP4Net{}, errors.New("out of subnets")
return ip.IP4Net{}, nil, errors.New("out of subnets")
} else {
i := randInt(0, len(bag))
return ip.IP4Net{IP: bag[i], PrefixLen: config.SubnetLen}, nil
ipnet := ip.IP4Net{IP: bag[i].ip, PrefixLen: config.SubnetLen}

if bag[i].ip6 == nil {
return ipnet, nil, nil
}
return ipnet, &ip.IP6Net{IP: bag[i].ip6, PrefixLen: config.IPv6SubnetLen}, nil
}
}

func (m *LocalManager) RenewLease(ctx context.Context, lease *Lease) error {
exp, err := m.registry.updateSubnet(ctx, lease.Subnet, &lease.Attrs, subnetTTL, 0)
exp, err := m.registry.updateSubnet(ctx, lease.Subnet, &lease.IPv6Subnet, &lease.Attrs, subnetTTL, 0)
if err != nil {
return err
}
Expand All @@ -285,47 +322,40 @@ func getNextIndex(cursor interface{}) (uint64, error) {
return nextIndex, nil
}

func (m *LocalManager) leaseWatchReset(ctx context.Context, sn ip.IP4Net) (LeaseWatchResult, error) {
l, index, err := m.registry.getSubnet(ctx, sn)
func (m *LocalManager) leaseWatchReset(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net) (LeaseWatchResult, error) {
l, index, err := m.registry.getSubnet(ctx, sn, sn6)
if err != nil {
return LeaseWatchResult{}, err
}

//TODO only vxlan backend and kube subnet manager support dual stack now.
l.EnableIPv4 = true
l.EnableIPv6 = false

return LeaseWatchResult{
Snapshot: []Lease{*l},
Cursor: watchCursor{index},
}, nil
}

func (m *LocalManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) {
func (m *LocalManager) WatchLease(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net, cursor interface{}) (LeaseWatchResult, error) {
if cursor == nil {
return m.leaseWatchReset(ctx, sn)
return m.leaseWatchReset(ctx, sn, sn6)
}

nextIndex, err := getNextIndex(cursor)
if err != nil {
return LeaseWatchResult{}, err
}

evt, index, err := m.registry.watchSubnet(ctx, nextIndex, sn)
evt, index, err := m.registry.watchSubnet(ctx, nextIndex, sn, sn6)

switch {
case err == nil:
//TODO only vxlan backend and kube subnet manager support dual stack now.
evt.Lease.EnableIPv4 = true
evt.Lease.EnableIPv6 = false
return LeaseWatchResult{
Events: []Event{evt},
Cursor: watchCursor{index},
}, nil

case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
return m.leaseWatchReset(ctx, sn)
return m.leaseWatchReset(ctx, sn, sn6)

default:
return LeaseWatchResult{}, err
Expand All @@ -347,7 +377,6 @@ func (m *LocalManager) WatchLeases(ctx context.Context, cursor interface{}) (Lea
case err == nil:
//TODO only vxlan backend and kube subnet manager support dual stack now.
evt.Lease.EnableIPv4 = true
evt.Lease.EnableIPv6 = false
return LeaseWatchResult{
Events: []Event{evt},
Cursor: watchCursor{index},
Expand Down Expand Up @@ -392,6 +421,17 @@ func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
return sn.PrefixLen == config.SubnetLen
}

func isIPv6SubnetConfigCompat(config *Config, sn6 ip.IP6Net) bool {
if !config.EnableIPv6 {
return sn6.Empty()
}
if sn6.Empty() || sn6.IP.Cmp(config.IPv6SubnetMin) < 0 || sn6.IP.Cmp(config.IPv6SubnetMax) > 0 {
return false
}

return sn6.PrefixLen == config.IPv6SubnetLen
}

func (m *LocalManager) Name() string {
previousSubnet := m.previousSubnet.String()
if m.previousSubnet.Empty() {
Expand Down
14 changes: 9 additions & 5 deletions subnet/etcdv2/mock_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func (msr *MockSubnetRegistry) getSubnets(ctx context.Context) ([]Lease, uint64,
return subs, msr.index, nil
}

func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error) {
// TODO ignores ipv6
func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net) (*Lease, uint64, error) {
msr.mux.Lock()
defer msr.mux.Unlock()

Expand All @@ -115,7 +116,8 @@ func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net) (*Le
return nil, msr.index, fmt.Errorf("subnet %s not found", sn)
}

func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
// TOODO ignores ipv6
func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
msr.mux.Lock()
defer msr.mux.Unlock()

Expand Down Expand Up @@ -152,7 +154,8 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, a
return exp, nil
}

func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
// TODO ignores ipv6
func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
msr.mux.Lock()
defer msr.mux.Unlock()

Expand Down Expand Up @@ -182,7 +185,7 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, a
return sub.Expiration, nil
}

func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net) error {
func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net, sn6 *ip.IP6Net) error {
msr.mux.Lock()
defer msr.mux.Unlock()

Expand Down Expand Up @@ -233,7 +236,8 @@ func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, since uint64) (
}
}

func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error) {
// TODO ignores ip6
func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net, sn6 *ip.IP6Net) (Event, uint64, error) {
for {
msr.mux.Lock()
index := msr.index
Expand Down
Loading

0 comments on commit 7267f7b

Please sign in to comment.