Properly delete dataflow jobs in the event of terraform destroy. (#1194)

This commit is contained in:
Nathan McKinley 2018-03-16 15:32:40 -07:00 committed by GitHub
parent 8ada4ffdb2
commit 70ec9e5341
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 14 deletions

View File

@ -139,14 +139,15 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id)) return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
} }
d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok { if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState) log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("") d.SetId("")
return nil return nil
} }
d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
d.SetId(job.Id) d.SetId(job.Id)
return nil return nil
@ -165,28 +166,44 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
if err != nil { if err != nil {
return err return err
} }
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] { for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; !ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
job := &dataflow.Job{ job := &dataflow.Job{
RequestedState: requestedState, RequestedState: requestedState,
} }
_, err = config.clientDataflow.Projects.Jobs.Update(project, id, job).Do() _, err = config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
if gerr, ok := err.(*googleapi.Error); !ok { if err != nil {
// If we have an error and it's not a google-specific error, we should go ahead and return. if gerr, err_ok := err.(*googleapi.Error); !err_ok {
return err // If we have an error and it's not a google-specific error, we should go ahead and return.
} else if ok && strings.Contains(gerr.Message, "not yet ready for canceling") { return err
time.Sleep(5 * time.Second) } else if err_ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
} else { // We'll sleep below to wait for the job to be ready to cancel.
return err } else {
return err
}
} }
err = resourceDataflowJobRead(d, meta) err = resourceDataflowJobRead(d, meta)
postReadState := d.Get("state").(string)
log.Printf("[DEBUG] Job state: '%s'.", postReadState)
if _, ok := dataflowTerminalStatesMap[postReadState]; !ok {
// If we're not yet in a terminal state, we need to sleep a few seconds so we don't
// exhaust our update quota with repeated attempts.
time.Sleep(5 * time.Second)
}
if err != nil { if err != nil {
return err return err
} }
} }
d.SetId("")
return nil // Only remove the job from state if it's actually successfully canceled.
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
d.SetId("")
return nil
}
return fmt.Errorf("There was a problem canceling the dataflow job '%s' - the final state was %s.", d.Id(), d.Get("state").(string))
} }
func mapOnDelete(policy string) (string, error) { func mapOnDelete(policy string) (string, error) {

View File

@ -36,7 +36,7 @@ func testAccCheckDataflowJobDestroy(s *terraform.State) error {
config := testAccProvider.Meta().(*Config) config := testAccProvider.Meta().(*Config)
job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do() job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if job != nil { if job != nil {
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok { if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
return fmt.Errorf("Job still present") return fmt.Errorf("Job still present")
} }
} else if err != nil { } else if err != nil {