Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add provisioned_poller_config for kafka in lambda event source … #40303

3 changes: 3 additions & 0 deletions .changelog/40303.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument
```
77 changes: 77 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ func resourceEventSourceMapping() *schema.Resource {
Computed: true,
ValidateFunc: validation.IntBetween(1, 10),
},
"provisioned_poller_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"maximum_pollers": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(1, 2000),
},
"minimum_pollers": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(1, 200),
},
},
},
},
"queues": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -473,6 +494,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.ParallelizationFactor = aws.Int32(int32(v.(int)))
}

if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("queues"); ok && len(v.([]interface{})) > 0 {
input.Queues = flex.ExpandStringValueList(v.([]interface{}))
}
Expand Down Expand Up @@ -614,6 +639,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData,
d.Set("metrics_config", nil)
}
d.Set("parallelization_factor", output.ParallelizationFactor)
if v := output.ProvisionedPollerConfig; v != nil {
if err := d.Set("provisioned_poller_config", []interface{}{flattenProvisionedPollerConfig(v)}); err != nil {
return sdkdiag.AppendErrorf(diags, "setting provisioned_poller_config: %s", err)
}
} else {
d.Set("provisioned_poller_config", nil)
}
d.Set("queues", output.Queues)
if v := output.ScalingConfig; v != nil {
if err := d.Set("scaling_config", []interface{}{flattenScalingConfig(v)}); err != nil {
Expand Down Expand Up @@ -742,6 +774,15 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
input.ParallelizationFactor = aws.Int32(int32(d.Get("parallelization_factor").(int)))
}

if d.HasChange("provisioned_poller_config") {
if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.ProvisionedPollerConfig = &awstypes.ProvisionedPollerConfig{}
}
}

if d.HasChange("scaling_config") {
if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
Expand Down Expand Up @@ -1144,6 +1185,42 @@ func flattenSelfManagedKafkaEventSourceConfig(apiObject *awstypes.SelfManagedKaf
return tfMap
}

func expandProvisionedPollerConfig(tfMap map[string]interface{}) *awstypes.ProvisionedPollerConfig {
if tfMap == nil {
return nil
}

apiObject := &awstypes.ProvisionedPollerConfig{}

if v, ok := tfMap["maximum_pollers"].(int); ok && v != 0 {
apiObject.MaximumPollers = aws.Int32(int32(v))
}

if v, ok := tfMap["minimum_pollers"].(int); ok && v != 0 {
apiObject.MinimumPollers = aws.Int32(int32(v))
}

return apiObject
}

func flattenProvisionedPollerConfig(apiObject *awstypes.ProvisionedPollerConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.MaximumPollers; v != nil {
tfMap["maximum_pollers"] = aws.ToInt32(v)
}

if v := apiObject.MinimumPollers; v != nil {
tfMap["minimum_pollers"] = aws.ToInt32(v)
}

return tfMap
}

func expandSourceAccessConfiguration(tfMap map[string]interface{}) *awstypes.SourceAccessConfiguration {
if tfMap == nil {
return nil
Expand Down
110 changes: 104 additions & 6 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestAccLambdaEventSourceMapping_Kinesis_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, names.AttrKMSKeyARN, ""),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "metrics_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"),
),
},
Expand Down Expand Up @@ -878,10 +879,11 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) {
Config: testAccEventSourceMappingConfig_msk(rName, "100"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand All @@ -903,10 +905,10 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) {
Config: testAccEventSourceMappingConfig_msk(rName, "9999"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "9999"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -936,11 +938,12 @@ func TestAccLambdaEventSourceMapping_mskWithEventSourceConfig(t *testing.T) {
Config: testAccEventSourceMappingConfig_mskWithEventSourceConfig(rName, "100"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -973,11 +976,12 @@ func TestAccLambdaEventSourceMapping_selfManagedKafka(t *testing.T) {
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -1018,12 +1022,13 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.0.consumer_group_id", "self-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand All @@ -1038,6 +1043,53 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te
})
}

func TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig(t *testing.T) {
ctx := acctest.Context(t)
var v lambda.GetEventSourceMappingOutput
resourceName := "aws_lambda_event_source_mapping.test"
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, names.LambdaServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckEventSourceMappingDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "123", "null"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "123"),
resource.TestCheckResourceAttrSet(resourceName, "provisioned_poller_config.0.minimum_pollers"),
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "150", "15"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "150"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "15"),
),
},
{
Config: testAccEventSourceMappingConfig_selfManagedKafka(rName, "100", "test1:9092,test2:9092"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
),
},
},
})
}

func TestAccLambdaEventSourceMapping_activeMQ(t *testing.T) {
ctx := acctest.Context(t)
if testing.Short() {
Expand Down Expand Up @@ -2530,6 +2582,52 @@ resource "aws_lambda_event_source_mapping" "test" {
`, rName, batchSize, kafkaBootstrapServers))
}

func testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers string) string {
if batchSize == "" {
batchSize = "null"
}
if maxPollers == "" {
maxPollers = "null"
}
if minPollers == "" {
minPollers = "null"
}

return acctest.ConfigCompose(testAccEventSourceMappingConfig_kafkaBase(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
batch_size = %[2]s
enabled = false
function_name = aws_lambda_function.test.arn
topics = ["test"]
starting_position = "TRIM_HORIZON"

self_managed_event_source {
endpoints = {
KAFKA_BOOTSTRAP_SERVERS = %[3]q
}
}

dynamic "source_access_configuration" {
for_each = aws_subnet.test[*].id
content {
type = "VPC_SUBNET"
uri = "subnet:${source_access_configuration.value}"
}
}

source_access_configuration {
type = "VPC_SECURITY_GROUP"
uri = aws_security_group.test.id
}

provisioned_poller_config {
maximum_pollers = %[4]s
minimum_pollers = %[5]s
}
}
`, rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers))
}

func testAccEventSourceMappingConfig_dynamoDBBatchSize(rName, batchSize string) string {
if batchSize == "" {
batchSize = "null"
Expand Down
11 changes: 11 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ resource "aws_lambda_event_source_mapping" "example" {
topics = ["Example"]
starting_position = "TRIM_HORIZON"

provisioned_poller_config {
maximum_poller = 80
minimum_poller = 10
}

self_managed_event_source {
endpoints = {
KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092"
Expand Down Expand Up @@ -167,6 +172,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000.
* `metrics_config`: - (Optional) CloudWatch metrics configuration of the event source. Only available for stream sources (DynamoDB and Kinesis) and SQS queues. Detailed below.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. Only valid for Amazon MSK or self-managed Apache Kafka sources. Detailed below.
* `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name.
* `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below.
* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below.
Expand Down Expand Up @@ -208,6 +214,11 @@ resource "aws_lambda_event_source_mapping" "example" {

* `metrics` - (Required) A list containing the metrics to be produced by the event source mapping. Valid values: `EventCount`.

### provisioned_poller_config Configuration Block

* `maximum_pollers` - (Optional) The maximum number of event pollers this event source can scale up to. The range is between 1 and 2000.
* `minimum_pollers` - (Optional) The minimum number of event pollers this event source can scale down to. The range is between 1 and 200.

### scaling_config Configuration Block

* `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be greater than or equal to `2`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency). You need to raise a [Service Quota Ticket](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) to increase the concurrency beyond 1000.
Expand Down
Loading