Skip to content

Commit

Permalink
Merge pull request #28586 from cecheta/b-aws_lambda_event_source_mapp…
Browse files Browse the repository at this point in the history
…ing-retry

Fix retry when updating `aws_lambda_event_source_mapping`
  • Loading branch information
ewbankkit authored Apr 18, 2023
2 parents 1527f15 + 7e25c8f commit f95ac3c
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 124 deletions.
7 changes: 7 additions & 0 deletions .changelog/28586.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
resource/aws_lambda_event_source_mapping: Fix IAM eventual consistency errors on resource Update
```

```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `document_db_event_source_config` configuration block
```
201 changes: 127 additions & 74 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ResourceEventSourceMapping() *schema.Resource {
}

switch serviceName {
case "dynamodb", "kinesis", "kafka", "mq":
case "dynamodb", "kinesis", "kafka", "mq", "rds":
return old == "100"
case "sqs":
return old == "10"
Expand Down Expand Up @@ -118,6 +118,29 @@ func ResourceEventSourceMapping() *schema.Resource {
},
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
},
"document_db_event_source_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"collection_name": {
Type: schema.TypeString,
Optional: true,
},
"database_name": {
Type: schema.TypeString,
Required: true,
},
"full_document": {
Type: schema.TypeString,
Optional: true,
Default: lambda.FullDocumentDefault,
ValidateFunc: validation.StringInSlice(lambda.FullDocument_Values(), false),
},
},
},
},
"enabled": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -368,6 +391,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.DestinationConfig = expandDestinationConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("document_db_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.DocumentDBEventSourceConfig = expandDocumentDBEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("event_source_arn"); ok {
v := v.(string)

Expand Down Expand Up @@ -439,51 +466,24 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.TumblingWindowInSeconds = aws.Int64(int64(v.(int)))
}

log.Printf("[DEBUG] Creating Lambda Event Source Mapping: %s", input)

// IAM profiles and roles can take some time to propagate in AWS:
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#launch-instance-with-role-console
// Error creating Lambda function: InvalidParameterValueException: The
// function defined for the task cannot be assumed by Lambda.
//
// The role may exist, but the permissions may not have propagated, so we
// retry
var eventSourceMappingConfiguration *lambda.EventSourceMappingConfiguration
var err error
err = retry.RetryContext(ctx, propagationTimeout, func() *retry.RetryError {
eventSourceMappingConfiguration, err = conn.CreateEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "cannot be assumed by Lambda") {
return retry.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "execution role does not have permissions") {
return retry.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "ensure the role can perform") {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
// The role may exist, but the permissions may not have propagated, so we retry.
eventSourceMappingConfiguration, err := retryEventSourceMapping(ctx, func() (*lambda.EventSourceMappingConfiguration, error) {
return conn.CreateEventSourceMappingWithContext(ctx, input)
})

if tfresource.TimedOut(err) {
eventSourceMappingConfiguration, err = conn.CreateEventSourceMappingWithContext(ctx, input)
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "creating Lambda Event Source Mapping (%s): %s", target, err)
}

d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID))

if _, err := waitEventSourceMappingCreate(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to create: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) create: %s", d.Id(), err)
}

return append(diags, resourceEventSourceMappingRead(ctx, d, meta)...)
Expand Down Expand Up @@ -521,6 +521,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData,
} else {
d.Set("destination_config", nil)
}
if eventSourceMappingConfiguration.DocumentDBEventSourceConfig != nil {
if err := d.Set("document_db_event_source_config", []interface{}{flattenDocumentDBEventSourceConfig(eventSourceMappingConfiguration.DocumentDBEventSourceConfig)}); err != nil {
return sdkdiag.AppendErrorf(diags, "setting document_db_event_source_config: %s", err)
}
} else {
d.Set("document_db_event_source_config", nil)
}
d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn)
if v := eventSourceMappingConfiguration.FilterCriteria; v != nil {
if err := d.Set("filter_criteria", []interface{}{flattenFilterCriteria(v)}); err != nil {
Expand Down Expand Up @@ -596,8 +603,6 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).LambdaConn()

log.Printf("[DEBUG] Updating Lambda Event Source Mapping: %s", d.Id())

input := &lambda.UpdateEventSourceMappingInput{
UUID: aws.String(d.Id()),
}
Expand All @@ -616,6 +621,12 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
}
}

if d.HasChange("document_db_event_source_config") {
if v, ok := d.GetOk("document_db_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.DocumentDBEventSourceConfig = expandDocumentDBEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}
}

if d.HasChange("enabled") {
input.Enabled = aws.Bool(d.Get("enabled").(bool))
}
Expand Down Expand Up @@ -672,30 +683,16 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
input.TumblingWindowInSeconds = aws.Int64(int64(d.Get("tumbling_window_in_seconds").(int)))
}

err := retry.RetryContext(ctx, eventSourceMappingPropagationTimeout, func() *retry.RetryError {
_, err := conn.UpdateEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
_, err := retryEventSourceMapping(ctx, func() (*lambda.EventSourceMappingConfiguration, error) {
return conn.UpdateEventSourceMappingWithContext(ctx, input)
})

if tfresource.TimedOut(err) {
_, err = conn.UpdateEventSourceMappingWithContext(ctx, input)
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "updating Lambda Event Source Mapping (%s): %s", d.Id(), err)
}

if _, err := waitEventSourceMappingUpdate(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to update: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) update: %s", d.Id(), err)
}

return append(diags, resourceEventSourceMappingRead(ctx, d, meta)...)
Expand All @@ -706,28 +703,11 @@ func resourceEventSourceMappingDelete(ctx context.Context, d *schema.ResourceDat
conn := meta.(*conns.AWSClient).LambdaConn()

log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id())

input := &lambda.DeleteEventSourceMappingInput{
UUID: aws.String(d.Id()),
}

err := retry.RetryContext(ctx, eventSourceMappingPropagationTimeout, func() *retry.RetryError {
_, err := conn.DeleteEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
})

if tfresource.TimedOut(err) {
_, err = conn.DeleteEventSourceMappingWithContext(ctx, input)
}
_, err := tfresource.RetryWhenAWSErrCodeEquals(ctx, eventSourceMappingPropagationTimeout, func() (interface{}, error) {
return conn.DeleteEventSourceMappingWithContext(ctx, &lambda.DeleteEventSourceMappingInput{
UUID: aws.String(d.Id()),
})
}, lambda.ErrCodeResourceInUseException)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) {
return diags
Expand All @@ -738,7 +718,7 @@ func resourceEventSourceMappingDelete(ctx context.Context, d *schema.ResourceDat
}

if _, err := waitEventSourceMappingDelete(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to delete: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) delete: %s", d.Id(), err)
}

return diags
Expand All @@ -758,6 +738,28 @@ func expandDestinationConfig(tfMap map[string]interface{}) *lambda.DestinationCo
return apiObject
}

func expandDocumentDBEventSourceConfig(tfMap map[string]interface{}) *lambda.DocumentDBEventSourceConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.DocumentDBEventSourceConfig{}

if v, ok := tfMap["collection_name"].(string); ok && v != "" {
apiObject.CollectionName = aws.String(v)
}

if v, ok := tfMap["database_name"].(string); ok && v != "" {
apiObject.DatabaseName = aws.String(v)
}

if v, ok := tfMap["full_document"].(string); ok && v != "" {
apiObject.FullDocument = aws.String(v)
}

return apiObject
}

func expandOnFailure(tfMap map[string]interface{}) *lambda.OnFailure {
if tfMap == nil {
return nil
Expand Down Expand Up @@ -786,6 +788,28 @@ func flattenDestinationConfig(apiObject *lambda.DestinationConfig) map[string]in
return tfMap
}

func flattenDocumentDBEventSourceConfig(apiObject *lambda.DocumentDBEventSourceConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.CollectionName; v != nil {
tfMap["collection_name"] = aws.StringValue(v)
}

if v := apiObject.DatabaseName; v != nil {
tfMap["database_name"] = aws.StringValue(v)
}

if v := apiObject.FullDocument; v != nil {
tfMap["full_document"] = aws.StringValue(v)
}

return tfMap
}

func flattenOnFailure(apiObject *lambda.OnFailure) map[string]interface{} {
if apiObject == nil {
return nil
Expand Down Expand Up @@ -1213,3 +1237,32 @@ func waitEventSourceMappingUpdate(ctx context.Context, conn *lambda.Lambda, id s

return nil, err
}

func retryEventSourceMapping(ctx context.Context, f func() (*lambda.EventSourceMappingConfiguration, error)) (*lambda.EventSourceMappingConfiguration, error) {
outputRaw, err := tfresource.RetryWhen(ctx, propagationTimeout,
func() (interface{}, error) {
return f()
},
func(err error) (bool, error) {
if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "cannot be assumed by Lambda") {
return true, err
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "execution role does not have permissions") {
return true, err
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "ensure the role can perform") {
return true, err
}

return false, err
},
)

if err != nil {
return nil, err
}

return outputRaw.(*lambda.EventSourceMappingConfiguration), err
}
Loading

0 comments on commit f95ac3c

Please sign in to comment.