Skip to content

Commit

Permalink
AWS CNI Mode
Browse files Browse the repository at this point in the history
✨ new operational mode of Direct Ingress Pod access

This adds a new operational mode of adding the Ingress pods as Targets in the Target Groups directly, instead of the nodes where the Ingress is accessed through a node port.

The core idea is based on the fact that standard AWS EKS cluster running AWS VPC CNI have their pods as first class members in the VPCs. Their IPs are directly reachable from the ALB/NLB target groups like the nodes are too, which means there is no necessity for the NodePort extra hop to take.

There are several drivers and advantages accessing the pod directly vs a Nodeport:

## Delay and eventual consistency of AutoscalingGroup to ALB/NLB target group updates

This has been the biggest trouble in operations, that the list of nodes in target groups is slower than the nodes are replaced which ends up in a black hole of no Ingresses available at a time. We are facing regularily downtimes especially when spot interruptions or ASG node rollings happen that the ALB/NLB takes up to 2 minutes to reflect the group change. For smaller clusters this leads to no Skipper instance being registered hence no target available to forward traffic.
With this new mode the registration happens independently and instantly, the registration of targets up to be serving takes less than 10seconds!

## independent scaling of nodes

With nodeports there is an eventual dependency on available nodes to scale the Ingress.
Plus the Ingress pod cannot be replaced in place but requires a termination first and then rescheduling. For a certain time which can be more than a minute, this node is offline as an Ingress.
With this mode the host networking and node port is obsolete, which allows node indepent scaling of Skipper pods! Skipper becomes a regular deployment and its replicaSet can be indepent on the cluster size.

## Save de/registering and instantaneous response

Core idea is the event based registration to Kubernetes using pod `Informer` that receives immediate notifications about pod changes, which allow almost zero delayed updates on the target groups.

The registration happens as soon as the pod received an IP from AWS VPC. Hence the readiness probe of the ALB/NLB starts to monitor already during scheduling of the pod, serving the earliest possible. Tests in lab show pods serving ALB traffic well under 10s from scheduling!

Deregistration happens bound to the kubernetes event. That means the LB is now in sync with the cluster and will stop sending traffic before the pod is actually terminated. This implement save deregistering without traffic loss.

## TG without unhealthy targets

Since the IP based TGs are managed now by this controller, they represent pods and thus all of them are shown healthy, otherwise removed by this controller.

# Implementation details:

* client-go Informer: This high level functions are providing a convenient access to event registrations of kubernetes. Since the event registration is the key of fast response and efficient compared to high rate polling, using this standard factory methods seems standing to reason.

## Todo

Settle on the impelemtation agreement and finish it up by adding unit tests.

## Tests

*  successful transistion of TG from type Instance to type IP vice versa
* the controller registers pods that are discovered
* the controller deregisters pods that are "terminating" status
* the controller recovers desired state if manual intervention on the TG happened by "resyncing"
* it removes pods that are killed or dead

## misc

* extended the service account with required RBAC permissions to watch/list pods
* added example of Skipper without a HostNetwork and NodePort

Signed-off-by: Samuel Lang <[email protected]>
  • Loading branch information
universam1 committed Jan 3, 2022
1 parent 5c66137 commit 712b195
Show file tree
Hide file tree
Showing 19 changed files with 1,380 additions and 25 deletions.
105 changes: 97 additions & 8 deletions aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type Adapter struct {
denyInternalRespBody string
denyInternalRespContentType string
denyInternalRespStatusCode int
TargetCNI *TargetCNIconfig
}

type TargetCNIconfig struct {
Enabled bool
TargetGroupARNs []string
TargetGroupCh chan []string
}

