Skip to content

Commit

Permalink
AWS CNI Mode
Browse files Browse the repository at this point in the history
✨ new operational mode of Direct Ingress Pod access

This adds a new operational mode of adding the Ingress pods as Targets in the Target Groups directly, instead of the nodes where the Ingress is accessed through a node port.

The core idea is based on the fact that standard AWS EKS cluster running AWS VPC CNI have their pods as first class members in the VPCs. Their IPs are directly reachable from the ALB/NLB target groups like the nodes are too, which means there is no necessity for the NodePort extra hop to take.

There are several drivers and advantages accessing the pod directly vs a Nodeport:

## Delay and eventual consistency of AutoscalingGroup to ALB/NLB target group updates

This has been the biggest trouble in operations, that the list of nodes in target groups is slower than the nodes are replaced which ends up in a black hole of no Ingresses available at a time. We are facing regularily downtimes especially when spot interruptions or ASG node rollings happen that the ALB/NLB takes up to 2 minutes to reflect the group change. For smaller clusters this leads to no Skipper instance being registered hence no target available to forward traffic.
With this new mode the registration happens independently and instantly, the registration of targets up to be serving takes less than 10seconds!

## independent scaling of nodes

With nodeports there is an eventual dependency on available nodes to scale the Ingress.
Plus the Ingress pod cannot be replaced in place but requires a termination first and then rescheduling. For a certain time which can be more than a minute, this node is offline as an Ingress.
With this mode the host networking and node port is obsolete, which allows node indepent scaling of Skipper pods! Skipper becomes a regular deployment and its replicaSet can be indepent on the cluster size.

## Save de/registering and instantaneous response

Core idea is the event based registration to Kubernetes using pod `Informer` that receives immediate notifications about pod changes, which allow almost zero delayed updates on the target groups.

The registration happens as soon as the pod received an IP from AWS VPC. Hence the readiness probe of the ALB/NLB starts to monitor already during scheduling of the pod, serving the earliest possible. Tests in lab show pods serving ALB traffic well under 10s from scheduling!

Deregistration happens bound to the kubernetes event. That means the LB is now in sync with the cluster and will stop sending traffic before the pod is actually terminated. This implement save deregistering without traffic loss.

## TG without unhealthy targets

Since the IP based TGs are managed now by this controller, they represent pods and thus all of them are shown healthy, otherwise removed by this controller.

# Implementation details:

* client-go Informer: This high level functions are providing a convenient access to event registrations of kubernetes. Since the event registration is the key of fast response and efficient compared to high rate polling, using this standard factory methods seems standing to reason.

## Todo

Settle on the impelemtation agreement and finish it up by adding unit tests.

## Tests

*  successful transistion of TG from type Instance to type IP vice versa
* the controller registers pods that are discovered
* the controller deregisters pods that are "terminating" status
* the controller recovers desired state if manual intervention on the TG happened by "resyncing"
* it removes pods that are killed or dead

## misc

* extended the service account with required RBAC permissions to watch/list pods
* added example of Skipper without a HostNetwork and NodePort
  • Loading branch information
universam1 committed Dec 17, 2021
1 parent 5c66137 commit f7be221
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 17 deletions.
105 changes: 97 additions & 8 deletions aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ type Adapter struct {
denyInternalRespBody string
denyInternalRespContentType string
denyInternalRespStatusCode int
TargetCNI targetCNIconfig
}

type targetCNIconfig struct {
Enabled bool
Endpoints []string
TargetGroupARNs []string
TargetGroupCh chan []string
}

type manifest struct {
Expand Down Expand Up @@ -124,11 +132,14 @@ const (
DefaultNLBCrossZone = false
DefaultNLBHTTPEnabled = false

nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
TargetAccessModeAWSCNI = "AWSCNI"
TargetAccessModeNodePort = "NodePort"
DefaultTargetCNILabelSelector = "kube-system/application=skipper-ingress"
)

var (
Expand Down Expand Up @@ -225,6 +236,10 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
nlbCrossZone: DefaultNLBCrossZone,
nlbHTTPEnabled: DefaultNLBHTTPEnabled,
customFilter: DefaultCustomFilter,
TargetCNI: targetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
},
}

adapter.manifest, err = buildManifest(adapter, clusterID, vpcID)
Expand Down Expand Up @@ -432,6 +447,15 @@ func (a *Adapter) WithInternalDomains(domains []string) *Adapter {
return a
}

// WithTargetAccessMode returns the receiver adapter after defining the target access mode
func (a *Adapter) WithTargetAccessMode(t string) *Adapter {
a.TargetCNI.Enabled = false
if t == TargetAccessModeAWSCNI {
a.TargetCNI.Enabled = true
}
return a
}

