[TF] Remove draining and cancelling from terminal dataflow job states (#3399)

Signed-off-by: Modular Magician <magic-modules@google.com>
This commit is contained in:
The Magician 2019-04-12 11:17:20 -07:00 committed by emily
parent f67094708f
commit 0f3f1a236a

View File

@ -9,18 +9,17 @@ import (
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/helper/validation"
"github.com/hashicorp/terraform/helper/resource"
"google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/googleapi"
)
var dataflowTerminalStatesMap = map[string]struct{}{
"JOB_STATE_DONE": {},
"JOB_STATE_FAILED": {},
"JOB_STATE_CANCELLED": {},
"JOB_STATE_UPDATED": {},
"JOB_STATE_DRAINING": {},
"JOB_STATE_DRAINED": {},
"JOB_STATE_CANCELLING": {},
"JOB_STATE_DONE": {},
"JOB_STATE_FAILED": {},
"JOB_STATE_CANCELLED": {},
"JOB_STATE_UPDATED": {},
"JOB_STATE_DRAINED": {},
}
func resourceDataflowJob() *schema.Resource {
@ -83,6 +82,7 @@ func resourceDataflowJob() *schema.Resource {
"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ForceNew: true,
},
@ -134,7 +134,7 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
Environment: &env,
}
job, err := createJob(config, project, region, &request)
job, err := resourceDataflowJobCreateJob(config, project, region, &request)
if err != nil {
return err
}
@ -158,7 +158,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
id := d.Id()
job, err := getJob(config, project, region, id)
job, err := resourceDataflowJobGetJob(config, project, region, id)
if err != nil {
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
}
@ -191,51 +191,70 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
}
id := d.Id()
requestedState, err := mapOnDelete(d.Get("on_delete").(string))
requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string))
if err != nil {
return err
}
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; !ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
// Retry updating the state while the job is not ready to be canceled/drained.
err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError {
// To terminate a dataflow job, we update the job with a requested
// terminal state.
job := &dataflow.Job{
RequestedState: requestedState,
}
_, err = updateJob(config, project, region, id, job)
if err != nil {
if gerr, err_ok := err.(*googleapi.Error); !err_ok {
_, updateErr := resourceDataflowJobUpdateJob(config, project, region, id, job)
if updateErr != nil {
gerr, isGoogleErr := err.(*googleapi.Error)
if !isGoogleErr {
// If we have an error and it's not a google-specific error, we should go ahead and return.
return err
} else if err_ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
// We'll sleep below to wait for the job to be ready to cancel.
} else {
return err
return resource.NonRetryableError(err)
}
if strings.Contains(gerr.Message, "not yet ready for canceling") {
// Retry cancelling job if it's not ready.
// Sleep to avoid hitting update quota with repeated attempts.
time.Sleep(5 * time.Second)
return resource.RetryableError(err)
}
if strings.Contains(gerr.Message, "Job has terminated") {
// Job has already been terminated, skip.
return nil
}
}
return nil
})
if err != nil {
return err
}
// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
err = resourceDataflowJobRead(d, meta)
postReadState := d.Get("state").(string)
log.Printf("[DEBUG] Job state: '%s'.", postReadState)
if _, ok := dataflowTerminalStatesMap[postReadState]; !ok {
// If we're not yet in a terminal state, we need to sleep a few seconds so we don't
// exhaust our update quota with repeated attempts.
time.Sleep(5 * time.Second)
}
if err != nil {
return err
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
}
// Only remove the job from state if it's actually successfully canceled.
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
return fmt.Errorf("There was a problem canceling the dataflow job '%s' - the final state was %s.", d.Id(), d.Get("state").(string))
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}
func mapOnDelete(policy string) (string, error) {
func resourceDataflowJobMapRequestedState(policy string) (string, error) {
switch policy {
case "cancel":
return "JOB_STATE_CANCELLED", nil
@ -246,21 +265,21 @@ func mapOnDelete(policy string) (string, error) {
}
}
func createJob(config *Config, project string, region string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) {
func resourceDataflowJobCreateJob(config *Config, project string, region string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) {
if region == "" {
return config.clientDataflow.Projects.Templates.Create(project, request).Do()
}
return config.clientDataflow.Projects.Locations.Templates.Create(project, region, request).Do()
}
func getJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
if region == "" {
return config.clientDataflow.Projects.Jobs.Get(project, id).Do()
}
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).Do()
}
func updateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
if region == "" {
return config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
}