Add retention duraction, retain acked message to pubsub (#3193)

Signed-off-by: Modular Magician <magic-modules@google.com>
This commit is contained in:
The Magician 2019-03-06 13:52:21 -08:00 committed by Riley Karson
parent 92dff97bbc
commit 1f5deb662b
3 changed files with 89 additions and 0 deletions

View File

@ -66,6 +66,11 @@ func resourcePubsubSubscription() *schema.Resource {
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"message_retention_duration": {
Type: schema.TypeString,
Optional: true,
Default: "604800s",
},
"push_config": {
Type: schema.TypeList,
Optional: true,
@ -84,6 +89,10 @@ func resourcePubsubSubscription() *schema.Resource {
},
},
},
"retain_acked_messages": {
Type: schema.TypeBool,
Optional: true,
},
"path": {
Type: schema.TypeString,
Computed: true,
@ -132,6 +141,18 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("ack_deadline_seconds"); !isEmptyValue(reflect.ValueOf(ackDeadlineSecondsProp)) && (ok || !reflect.DeepEqual(v, ackDeadlineSecondsProp)) {
obj["ackDeadlineSeconds"] = ackDeadlineSecondsProp
}
messageRetentionDurationProp, err := expandPubsubSubscriptionMessageRetentionDuration(d.Get("message_retention_duration"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("message_retention_duration"); !isEmptyValue(reflect.ValueOf(messageRetentionDurationProp)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
retainAckedMessagesProp, err := expandPubsubSubscriptionRetainAckedMessages(d.Get("retain_acked_messages"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(retainAckedMessagesProp)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) {
obj["retainAckedMessages"] = retainAckedMessagesProp
}
url, err := replaceVars(d, config, "https://pubsub.googleapis.com/v1/projects/{{project}}/subscriptions/{{name}}")
if err != nil {
@ -197,6 +218,12 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er
if err := d.Set("ack_deadline_seconds", flattenPubsubSubscriptionAckDeadlineSeconds(res["ackDeadlineSeconds"], d)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("message_retention_duration", flattenPubsubSubscriptionMessageRetentionDuration(res["messageRetentionDuration"], d)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("retain_acked_messages", flattenPubsubSubscriptionRetainAckedMessages(res["retainAckedMessages"], d)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
return nil
}
@ -223,6 +250,18 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("ack_deadline_seconds"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, ackDeadlineSecondsProp)) {
obj["ackDeadlineSeconds"] = ackDeadlineSecondsProp
}
messageRetentionDurationProp, err := expandPubsubSubscriptionMessageRetentionDuration(d.Get("message_retention_duration"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("message_retention_duration"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
retainAckedMessagesProp, err := expandPubsubSubscriptionRetainAckedMessages(d.Get("retain_acked_messages"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) {
obj["retainAckedMessages"] = retainAckedMessagesProp
}
obj, err = resourcePubsubSubscriptionUpdateEncoder(d, meta, obj)
if err != nil {
@ -248,6 +287,14 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
if d.HasChange("ack_deadline_seconds") {
updateMask = append(updateMask, "ackDeadlineSeconds")
}
if d.HasChange("message_retention_duration") {
updateMask = append(updateMask, "messageRetentionDuration")
}
if d.HasChange("retain_acked_messages") {
updateMask = append(updateMask, "retainAckedMessages")
}
// updateMask is a URL parameter but not present in the schema, so replaceVars
// won't set it
url, err = addQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")})
@ -349,6 +396,14 @@ func flattenPubsubSubscriptionAckDeadlineSeconds(v interface{}, d *schema.Resour
return v
}
func flattenPubsubSubscriptionMessageRetentionDuration(v interface{}, d *schema.ResourceData) interface{} {
return v
}
func flattenPubsubSubscriptionRetainAckedMessages(v interface{}, d *schema.ResourceData) interface{} {
return v
}
func expandPubsubSubscriptionName(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
project, err := getProject(d, config)
if err != nil {
@ -448,6 +503,14 @@ func expandPubsubSubscriptionAckDeadlineSeconds(v interface{}, d TerraformResour
return v, nil
}
func expandPubsubSubscriptionMessageRetentionDuration(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}
func expandPubsubSubscriptionRetainAckedMessages(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}
func resourcePubsubSubscriptionUpdateEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) {
newObj := make(map[string]interface{})
newObj["subscription"] = obj

View File

@ -62,6 +62,10 @@ resource "google_pubsub_subscription" "example" {
foo = "bar"
}
# 20 minutes
message_retention_duration = "1200s"
retain_acked_messages = true
ack_deadline_seconds = 20
}
`, context)

View File

@ -80,6 +80,10 @@ resource "google_pubsub_subscription" "example" {
foo = "bar"
}
# 20 minutes
message_retention_duration = "1200s"
retain_acked_messages = true
ack_deadline_seconds = 20
}
```
@ -143,6 +147,24 @@ The following arguments are supported:
for the call to the push endpoint.
If the subscriber never acknowledges the message, the Pub/Sub system
will eventually redeliver the message.
* `message_retention_duration` -
(Optional)
How long to retain unacknowledged messages in the subscription's
backlog, from the moment a message is published. If
retainAckedMessages is true, then this also configures the retention
of acknowledged messages, and thus configures how far back in time a
subscriptions.seek can be done. Defaults to 7 days. Cannot be more
than 7 days (`"604800s"`) or less than 10 minutes (`"600s"`).
A duration in seconds with up to nine fractional digits, terminated
by 's'. Example: `"600.5s"`.
* `retain_acked_messages` -
(Optional)
Indicates whether to retain acknowledged messages. If `true`, then
messages are not expunged from the subscription's backlog, even if
they are acknowledged, until they fall out of the
messageRetentionDuration window.
* `project` - (Optional) The ID of the project in which the resource belongs.
If it is not provided, the provider project is used.