// WithDenyInternalDomains returns the receiver adapter after setting
// the denyInternalDomains config.
func (a *Adapter) WithDenyInternalDomains(deny bool) *Adapter {
Expand Down Expand Up @@ -568,14 +592,26 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
return
}

// split the full list into relevant TG types
targetTypesARNs, err := categorizeTargetTypeInstance(a.elbv2, targetGroupARNs)
if err != nil {
problems.Add("failed to categorize Target Type Instance: %w", err)
return
}

// update the CNI TG list
if a.TargetCNI.Enabled {
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
kubernetesCreatorTag: a.controllerID,
}

for _, asg := range a.TargetedAutoScalingGroups {
// This call is idempotent and safe to execute every time
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetGroupARNs, asg.name, ownerTags); err != nil {
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], asg.name, ownerTags); err != nil {
problems.Add("failed to update target groups for autoscaling group %q: %w", asg.name, err)
}
}
Expand All @@ -592,13 +628,13 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
runningSingleInstances := a.RunningSingleInstances()
if len(runningSingleInstances) != 0 {
// This call is idempotent too
if err := registerTargetsOnTargetGroups(a.elbv2, targetGroupARNs, runningSingleInstances); err != nil {
if err := registerTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], runningSingleInstances); err != nil {
problems.Add("failed to register instances %q in target groups: %w", runningSingleInstances, err)
}
}
if len(a.obsoleteInstances) != 0 {
// Deregister instances from target groups and clean up list of obsolete instances
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetGroupARNs, a.obsoleteInstances); err != nil {
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], a.obsoleteInstances); err != nil {
problems.Add("failed to deregister instances %q in target groups: %w", a.obsoleteInstances, err)
} else {
a.obsoleteInstances = make([]string, 0)
Expand Down Expand Up @@ -665,6 +701,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
Expand Down Expand Up @@ -720,6 +757,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
Expand Down Expand Up @@ -1009,3 +1047,54 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails

return nonTargetedASGs
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups() error {
if a.TargetCNI.TargetGroupARNs == nil || len(a.TargetCNI.TargetGroupARNs) == 0 {
log.Info("No target group ARNs cached for CNI")
return nil
}
for _, targetGroupARN := range a.TargetCNI.TargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
if err != nil {
return fmt.Errorf("unable to describe target health %w", err)
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toregister := difference(a.TargetCNI.Endpoints, registeredInstances)
if len(toregister) > 0 {
log.Info("Registering CNI targets: ", toregister)
err := registerTargetsOnTargetGroups(a.elbv2, a.TargetCNI.TargetGroupARNs, toregister)
if err != nil {
return err
}
}
toderegister := difference(registeredInstances, a.TargetCNI.Endpoints)
if len(toderegister) > 0 {
log.Info("Deregistering CNI targets: ", toderegister)
err := deregisterTargetsOnTargetGroups(a.elbv2, a.TargetCNI.TargetGroupARNs, toderegister)
if err != nil {
return err
}
}
}
return nil
}

// difference returns the elements in `a` that aren't in `b`.
func difference(a, b []string) []string {
mb := make(map[string]struct{}, len(b))
for _, x := range b {
mb[x] = struct{}{}
}
var diff []string
for _, x := range a {
if _, found := mb[x]; !found {
diff = append(diff, x)
}
}
return diff
}
19 changes: 19 additions & 0 deletions aws/asg.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
return targetGroups, err
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
}
}
return true
})
log.Debugf("categorized target group arns: %#v", targetTypes)
return targetTypes, err
}

