diff --git a/google/resource_pubsub_subscription.go b/google/resource_pubsub_subscription.go index 6745ff64..51754024 100644 --- a/google/resource_pubsub_subscription.go +++ b/google/resource_pubsub_subscription.go @@ -66,6 +66,11 @@ func resourcePubsubSubscription() *schema.Resource { Optional: true, Elem: &schema.Schema{Type: schema.TypeString}, }, + "message_retention_duration": { + Type: schema.TypeString, + Optional: true, + Default: "604800s", + }, "push_config": { Type: schema.TypeList, Optional: true, @@ -84,6 +89,10 @@ func resourcePubsubSubscription() *schema.Resource { }, }, }, + "retain_acked_messages": { + Type: schema.TypeBool, + Optional: true, + }, "path": { Type: schema.TypeString, Computed: true, @@ -132,6 +141,18 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{}) } else if v, ok := d.GetOkExists("ack_deadline_seconds"); !isEmptyValue(reflect.ValueOf(ackDeadlineSecondsProp)) && (ok || !reflect.DeepEqual(v, ackDeadlineSecondsProp)) { obj["ackDeadlineSeconds"] = ackDeadlineSecondsProp } + messageRetentionDurationProp, err := expandPubsubSubscriptionMessageRetentionDuration(d.Get("message_retention_duration"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("message_retention_duration"); !isEmptyValue(reflect.ValueOf(messageRetentionDurationProp)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) { + obj["messageRetentionDuration"] = messageRetentionDurationProp + } + retainAckedMessagesProp, err := expandPubsubSubscriptionRetainAckedMessages(d.Get("retain_acked_messages"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(retainAckedMessagesProp)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) { + obj["retainAckedMessages"] = retainAckedMessagesProp + } url, err := replaceVars(d, config, "https://pubsub.googleapis.com/v1/projects/{{project}}/subscriptions/{{name}}") if err != nil { @@ -197,6 +218,12 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er if err := d.Set("ack_deadline_seconds", flattenPubsubSubscriptionAckDeadlineSeconds(res["ackDeadlineSeconds"], d)); err != nil { return fmt.Errorf("Error reading Subscription: %s", err) } + if err := d.Set("message_retention_duration", flattenPubsubSubscriptionMessageRetentionDuration(res["messageRetentionDuration"], d)); err != nil { + return fmt.Errorf("Error reading Subscription: %s", err) + } + if err := d.Set("retain_acked_messages", flattenPubsubSubscriptionRetainAckedMessages(res["retainAckedMessages"], d)); err != nil { + return fmt.Errorf("Error reading Subscription: %s", err) + } return nil } @@ -223,6 +250,18 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{}) } else if v, ok := d.GetOkExists("ack_deadline_seconds"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, ackDeadlineSecondsProp)) { obj["ackDeadlineSeconds"] = ackDeadlineSecondsProp } + messageRetentionDurationProp, err := expandPubsubSubscriptionMessageRetentionDuration(d.Get("message_retention_duration"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("message_retention_duration"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) { + obj["messageRetentionDuration"] = messageRetentionDurationProp + } + retainAckedMessagesProp, err := expandPubsubSubscriptionRetainAckedMessages(d.Get("retain_acked_messages"), d, config) + if err != nil { + return err + } else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) { + obj["retainAckedMessages"] = retainAckedMessagesProp + } obj, err = resourcePubsubSubscriptionUpdateEncoder(d, meta, obj) if err != nil { @@ -248,6 +287,14 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{}) if d.HasChange("ack_deadline_seconds") { updateMask = append(updateMask, "ackDeadlineSeconds") } + + if d.HasChange("message_retention_duration") { + updateMask = append(updateMask, "messageRetentionDuration") + } + + if d.HasChange("retain_acked_messages") { + updateMask = append(updateMask, "retainAckedMessages") + } // updateMask is a URL parameter but not present in the schema, so replaceVars // won't set it url, err = addQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")}) @@ -349,6 +396,14 @@ func flattenPubsubSubscriptionAckDeadlineSeconds(v interface{}, d *schema.Resour return v } +func flattenPubsubSubscriptionMessageRetentionDuration(v interface{}, d *schema.ResourceData) interface{} { + return v +} + +func flattenPubsubSubscriptionRetainAckedMessages(v interface{}, d *schema.ResourceData) interface{} { + return v +} + func expandPubsubSubscriptionName(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { project, err := getProject(d, config) if err != nil { @@ -448,6 +503,14 @@ func expandPubsubSubscriptionAckDeadlineSeconds(v interface{}, d TerraformResour return v, nil } +func expandPubsubSubscriptionMessageRetentionDuration(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + +func expandPubsubSubscriptionRetainAckedMessages(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) { + return v, nil +} + func resourcePubsubSubscriptionUpdateEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) { newObj := make(map[string]interface{}) newObj["subscription"] = obj diff --git a/google/resource_pubsub_subscription_generated_test.go b/google/resource_pubsub_subscription_generated_test.go index 83e6ab34..88c858da 100644 --- a/google/resource_pubsub_subscription_generated_test.go +++ b/google/resource_pubsub_subscription_generated_test.go @@ -62,6 +62,10 @@ resource "google_pubsub_subscription" "example" { foo = "bar" } + # 20 minutes + message_retention_duration = "1200s" + retain_acked_messages = true + ack_deadline_seconds = 20 } `, context) diff --git a/website/docs/r/pubsub_subscription.html.markdown b/website/docs/r/pubsub_subscription.html.markdown index 82c64b59..bb8ed0df 100644 --- a/website/docs/r/pubsub_subscription.html.markdown +++ b/website/docs/r/pubsub_subscription.html.markdown @@ -80,6 +80,10 @@ resource "google_pubsub_subscription" "example" { foo = "bar" } + # 20 minutes + message_retention_duration = "1200s" + retain_acked_messages = true + ack_deadline_seconds = 20 } ``` @@ -143,6 +147,24 @@ The following arguments are supported: for the call to the push endpoint. If the subscriber never acknowledges the message, the Pub/Sub system will eventually redeliver the message. + +* `message_retention_duration` - + (Optional) + How long to retain unacknowledged messages in the subscription's + backlog, from the moment a message is published. If + retainAckedMessages is true, then this also configures the retention + of acknowledged messages, and thus configures how far back in time a + subscriptions.seek can be done. Defaults to 7 days. Cannot be more + than 7 days (`"604800s"`) or less than 10 minutes (`"600s"`). + A duration in seconds with up to nine fractional digits, terminated + by 's'. Example: `"600.5s"`. + +* `retain_acked_messages` - + (Optional) + Indicates whether to retain acknowledged messages. If `true`, then + messages are not expunged from the subscription's backlog, even if + they are acknowledged, until they fall out of the + messageRetentionDuration window. * `project` - (Optional) The ID of the project in which the resource belongs. If it is not provided, the provider project is used.