2018-01-10 22:38:15 +00:00
|
|
|
package google
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/hashicorp/terraform/helper/schema"
|
|
|
|
"github.com/hashicorp/terraform/helper/validation"
|
|
|
|
|
|
|
|
"google.golang.org/api/dataflow/v1b3"
|
|
|
|
"google.golang.org/api/googleapi"
|
|
|
|
)
|
|
|
|
|
|
|
|
// dataflowTerminalStatesMap is the set of Dataflow job states that this
// resource treats as "finished". resourceDataflowJobRead drops a job from
// Terraform state once it reports one of these states, and
// resourceDataflowJobDelete stops polling as soon as one is reached.
//
// The set deliberately includes JOB_STATE_DRAINING and JOB_STATE_CANCELLING,
// so a job counts as finished here as soon as it reports that it is draining
// or cancelling.
var dataflowTerminalStatesMap = map[string]struct{}{
	"JOB_STATE_DONE":       {},
	"JOB_STATE_FAILED":     {},
	"JOB_STATE_CANCELLED":  {},
	"JOB_STATE_UPDATED":    {},
	"JOB_STATE_DRAINING":   {},
	"JOB_STATE_DRAINED":    {},
	"JOB_STATE_CANCELLING": {},
}
|
|
|
|
|
|
|
|
func resourceDataflowJob() *schema.Resource {
|
|
|
|
return &schema.Resource{
|
|
|
|
Create: resourceDataflowJobCreate,
|
|
|
|
Read: resourceDataflowJobRead,
|
|
|
|
Delete: resourceDataflowJobDelete,
|
|
|
|
|
|
|
|
Schema: map[string]*schema.Schema{
|
|
|
|
"name": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Required: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"template_gcs_path": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Required: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"temp_gcs_location": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Required: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"zone": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Optional: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
"region": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Optional: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
"max_workers": &schema.Schema{
|
|
|
|
Type: schema.TypeInt,
|
|
|
|
Optional: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"parameters": {
|
|
|
|
Type: schema.TypeMap,
|
|
|
|
Optional: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"on_delete": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
|
|
|
|
Optional: true,
|
|
|
|
Default: "drain",
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"project": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Optional: true,
|
|
|
|
ForceNew: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
"state": &schema.Schema{
|
|
|
|
Type: schema.TypeString,
|
|
|
|
Computed: true,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
|
|
|
|
config := meta.(*Config)
|
|
|
|
|
|
|
|
project, err := getProject(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
zone, err := getZone(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-09-07 17:48:33 +00:00
|
|
|
|
|
|
|
region, err := getRegion(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
params := expandStringMap(d, "parameters")
|
|
|
|
|
|
|
|
env := dataflow.RuntimeEnvironment{
|
|
|
|
TempLocation: d.Get("temp_gcs_location").(string),
|
|
|
|
Zone: zone,
|
|
|
|
MaxWorkers: int64(d.Get("max_workers").(int)),
|
|
|
|
}
|
|
|
|
|
|
|
|
request := dataflow.CreateJobFromTemplateRequest{
|
|
|
|
JobName: d.Get("name").(string),
|
|
|
|
GcsPath: d.Get("template_gcs_path").(string),
|
|
|
|
Parameters: params,
|
|
|
|
Environment: &env,
|
|
|
|
}
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
job, err := createJob(config, project, region, &request)
|
2018-01-10 22:38:15 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
d.SetId(job.Id)
|
|
|
|
|
|
|
|
return resourceDataflowJobRead(d, meta)
|
|
|
|
}
|
|
|
|
|
|
|
|
func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
|
|
|
|
config := meta.(*Config)
|
|
|
|
|
|
|
|
project, err := getProject(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
region, err := getRegion(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
id := d.Id()
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
job, err := getJob(config, project, region, id)
|
2018-01-10 22:38:15 +00:00
|
|
|
if err != nil {
|
|
|
|
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
|
|
|
|
}
|
|
|
|
|
2018-03-16 22:32:40 +00:00
|
|
|
d.Set("state", job.CurrentState)
|
|
|
|
d.Set("name", job.Name)
|
|
|
|
d.Set("project", project)
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
|
|
|
|
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
|
|
|
|
d.SetId("")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
d.SetId(job.Id)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
|
|
|
|
config := meta.(*Config)
|
|
|
|
|
|
|
|
project, err := getProject(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
region, err := getRegion(d, config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
id := d.Id()
|
|
|
|
requestedState, err := mapOnDelete(d.Get("on_delete").(string))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-03-16 22:32:40 +00:00
|
|
|
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; !ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
|
2018-01-10 22:38:15 +00:00
|
|
|
job := &dataflow.Job{
|
|
|
|
RequestedState: requestedState,
|
|
|
|
}
|
|
|
|
|
2018-09-07 17:48:33 +00:00
|
|
|
_, err = updateJob(config, project, region, id, job)
|
2018-03-16 22:32:40 +00:00
|
|
|
if err != nil {
|
|
|
|
if gerr, err_ok := err.(*googleapi.Error); !err_ok {
|
|
|
|
// If we have an error and it's not a google-specific error, we should go ahead and return.
|
|
|
|
return err
|
|
|
|
} else if err_ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
|
|
|
|
// We'll sleep below to wait for the job to be ready to cancel.
|
|
|
|
} else {
|
|
|
|
return err
|
|
|
|
}
|
2018-01-10 22:38:15 +00:00
|
|
|
}
|
2018-03-16 22:32:40 +00:00
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
err = resourceDataflowJobRead(d, meta)
|
2018-03-16 22:32:40 +00:00
|
|
|
postReadState := d.Get("state").(string)
|
|
|
|
log.Printf("[DEBUG] Job state: '%s'.", postReadState)
|
|
|
|
if _, ok := dataflowTerminalStatesMap[postReadState]; !ok {
|
|
|
|
// If we're not yet in a terminal state, we need to sleep a few seconds so we don't
|
|
|
|
// exhaust our update quota with repeated attempts.
|
|
|
|
time.Sleep(5 * time.Second)
|
|
|
|
}
|
2018-01-10 22:38:15 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-03-16 22:32:40 +00:00
|
|
|
// Only remove the job from state if it's actually successfully canceled.
|
|
|
|
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
|
|
|
|
d.SetId("")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return fmt.Errorf("There was a problem canceling the dataflow job '%s' - the final state was %s.", d.Id(), d.Get("state").(string))
|
|
|
|
|
2018-01-10 22:38:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// mapOnDelete translates the resource's `on_delete` policy into the job state
// to request from the Dataflow API when the resource is destroyed.
//
// "cancel" maps to JOB_STATE_CANCELLED and "drain" maps to JOB_STATE_DRAINING.
// Any other policy yields an empty string and a non-nil error naming the
// rejected value.
func mapOnDelete(policy string) (string, error) {
	switch policy {
	case "cancel":
		return "JOB_STATE_CANCELLED", nil
	case "drain":
		return "JOB_STATE_DRAINING", nil
	default:
		return "", fmt.Errorf("Invalid `on_delete` policy: %s", policy)
	}
}
|
2018-09-07 17:48:33 +00:00
|
|
|
|
|
|
|
func createJob(config *Config, project string, region string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) {
|
|
|
|
if region == "" {
|
|
|
|
return config.clientDataflow.Projects.Templates.Create(project, request).Do()
|
|
|
|
}
|
|
|
|
return config.clientDataflow.Projects.Locations.Templates.Create(project, region, request).Do()
|
|
|
|
}
|
|
|
|
|
|
|
|
func getJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
|
|
|
|
if region == "" {
|
|
|
|
return config.clientDataflow.Projects.Jobs.Get(project, id).Do()
|
|
|
|
}
|
|
|
|
return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).Do()
|
|
|
|
}
|
|
|
|
|
|
|
|
func updateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
|
|
|
|
if region == "" {
|
|
|
|
return config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
|
|
|
|
}
|
|
|
|
return config.clientDataflow.Projects.Locations.Jobs.Update(project, region, id, job).Do()
|
|
|
|
}
|