New Resource: Dataflow Job (#855)

Add support for Google Dataflow jobs

Note: A dataflow job exists when it is in a nonterminal state, and does not exist if it
is in a terminal state (or a non-running state which can only transition into terminal
states).  See doc for more detail.
This commit is contained in:
Nathan McKinley 2018-01-10 14:38:15 -08:00 committed by GitHub
parent 0f80d7443f
commit f50d4a19b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 17281 additions and 0 deletions

View File

@ -25,6 +25,7 @@ import (
computeBeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
"google.golang.org/api/container/v1"
"google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/dataproc/v1"
"google.golang.org/api/dns/v1"
"google.golang.org/api/iam/v1"
@ -51,6 +52,7 @@ type Config struct {
clientComputeBeta *computeBeta.Service
clientContainer *container.Service
clientDataproc *dataproc.Service
clientDataflow *dataflow.Service
clientDns *dns.Service
clientKms *cloudkms.Service
clientLogging *cloudlogging.Service
@ -197,6 +199,13 @@ func (c *Config) loadAndValidate() error {
}
c.clientPubsub.UserAgent = userAgent
log.Printf("[INFO] Instantiating Google Dataflow Client...")
c.clientDataflow, err = dataflow.New(client)
if err != nil {
return err
}
c.clientDataflow.UserAgent = userAgent
log.Printf("[INFO] Instantiating Google Cloud ResourceManager Client...")
c.clientResourceManager, err = cloudresourcemanager.New(client)
if err != nil {

View File

@ -131,6 +131,7 @@ func Provider() terraform.ResourceProvider {
"google_compute_vpn_tunnel": resourceComputeVpnTunnel(),
"google_container_cluster": resourceContainerCluster(),
"google_container_node_pool": resourceContainerNodePool(),
"google_dataflow_job": resourceDataflowJob(),
"google_dataproc_cluster": resourceDataprocCluster(),
"google_dataproc_job": resourceDataprocJob(),
"google_dns_managed_zone": resourceDnsManagedZone(),

View File

@ -0,0 +1,201 @@
package google
import (
"fmt"
"log"
"strings"
"time"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/helper/validation"
"google.golang.org/api/dataflow/v1b3"
"google.golang.org/api/googleapi"
)
var dataflowTerminalStatesMap = map[string]struct{}{
"JOB_STATE_DONE": {},
"JOB_STATE_FAILED": {},
"JOB_STATE_CANCELLED": {},
"JOB_STATE_UPDATED": {},
"JOB_STATE_DRAINING": {},
"JOB_STATE_DRAINED": {},
"JOB_STATE_CANCELLING": {},
}
func resourceDataflowJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowJobCreate,
Read: resourceDataflowJobRead,
Delete: resourceDataflowJobDelete,
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"template_gcs_path": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"temp_gcs_location": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"zone": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"max_workers": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},
"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},
"on_delete": &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
ForceNew: true,
},
"project": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"state": &schema.Schema{
Type: schema.TypeString,
Computed: true,
},
},
}
}
func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)
project, err := getProject(d, config)
if err != nil {
return err
}
zone, err := getZone(d, config)
if err != nil {
return err
}
params := expandStringMap(d, "parameters")
env := dataflow.RuntimeEnvironment{
TempLocation: d.Get("temp_gcs_location").(string),
Zone: zone,
MaxWorkers: int64(d.Get("max_workers").(int)),
}
request := dataflow.CreateJobFromTemplateRequest{
JobName: d.Get("name").(string),
GcsPath: d.Get("template_gcs_path").(string),
Parameters: params,
Environment: &env,
}
job, err := config.clientDataflow.Projects.Templates.Create(project, &request).Do()
if err != nil {
return err
}
d.SetId(job.Id)
return resourceDataflowJobRead(d, meta)
}
func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)
project, err := getProject(d, config)
if err != nil {
return err
}
id := d.Id()
job, err := config.clientDataflow.Projects.Jobs.Get(project, id).Do()
if err != nil {
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id))
}
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
return nil
}
d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
d.SetId(job.Id)
return nil
}
func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)
project, err := getProject(d, config)
if err != nil {
return err
}
id := d.Id()
requestedState, err := mapOnDelete(d.Get("on_delete").(string))
if err != nil {
return err
}
for _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok; _, ok = dataflowTerminalStatesMap[d.Get("state").(string)] {
job := &dataflow.Job{
RequestedState: requestedState,
}
_, err = config.clientDataflow.Projects.Jobs.Update(project, id, job).Do()
if gerr, ok := err.(*googleapi.Error); !ok {
// If we have an error and it's not a google-specific error, we should go ahead and return.
return err
} else if ok && strings.Contains(gerr.Message, "not yet ready for canceling") {
time.Sleep(5 * time.Second)
} else {
return err
}
err = resourceDataflowJobRead(d, meta)
if err != nil {
return err
}
}
d.SetId("")
return nil
}
func mapOnDelete(policy string) (string, error) {
switch policy {
case "cancel":
return "JOB_STATE_CANCELLED", nil
case "drain":
return "JOB_STATE_DRAINING", nil
default:
return "", fmt.Errorf("Invalid `on_delete` policy: %s", policy)
}
}

