Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retry when updating aws_lambda_event_source_mapping #28586

Merged
merged 10 commits into from
Apr 18, 2023
7 changes: 7 additions & 0 deletions .changelog/28586.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
resource/aws_lambda_event_source_mapping: Fix IAM eventual consistency errors on resource Update
```

```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `document_db_event_source_config` configuration block
```
201 changes: 127 additions & 74 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ResourceEventSourceMapping() *schema.Resource {
}

switch serviceName {
case "dynamodb", "kinesis", "kafka", "mq":
case "dynamodb", "kinesis", "kafka", "mq", "rds":
return old == "100"
case "sqs":
return old == "10"
Expand Down Expand Up @@ -118,6 +118,29 @@ func ResourceEventSourceMapping() *schema.Resource {
},
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
},
"document_db_event_source_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"collection_name": {
Type: schema.TypeString,
Optional: true,
},
"database_name": {
Type: schema.TypeString,
Required: true,
},
"full_document": {
Type: schema.TypeString,
Optional: true,
Default: lambda.FullDocumentDefault,
ValidateFunc: validation.StringInSlice(lambda.FullDocument_Values(), false),
},
},
},
},
"enabled": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -368,6 +391,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.DestinationConfig = expandDestinationConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("document_db_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.DocumentDBEventSourceConfig = expandDocumentDBEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("event_source_arn"); ok {
v := v.(string)

Expand Down Expand Up @@ -439,51 +466,24 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.TumblingWindowInSeconds = aws.Int64(int64(v.(int)))
}

log.Printf("[DEBUG] Creating Lambda Event Source Mapping: %s", input)

// IAM profiles and roles can take some time to propagate in AWS:
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#launch-instance-with-role-console
// Error creating Lambda function: InvalidParameterValueException: The
// function defined for the task cannot be assumed by Lambda.
//
// The role may exist, but the permissions may not have propagated, so we
// retry
var eventSourceMappingConfiguration *lambda.EventSourceMappingConfiguration
var err error
err = retry.RetryContext(ctx, propagationTimeout, func() *retry.RetryError {
eventSourceMappingConfiguration, err = conn.CreateEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "cannot be assumed by Lambda") {
return retry.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "execution role does not have permissions") {
return retry.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "ensure the role can perform") {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
// The role may exist, but the permissions may not have propagated, so we retry.
eventSourceMappingConfiguration, err := retryEventSourceMapping(ctx, func() (*lambda.EventSourceMappingConfiguration, error) {
return conn.CreateEventSourceMappingWithContext(ctx, input)
})

if tfresource.TimedOut(err) {
eventSourceMappingConfiguration, err = conn.CreateEventSourceMappingWithContext(ctx, input)
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "creating Lambda Event Source Mapping (%s): %s", target, err)
}

d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID))

if _, err := waitEventSourceMappingCreate(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to create: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) create: %s", d.Id(), err)
}

return append(diags, resourceEventSourceMappingRead(ctx, d, meta)...)
Expand Down Expand Up @@ -521,6 +521,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData,
} else {
d.Set("destination_config", nil)
}
if eventSourceMappingConfiguration.DocumentDBEventSourceConfig != nil {
if err := d.Set("document_db_event_source_config", []interface{}{flattenDocumentDBEventSourceConfig(eventSourceMappingConfiguration.DocumentDBEventSourceConfig)}); err != nil {
return sdkdiag.AppendErrorf(diags, "setting document_db_event_source_config: %s", err)
}
} else {
d.Set("document_db_event_source_config", nil)
}
d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn)
if v := eventSourceMappingConfiguration.FilterCriteria; v != nil {
if err := d.Set("filter_criteria", []interface{}{flattenFilterCriteria(v)}); err != nil {
Expand Down Expand Up @@ -596,8 +603,6 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).LambdaConn()

log.Printf("[DEBUG] Updating Lambda Event Source Mapping: %s", d.Id())

input := &lambda.UpdateEventSourceMappingInput{
UUID: aws.String(d.Id()),
}
Expand All @@ -616,6 +621,12 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
}
}

if d.HasChange("document_db_event_source_config") {
if v, ok := d.GetOk("document_db_event_source_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.DocumentDBEventSourceConfig = expandDocumentDBEventSourceConfig(v.([]interface{})[0].(map[string]interface{}))
}
}

if d.HasChange("enabled") {
input.Enabled = aws.Bool(d.Get("enabled").(bool))
}
Expand Down Expand Up @@ -672,30 +683,16 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
input.TumblingWindowInSeconds = aws.Int64(int64(d.Get("tumbling_window_in_seconds").(int)))
}

err := retry.RetryContext(ctx, eventSourceMappingPropagationTimeout, func() *retry.RetryError {
_, err := conn.UpdateEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
_, err := retryEventSourceMapping(ctx, func() (*lambda.EventSourceMappingConfiguration, error) {
return conn.UpdateEventSourceMappingWithContext(ctx, input)
})

if tfresource.TimedOut(err) {
_, err = conn.UpdateEventSourceMappingWithContext(ctx, input)
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "updating Lambda Event Source Mapping (%s): %s", d.Id(), err)
}

if _, err := waitEventSourceMappingUpdate(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to update: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) update: %s", d.Id(), err)
}

return append(diags, resourceEventSourceMappingRead(ctx, d, meta)...)
Expand All @@ -706,28 +703,11 @@ func resourceEventSourceMappingDelete(ctx context.Context, d *schema.ResourceDat
conn := meta.(*conns.AWSClient).LambdaConn()

log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id())

input := &lambda.DeleteEventSourceMappingInput{
UUID: aws.String(d.Id()),
}

err := retry.RetryContext(ctx, eventSourceMappingPropagationTimeout, func() *retry.RetryError {
_, err := conn.DeleteEventSourceMappingWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) {
return retry.RetryableError(err)
}

if err != nil {
return retry.NonRetryableError(err)
}

return nil
})

if tfresource.TimedOut(err) {
_, err = conn.DeleteEventSourceMappingWithContext(ctx, input)
}
_, err := tfresource.RetryWhenAWSErrCodeEquals(ctx, eventSourceMappingPropagationTimeout, func() (interface{}, error) {
return conn.DeleteEventSourceMappingWithContext(ctx, &lambda.DeleteEventSourceMappingInput{
UUID: aws.String(d.Id()),
})
}, lambda.ErrCodeResourceInUseException)

if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) {
return diags
Expand All @@ -738,7 +718,7 @@ func resourceEventSourceMappingDelete(ctx context.Context, d *schema.ResourceDat
}

if _, err := waitEventSourceMappingDelete(ctx, conn, d.Id()); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) to delete: %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "waiting for Lambda Event Source Mapping (%s) delete: %s", d.Id(), err)
}

return diags
Expand All @@ -758,6 +738,28 @@ func expandDestinationConfig(tfMap map[string]interface{}) *lambda.DestinationCo
return apiObject
}

func expandDocumentDBEventSourceConfig(tfMap map[string]interface{}) *lambda.DocumentDBEventSourceConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.DocumentDBEventSourceConfig{}

if v, ok := tfMap["collection_name"].(string); ok && v != "" {
apiObject.CollectionName = aws.String(v)
}

if v, ok := tfMap["database_name"].(string); ok && v != "" {
apiObject.DatabaseName = aws.String(v)
}

if v, ok := tfMap["full_document"].(string); ok && v != "" {
apiObject.FullDocument = aws.String(v)
}

return apiObject
}

func expandOnFailure(tfMap map[string]interface{}) *lambda.OnFailure {
if tfMap == nil {
return nil
Expand Down Expand Up @@ -786,6 +788,28 @@ func flattenDestinationConfig(apiObject *lambda.DestinationConfig) map[string]in
return tfMap
}

func flattenDocumentDBEventSourceConfig(apiObject *lambda.DocumentDBEventSourceConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.CollectionName; v != nil {
tfMap["collection_name"] = aws.StringValue(v)
}

if v := apiObject.DatabaseName; v != nil {
tfMap["database_name"] = aws.StringValue(v)
}

if v := apiObject.FullDocument; v != nil {
tfMap["full_document"] = aws.StringValue(v)
}

return tfMap
}

func flattenOnFailure(apiObject *lambda.OnFailure) map[string]interface{} {
if apiObject == nil {
return nil
Expand Down Expand Up @@ -1213,3 +1237,32 @@ func waitEventSourceMappingUpdate(ctx context.Context, conn *lambda.Lambda, id s

return nil, err
}

func retryEventSourceMapping(ctx context.Context, f func() (*lambda.EventSourceMappingConfiguration, error)) (*lambda.EventSourceMappingConfiguration, error) {
outputRaw, err := tfresource.RetryWhen(ctx, propagationTimeout,
func() (interface{}, error) {
return f()
},
func(err error) (bool, error) {
if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "cannot be assumed by Lambda") {
return true, err
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "execution role does not have permissions") {
return true, err
}

if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "ensure the role can perform") {
return true, err
}

return false, err
},
)

if err != nil {
return nil, err
}

return outputRaw.(*lambda.EventSourceMappingConfiguration), err
}
Loading