From 0f3f1a236aceb61306ec832f783da7a0cf7439ac Mon Sep 17 00:00:00 2001
From: The Magician
Date: Fri, 12 Apr 2019 11:17:20 -0700
Subject: [PATCH] [TF] Remove draining and cancelling from terminal dataflow job states (#3399)

Signed-off-by: Modular Magician
---
 google/resource_dataflow_job.go | 87 ++++++++++++++++++++-------------
 1 file changed, 53 insertions(+), 34 deletions(-)

diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go
index ba13a4be..c7b2639d 100644
--- a/google/resource_dataflow_job.go
+++ b/google/resource_dataflow_job.go
@@ -9,18 +9,17 @@ import (
 	"github.com/hashicorp/terraform/helper/schema"
 	"github.com/hashicorp/terraform/helper/validation"
+	"github.com/hashicorp/terraform/helper/resource"
 
 	"google.golang.org/api/dataflow/v1b3"
 	"google.golang.org/api/googleapi"
 )
 
 var dataflowTerminalStatesMap = map[string]struct{}{
-	"JOB_STATE_DONE":       {},
-	"JOB_STATE_FAILED":     {},
-	"JOB_STATE_CANCELLED":  {},
-	"JOB_STATE_UPDATED":    {},
-	"JOB_STATE_DRAINING":   {},
-	"JOB_STATE_DRAINED":    {},
-	"JOB_STATE_CANCELLING": {},
+	"JOB_STATE_DONE":      {},
+	"JOB_STATE_FAILED":    {},
+	"JOB_STATE_CANCELLED": {},
+	"JOB_STATE_UPDATED":   {},
+	"JOB_STATE_DRAINED":   {},
 }
 
 func resourceDataflowJob() *schema.Resource {
@@ -83,6 +82,7 @@ func resourceDataflowJob() *schema.Resource {
 			"project": {
 				Type:     schema.TypeString,
 				Optional: true,
+				Computed: true,
 				ForceNew: true,
 			},
 
@@ -134,7 +134,7 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
 		Environment: &env,
 	}
 
-	job, err := createJob(config, project, region, &request)
+	job, err := resourceDataflowJobCreateJob(config, project, region, &request)
 	if err != nil {
 		return err
 	}
@@ -158,7 +158,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
 
 	id := d.Id()
 
-	job, err := getJob(config, project, region, id)
+	job, err := resourceDataflowJobGetJob(config, project, region, id)
 	if err != nil {
 		return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
 	}
@@ -191,51 +191,70 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
 	}
 
 	id := d.Id()
-	requestedState, err := mapOnDelete(d.Get("on_delete").(string))
+
+	requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string))
 	if err != nil {
 		return err
 	}
-	for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; !ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
+
+	// Retry updating the state while the job is not ready to be canceled/drained.
+	err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError {
+		// To terminate a dataflow job, we update the job with a requested
+		// terminal state.
 		job := &dataflow.Job{
 			RequestedState: requestedState,
 		}
-		_, err = updateJob(config, project, region, id, job)
-		if err != nil {
-			if gerr, err_ok := err.(*googleapi.Error); !err_ok {
+
+		_, updateErr := resourceDataflowJobUpdateJob(config, project, region, id, job)
+		if updateErr != nil {
+			gerr, isGoogleErr := updateErr.(*googleapi.Error)
+			if !isGoogleErr {
 				// If we have an error and it's not a google-specific error, we should go ahead and return.
-				return err
-			} else if err_ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
-				// We'll sleep below to wait for the job to be ready to cancel.
-			} else {
-				return err
+				return resource.NonRetryableError(updateErr)
+			}
+
+			if strings.Contains(gerr.Message, "not yet ready for canceling") {
+				// Retry cancelling job if it's not ready.
+				// Sleep to avoid hitting update quota with repeated attempts.
+				time.Sleep(5 * time.Second)
+				return resource.RetryableError(updateErr)
+			}
+
+			if strings.Contains(gerr.Message, "Job has terminated") {
+				// Job has already been terminated, skip.
+				return nil
 			}
 		}
 
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Wait for state to reach terminal state (canceled/drained/done)
+	_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
+	for !ok {
+		log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
+		time.Sleep(5 * time.Second)
+
 		err = resourceDataflowJobRead(d, meta)
-		postReadState := d.Get("state").(string)
-		log.Printf("[DEBUG] Job state: '%s'.", postReadState)
-		if _, ok := dataflowTerminalStatesMap[postReadState]; !ok {
-			// If we're not yet in a terminal state, we need to sleep a few seconds so we don't
-			// exhaust our update quota with repeated attempts.
-			time.Sleep(5 * time.Second)
-		}
 		if err != nil {
-			return err
+			return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
 		}
+
+		_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
 	}
 
 	// Only remove the job from state if it's actually successfully canceled.
 	if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
+		log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
 		d.SetId("")
 		return nil
 	}
-
-	return fmt.Errorf("There was a problem canceling the dataflow job '%s' - the final state was %s.", d.Id(), d.Get("state").(string))
-
+	return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
 }
 
-func mapOnDelete(policy string) (string, error) {
+func resourceDataflowJobMapRequestedState(policy string) (string, error) {
 	switch policy {
 	case "cancel":
 		return "JOB_STATE_CANCELLED", nil
@@ -246,21 +265,21 @@
 	}
 }
 
-func createJob(config *Config, project string, region string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) {
+func resourceDataflowJobCreateJob(config *Config, project string, region string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) {
 	if region == "" {
 		return config.clientDataflow.Projects.Templates.Create(project, request).Do()
 	}
 	return config.clientDataflow.Projects.Locations.Templates.Create(project, region, request).Do()
 }
 
-func getJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
+func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
 	if region == "" {
 		return config.clientDataflow.Projects.Jobs.Get(project, id).Do()
 	}
 	return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).Do()
 }
 
-func updateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
+func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
 	if region == "" {
 		return config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
 	}
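
Context for the change above: JOB_STATE_DRAINING and JOB_STATE_CANCELLING are transitional, not terminal, so the delete path must keep polling until the job actually settles. The following is a minimal standalone Go sketch (not part of this patch) of that terminal-state check; the package layout and the isTerminal helper are illustrative only and do not exist in the provider.

	package main

	import "fmt"

	// Mirrors the reduced dataflowTerminalStatesMap from the patch:
	// draining and cancelling are no longer treated as terminal.
	var terminalStates = map[string]struct{}{
		"JOB_STATE_DONE":      {},
		"JOB_STATE_FAILED":    {},
		"JOB_STATE_CANCELLED": {},
		"JOB_STATE_UPDATED":   {},
		"JOB_STATE_DRAINED":   {},
	}

	// isTerminal is an illustrative helper, not a function in the provider.
	func isTerminal(state string) bool {
		_, ok := terminalStates[state]
		return ok
	}

	func main() {
		// With the patch, a draining or cancelling job is still in flight,
		// so resourceDataflowJobDelete keeps waiting instead of returning.
		fmt.Println(isTerminal("JOB_STATE_DRAINING"))  // false: keep polling
		fmt.Println(isTerminal("JOB_STATE_CANCELLING")) // false: keep polling
		fmt.Println(isTerminal("JOB_STATE_DRAINED"))   // true: done
	}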