View File

@ -0,0 +1,91 @@
package google
import (
"fmt"
"testing"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccDataflowJobCreate(t *testing.T) {
t.Parallel()
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccDataflowJob,
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(
"google_dataflow_job.big_data"),
),
},
},
})
}
func testAccCheckDataflowJobDestroy(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
if rs.Type != "google_dataflow_job" {
continue
}
config := testAccProvider.Meta().(*Config)
job, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if job != nil {
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
return fmt.Errorf("Job still present")
}
} else if err != nil {
return err
}
}
return nil
}
func testAccDataflowJobExists(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set")
}
config := testAccProvider.Meta().(*Config)
_, err := config.clientDataflow.Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if err != nil {
return fmt.Errorf("Job does not exist")
}
return nil
}
}
var testAccDataflowJob = fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "dfjob-test-%s-temp"
force_destroy = true
}
resource "google_dataflow_job" "big_data" {
name = "dfjob-test-%s"
template_gcs_path = "gs://dataflow-templates/wordcount/template_file"
temp_gcs_location = "${google_storage_bucket.temp.url}"
parameters {
inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
output = "${google_storage_bucket.temp.url}/output"
}
zone = "us-central1-f"
project = "%s"
on_delete = "cancel"
}`, acctest.RandString(10), acctest.RandString(10), getTestProjectFromEnv())

File diff suppressed because it is too large Load Diff

11799
vendor/google.golang.org/api/dataflow/v1b3/dataflow-gen.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

6
vendor/vendor.json vendored
View File

@ -1277,6 +1277,12 @@
"revision": "7afc123cf726cd2f253faa3e144d2ab65477b18f",
"revisionTime": "2017-10-21T00:03:56Z"
},
{
"checksumSHA1": "pxXDGWhDrfcAOCQCjgxLfZA4NOw=",
"path": "google.golang.org/api/dataflow/v1b3",
"revision": "ea28708ab465bb536c27f29a34011f3d5b5bb052",
"revisionTime": "2017-12-13T00:03:45Z"
},
{
"checksumSHA1": "4spHGEYR8VWzwlBGD+xSwi8ZRkw=",
"path": "google.golang.org/api/dataproc/v1",

View File

@ -0,0 +1,55 @@
---
layout: "google"
page_title: "Google: google_dataflow_job"
sidebar_current: "docs-google-dataflow-job"
description: |-
Creates a job in Dataflow according to a provided config file.
---
# google\_dataflow\_job
Creates a job on Dataflow, which is an implementation of Apache Beam running on Google Compute Engine. For more information see
the official documentation for
[Beam](https://beam.apache.org) and [Dataflow](https://cloud.google.com/dataflow/).
## Example Usage
```hcl
resource "google_dataflow_job" "big_data_job" {
name = "dataflow-job"
template_gcs_path = "gs://my-bucket/templates/template_file"
temp_gcs_location = "gs://my-bucket/tmp_dir"
parameters {
foo = "bar"
baz = "qux"
}
}
```
## Note on "destroy" / "apply"
There are many types of Dataflow jobs. Some Dataflow jobs run constantly, getting new data from (e.g.) a GCS bucket, and outputting data continuously. Some jobs process a set amount of data then terminate. All jobs can fail while running due to programming errors or other issues. In this way, Dataflow jobs are different from most other Terraform / Google resources.
The Dataflow resource is considered 'existing' while it is in a nonterminal state. If it reaches a terminal state (e.g. 'FAILED', 'COMPLETE', 'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for jobs which run continously, but may surprise users who use this resource for other kinds of Dataflow jobs.
A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "cancelled", but if a user sets `on_delete` to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.
## Argument Reference
The following arguments are supported:
* `name` - (Required) A unique name for the resource, required by Dataflow.
* `template_gcs_path` - (Required) The GCS path to the Dataflow job template.
* `temp_gcs_location` - (Required) A writeable location on GCS for the Dataflow job to dump its temporary data.
- - -
* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as used in the template).
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
* `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
## Attributes Reference
* `state` - The current state of the resource, selected from the [JobState enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState)

View File

@ -341,6 +341,16 @@
</ul>
</li>
<li<%= sidebar_current("docs-google-dataflow") %>>
<a href="#">Google Dataflow Resources</a>
<ul class="nav nav-visible">
<li<%= sidebar_current("docs-google-dataflow-job") %>>
<a href="/docs/providers/google/r/dataflow_job.html">google_dataflow_job</a>
</li>
</ul>
</li>
<li<%= sidebar_current("docs-google-dataproc") %>>
<a href="#">Google Dataproc Resources</a>
<ul class="nav nav-visible">