type manifest struct {
Expand Down Expand Up @@ -124,11 +131,14 @@ const (
DefaultNLBCrossZone = false
DefaultNLBHTTPEnabled = false

nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
TargetAccessModeAWSCNI = "AWSCNI"
TargetAccessModeNodePort = "NodePort"
DefaultTargetCNILabelSelector = "kube-system/application=skipper-ingress"
)

var (
Expand All @@ -146,6 +156,8 @@ var (
ErrMissingAutoScalingGroupTag = errors.New(`instance is missing the "` + autoScalingGroupNameTag + `" tag`)
// ErrNoRunningInstances is used to signal that no instances were found in the running state
ErrNoRunningInstances = errors.New("no reservations or instances in the running state")
// ErrNoTargetGroupARNs is used to signal that no target group ARNs are available yet
ErrNoTargetGroupARNs = errors.New("no target group ARNs cached")
// SSLPolicies is a map of valid ALB SSL Policies
// https://docs.aws.amazon.com/elasticloadbalancing/latest/application/create-https-listener.html#describe-ssl-policies
SSLPolicies = map[string]bool{
Expand Down Expand Up @@ -225,6 +237,10 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
nlbCrossZone: DefaultNLBCrossZone,
nlbHTTPEnabled: DefaultNLBHTTPEnabled,
customFilter: DefaultCustomFilter,
TargetCNI: &TargetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
},
}

adapter.manifest, err = buildManifest(adapter, clusterID, vpcID)
Expand Down Expand Up @@ -432,6 +448,15 @@ func (a *Adapter) WithInternalDomains(domains []string) *Adapter {
return a
}

// WithTargetAccessMode returns the receiver adapter after defining the target access mode
func (a *Adapter) WithTargetAccessMode(t string) *Adapter {
a.TargetCNI.Enabled = false
if t == TargetAccessModeAWSCNI {
a.TargetCNI.Enabled = true
}
return a
}

// WithDenyInternalDomains returns the receiver adapter after setting
// the denyInternalDomains config.
func (a *Adapter) WithDenyInternalDomains(deny bool) *Adapter {
Expand Down Expand Up @@ -568,14 +593,26 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
return
}

// split the full list into relevant TG types
targetTypesARNs, err := categorizeTargetTypeInstance(a.elbv2, targetGroupARNs)
if err != nil {
problems.Add("failed to categorize Target Type Instance: %w", err)
return
}

// update the CNI TG list
if a.TargetCNI.Enabled {
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
kubernetesCreatorTag: a.controllerID,
}

for _, asg := range a.TargetedAutoScalingGroups {
// This call is idempotent and safe to execute every time
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetGroupARNs, asg.name, ownerTags); err != nil {
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], asg.name, ownerTags); err != nil {
problems.Add("failed to update target groups for autoscaling group %q: %w", asg.name, err)
}
}
Expand All @@ -592,13 +629,13 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
runningSingleInstances := a.RunningSingleInstances()
if len(runningSingleInstances) != 0 {
// This call is idempotent too
if err := registerTargetsOnTargetGroups(a.elbv2, targetGroupARNs, runningSingleInstances); err != nil {
if err := registerTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], runningSingleInstances); err != nil {
problems.Add("failed to register instances %q in target groups: %w", runningSingleInstances, err)
}
}
if len(a.obsoleteInstances) != 0 {
// Deregister instances from target groups and clean up list of obsolete instances
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetGroupARNs, a.obsoleteInstances); err != nil {
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], a.obsoleteInstances); err != nil {
problems.Add("failed to deregister instances %q in target groups: %w", a.obsoleteInstances, err)
} else {
a.obsoleteInstances = make([]string, 0)
Expand Down Expand Up @@ -665,6 +702,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
Expand Down Expand Up @@ -720,6 +758,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
Expand Down Expand Up @@ -1009,3 +1048,53 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails

return nonTargetedASGs
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []string) error {
if a.TargetCNI.TargetGroupARNs == nil || len(a.TargetCNI.TargetGroupARNs) == 0 {
return ErrNoTargetGroupARNs
}
for _, targetGroupARN := range a.TargetCNI.TargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
if err != nil {
return fmt.Errorf("unable to describe target health %w", err)
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toregister := difference(endpoints, registeredInstances)
if len(toregister) > 0 {
log.Info("Registering CNI targets: ", toregister)
err := registerTargetsOnTargetGroups(a.elbv2, a.TargetCNI.TargetGroupARNs, toregister)
if err != nil {
return err
}
}
toderegister := difference(registeredInstances, endpoints)
if len(toderegister) > 0 {
log.Info("Deregistering CNI targets: ", toderegister)
err := deregisterTargetsOnTargetGroups(a.elbv2, a.TargetCNI.TargetGroupARNs, toderegister)
if err != nil {
return err
}
}
}
return nil
}

// difference returns the elements in `a` that aren't in `b`.
func difference(a, b []string) []string {
mb := make(map[string]struct{}, len(b))
for _, x := range b {
mb[x] = struct{}{}
}
var diff []string
for _, x := range a {
if _, found := mb[x]; !found {
diff = append(diff, x)
}
}
return diff
}
67 changes: 67 additions & 0 deletions aws/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,70 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) {
require.Equal(t, uint(4), b.nlbHealthyThresholdCount)
})
}

func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
describeTargetHealth: &apiResponse{response: &thOut, err: nil},
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}}

t.Run("No target group ARNs cached", func(t *testing.T) {
require.ErrorIs(t, ErrNoTargetGroupARNs, a.SetTargetsOnCNITargetGroups(nil))
})