// tgHasTags returns true if the specified resource has the expected tags.
func tgHasTags(descs []*elbv2.TagDescription, arn string, tags map[string]string) bool {
for _, desc := range descs {
Expand Down
4 changes: 4 additions & 0 deletions aws/asg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down Expand Up @@ -301,6 +302,7 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down Expand Up @@ -466,9 +468,11 @@ func TestAttach(t *testing.T) {
TargetGroups: []*elbv2.TargetGroup{
{
TargetGroupArn: aws.String("foo"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
{
TargetGroupArn: aws.String("bar"),
TargetType: aws.String(elbv2.TargetTypeEnumInstance),
},
},
}, nil),
Expand Down
1 change: 1 addition & 0 deletions aws/cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type stackSpec struct {
denyInternalDomainsResponse denyResp
internalDomains []string
tags map[string]string
targetAccessModeCNI bool
}

type healthCheck struct {
Expand Down
6 changes: 6 additions & 0 deletions aws/cf_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/sha256"
"sort"

"github.com/aws/aws-sdk-go/service/elbv2"
cloudformation "github.com/mweagle/go-cloudformation"
)

Expand Down Expand Up @@ -444,6 +445,10 @@ func generateDenyInternalTrafficRule(listenerName string, rulePriority int64, in
}

func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation.ElasticLoadBalancingV2TargetGroup {
targetType := elbv2.TargetTypeEnumInstance
if spec.targetAccessModeCNI {
targetType = elbv2.TargetTypeEnumIp
}
protocol := "HTTP"
healthCheckProtocol := "HTTP"
healthyThresholdCount, unhealthyThresholdCount := spec.albHealthyThresholdCount, spec.albUnhealthyThresholdCount
Expand Down Expand Up @@ -472,6 +477,7 @@ func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation
UnhealthyThresholdCount: cloudformation.Integer(int64(unhealthyThresholdCount)),
Port: cloudformation.Ref(targetPortParameter).Integer(),
Protocol: cloudformation.String(protocol),
TargetType: cloudformation.String(targetType),
VPCID: cloudformation.Ref(parameterTargetGroupVPCIDParameter).String(),
}

Expand Down
16 changes: 15 additions & 1 deletion controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ var (
nlbHTTPEnabled bool
ingressAPIVersion string
internalDomains []string
targetAccessMode string
targetCNIPodSelector string
denyInternalDomains bool
denyInternalRespBody string
denyInternalRespContentType string
Expand Down Expand Up @@ -280,13 +282,21 @@ func loadSettings() error {
Default("text/plain").StringVar(&denyInternalRespContentType)
kingpin.Flag("deny-internal-domains-response-status-code", "Defines the response status code for a request identified as to an internal domain when -deny-internal-domains is set.").
Default("401").IntVar(&denyInternalRespStatusCode)
kingpin.Flag("target-access-mode", "Target group accessing Ingress via NodePort or AWS VPC CNI. Set to ASG for NodePort access or CNI for pod direct IP access.").
Default(aws.TargetAccessModeNodePort).EnumVar(&targetAccessMode, aws.TargetAccessModeNodePort, aws.TargetAccessModeAWSCNI)
kingpin.Flag("target-cni-pod-selector", "AWS VPC CNI only. Defines the query (namespace/labelselector) for ingress pods that should be linked to target group. Format 'namespace/key=value'.").
Default(aws.DefaultTargetCNILabelSelector).StringVar(&targetCNIPodSelector)
kingpin.Parse()

blacklistCertArnMap = make(map[string]bool)
for _, s := range blacklistCertARNs {
blacklistCertArnMap[s] = true
}

if sl := strings.SplitN(targetCNIPodSelector, "/", 2); len(sl) != 2 || sl[0] == "" || sl[1] == "" {
return fmt.Errorf("Invalid target-cni-pod-selector format")
}

if creationTimeout < 1*time.Minute {
return fmt.Errorf("invalid creation timeout %d. please specify a value > 1min", creationTimeout)
}
Expand Down Expand Up @@ -410,7 +420,8 @@ func main() {
WithDenyInternalDomains(denyInternalDomains).
WithInternalDomainsDenyResponse(denyInternalRespBody).
WithInternalDomainsDenyResponseStatusCode(denyInternalRespStatusCode).
WithInternalDomainsDenyResponseContenType(denyInternalRespContentType)
WithInternalDomainsDenyResponseContenType(denyInternalRespContentType).
WithTargetAccessMode(targetAccessMode)

log.Debug("certs.NewCachingProvider")
certificatesProvider, err := certs.NewCachingProvider(
Expand Down Expand Up @@ -444,6 +455,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
kubeAdapter.WithTargetCNIPodSelector(targetCNIPodSelector)

certificatesPerALB := maxCertsPerALB
if disableSNISupport {
Expand All @@ -465,10 +477,12 @@ func main() {
log.Infof("ALB Logging S3 Prefix: %s", awsAdapter.S3Prefix())
log.Infof("CloudWatch Alarm ConfigMap: %s", cwAlarmConfigMapLocation)
log.Infof("Default LoadBalancer type: %s", loadBalancerType)
log.Infof("Target access mode: %s", targetAccessMode)

ctx, cancel := context.WithCancel(context.Background())
go handleTerminationSignals(cancel, syscall.SIGTERM, syscall.SIGQUIT)
go serveMetrics(metricsAddress)
go cniEventHandler(ctx, awsAdapter, kubeAdapter)
startPolling(
ctx,
certificatesProvider,
Expand Down
2 changes: 2 additions & 0 deletions deploy/ingress-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ rules:
- apiGroups: # only one of extensions, networking.k8s.io is needed depending on the --ingress-api-version flag
- extensions
- networking.k8s.io
- ""
resources:
- ingresses
- pods
verbs:
- get
- list
Expand Down
Loading

0 comments on commit f7be221

Please sign in to comment.