t.Run("adding a new endpoint", func(t *testing.T) {
a.TargetCNI.TargetGroupARNs = []string{"asg1"}

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}))
require.Equal(t, []*elbv2.RegisterTargetsInput{{
TargetGroupArn: aws.String("asg1"),
Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}},
}}, m.rtinputs)
require.Equal(t, []*elbv2.DeregisterTargetsInput(nil), m.dtinputs)
})

t.Run("two new endpoints, registers the new EPs only", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}}},
}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}))
require.Equal(t, []*elbv2.TargetDescription{
{Id: aws.String("2.2.2.2")},
{Id: aws.String("3.3.3.3")},
}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.DeregisterTargetsInput(nil), m.dtinputs)
})

t.Run("removing one endpoint, causing deregistration of it", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("2.2.2.2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("3.3.3.3")}},
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}))
require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets)
})

t.Run("restoring desired state after external manipulation, adding and removing one", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("2.2.2.2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("4.4.4.4")}},
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}))
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets)
})
}
19 changes: 19 additions & 0 deletions aws/asg.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
return targetGroups, err
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
}
}
return true
})
log.Debugf("categorized target group arns: %#v", targetTypes)
return targetTypes, err
}

// tgHasTags returns true if the specified resource has the expected tags.
func tgHasTags(descs []*elbv2.TagDescription, arn string, tags map[string]string) bool {
for _, desc := range descs {
Expand Down
54 changes: 54 additions & 0 deletions aws/asg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down Expand Up @@ -301,6 +302,7 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down Expand Up @@ -466,9 +468,11 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
{
TargetGroupArn: aws.String("bar"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down Expand Up @@ -674,3 +678,53 @@ func TestProcessChunked(t *testing.T) {
})
}
}

func Test_categorizeTargetTypeInstance(t *testing.T) {
for _, test := range []struct {
name string
targetGroups map[string][]string
}{
{
name: "one from any type",
targetGroups: map[string][]string{
elbv2.TargetTypeEnumInstance: {"instancy"},
elbv2.TargetTypeEnumAlb: {"albly"},
elbv2.TargetTypeEnumIp: {"ipvy"},
elbv2.TargetTypeEnumLambda: {"lambada"},
},
},
{
name: "one type many target groups",
targetGroups: map[string][]string{
elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"},
},
},
{
name: "several types many target groups",
targetGroups: map[string][]string{
elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"},
elbv2.TargetTypeEnumAlb: {"albly", "alblily"},
elbv2.TargetTypeEnumIp: {"ipvy"},
},
},
} {
t.Run(test.name, func(t *testing.T) {
tg := []string{}
tgResponse := []*elbv2.TargetGroup{}
for k, v := range test.targetGroups {
for _, i := range v {
tg = append(tg, i)
tgResponse = append(tgResponse, &elbv2.TargetGroup{TargetGroupArn: aws.String(i), TargetType: aws.String(k)})
}
}

mockElbv2Svc := &mockElbv2Client{outputs: elbv2MockOutputs{describeTargetGroups: R(&elbv2.DescribeTargetGroupsOutput{TargetGroups: tgResponse}, nil)}}
got, err := categorizeTargetTypeInstance(mockElbv2Svc, tg)
assert.NoError(t, err)
for k, v := range test.targetGroups {
assert.Len(t, got[k], len(v))
assert.Equal(t, got[k], v)
}
})
}
}
1 change: 1 addition & 0 deletions aws/cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type stackSpec struct {
denyInternalDomainsResponse denyResp
internalDomains []string
tags map[string]string
targetAccessModeCNI bool
}

type healthCheck struct {
Expand Down
6 changes: 6 additions & 0 deletions aws/cf_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"sort"

"github.com/aws/aws-sdk-go/service/elbv2"
cloudformation "github.com/mweagle/go-cloudformation"
)

Expand Down Expand Up @@ -444,6 +445,10 @@ func generateDenyInternalTrafficRule(listenerName string, rulePriority int64, in
}

func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation.ElasticLoadBalancingV2TargetGroup {
targetType := elbv2.TargetTypeEnumInstance
if spec.targetAccessModeCNI {
targetType = elbv2.TargetTypeEnumIp
}
protocol := "HTTP"
healthCheckProtocol := "HTTP"
healthyThresholdCount, unhealthyThresholdCount := spec.albHealthyThresholdCount, spec.albUnhealthyThresholdCount
Expand Down Expand Up @@ -472,6 +477,7 @@ func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation
UnhealthyThresholdCount: cloudformation.Integer(int64(unhealthyThresholdCount)),
Port: cloudformation.Ref(targetPortParameter).Integer(),
Protocol: cloudformation.String(protocol),
TargetType: cloudformation.String(targetType),
VPCID: cloudformation.Ref(parameterTargetGroupVPCIDParameter).String(),
}

Expand Down
Loading

0 comments on commit 712b195

Please sign in to comment.