package google

import (
	"fmt"
	"io/ioutil"
	"log"
	"regexp"
	"strings"
	"testing"

	"github.com/hashicorp/errwrap"
	"github.com/hashicorp/terraform/helper/acctest"
	"github.com/hashicorp/terraform/helper/resource"
	"github.com/hashicorp/terraform/terraform"
	"google.golang.org/api/dataproc/v1"
	"google.golang.org/api/googleapi"
)
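
// jobTestField pairs a Terraform attribute path with the corresponding
// value reported by the GCP API, so the two can be compared in checks.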
type jobTestField struct {
	tf_attr  string
	gcp_attr interface{}
}
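
// Submitting a job with no xxx_config block at all must fail validation.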
func TestAccDataprocJob_failForMissingJobConfig(t *testing.T) {
	t.Parallel()

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config:      testAccDataprocJob_missingJobConf(),
				ExpectError: regexp.MustCompile("You must define and configure exactly one xxx_config block"),
			},
		},
	})
}
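
// Two apply steps flip force_delete to verify the field updates in place.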
func TestAccDataprocJob_updatable(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	jobId := fmt.Sprintf("dproc-update-job-id-%s", rnd)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_updatable(rnd, jobId, "false"),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.updatable", &job),
					resource.TestCheckResourceAttr("google_dataproc_job.updatable", "force_delete", "false"),
				),
			},
			{
				Config: testAccDataprocJob_updatable(rnd, jobId, "true"),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.updatable", &job),
					resource.TestCheckResourceAttr("google_dataproc_job.updatable", "force_delete", "true"),
				),
			},
		},
	})
}
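
// PySpark job with a user-supplied job_id plus scheduling and label attributes.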
func TestAccDataprocJob_PySpark(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	jobId := fmt.Sprintf("dproc-custom-job-id-%s", rnd)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_pySpark(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.pyspark", &job),

					// Custom supplied job_id
					resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "reference.0.job_id", jobId),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.pyspark", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.pyspark", "status.0.state_start_time"),
					resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "scheduling.0.max_failures_per_hour", "1"),
					resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "labels.one", "1"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.pyspark", "pyspark_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.pyspark", &job),
				),
			},
		},
	})
}
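
// Spark job; here the job_id is autogenerated rather than user-supplied.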
func TestAccDataprocJob_Spark(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_spark(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.spark", &job),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "reference.0.job_id"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "status.0.state_start_time"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.spark", "spark_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.spark", &job),
				),
			},
		},
	})
}
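
// Hadoop job driving the stock hadoop-mapreduce-examples jar.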
func TestAccDataprocJob_Hadoop(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_hadoop(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.hadoop", &job),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "reference.0.job_id"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "status.0.state_start_time"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.hadoop", "hadoop_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.hadoop", &job),
				),
			},
		},
	})
}
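
// Hive job submitted via an inline query_list.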
func TestAccDataprocJob_Hive(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_hive(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.hive", &job),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "reference.0.job_id"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "status.0.state_start_time"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.hive", "hive_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.hive", &job),
				),
			},
		},
	})
}
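
// Pig job submitted via an inline query_list.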
func TestAccDataprocJob_Pig(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_pig(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.pig", &job),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "reference.0.job_id"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "status.0.state_start_time"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.pig", "pig_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.pig", &job),
				),
			},
		},
	})
}
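
// Spark SQL job submitted via an inline query_list.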
func TestAccDataprocJob_SparkSql(t *testing.T) {
	t.Parallel()

	var job dataproc.Job
	rnd := acctest.RandString(10)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckDataprocJobDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccDataprocJob_sparksql(rnd),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckDataprocJobExists("google_dataproc_job.sparksql", &job),

					// Autogenerated / computed values
					resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "reference.0.job_id"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "status.0.state"),
					resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "status.0.state_start_time"),

					// Unique job config
					testAccCheckDataprocJobAttrMatch(
						"google_dataproc_job.sparksql", "sparksql_config", &job),

					// Wait until job completes successfully
					testAccCheckDataprocJobCompletesSuccessfully("google_dataproc_job.sparksql", &job),
				),
			},
		},
	})
}
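
// testAccCheckDataprocJobDestroy verifies that every google_dataproc_job in
// state is gone after destroy, i.e. the API returns a 404 for it.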
func testAccCheckDataprocJobDestroy(s *terraform.State) error {
	config := testAccProvider.Meta().(*Config)

	for _, rs := range s.RootModule().Resources {
		if rs.Type != "google_dataproc_job" {
			continue
		}

		if rs.Primary.ID == "" {
			return fmt.Errorf("Unable to verify deletion of Dataproc job: ID is empty")
		}
		attributes := rs.Primary.Attributes

		project, err := getTestProject(rs.Primary, config)
		if err != nil {
			return err
		}

		_, err = config.clientDataproc.Projects.Regions.Jobs.Get(
			project, attributes["region"], rs.Primary.ID).Do()
		if err != nil {
			if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 404 {
				return nil
			} else if ok {
				return fmt.Errorf("Error making GCP platform call: HTTP code %d, message: %s", gerr.Code, gerr.Message)
			}
			return fmt.Errorf("Error making GCP platform call: %s", err.Error())
		}
		return fmt.Errorf("Dataproc job still exists")
	}

	return nil
}
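
// testAccCheckDataprocJobCompletesSuccessfully waits (up to five minutes) for
// the submitted job to finish. If the job ends in ERROR, it downloads the
// driver output from the gs:// URI reported by the API and logs it, to aid
// debugging failures whose logs may otherwise be hard to reach from CI.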
func testAccCheckDataprocJobCompletesSuccessfully(n string, job *dataproc.Job) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		config := testAccProvider.Meta().(*Config)

		attributes := s.RootModule().Resources[n].Primary.Attributes
		region := attributes["region"]
		project, err := getTestProject(s.RootModule().Resources[n].Primary, config)
		if err != nil {
			return err
		}

		jobCompleteTimeoutMins := 5
		waitErr := dataprocJobOperationWait(config, region, project, job.Reference.JobId,
			"Awaiting Dataproc job completion", jobCompleteTimeoutMins, 1)
		if waitErr != nil {
			return waitErr
		}

		completeJob, err := config.clientDataproc.Projects.Regions.Jobs.Get(
			project, region, job.Reference.JobId).Do()
		if err != nil {
			return err
		}
		if completeJob.Status.State == "ERROR" {
			if !strings.HasPrefix(completeJob.DriverOutputResourceUri, "gs://") {
				return fmt.Errorf("Job completed in ERROR state but no valid log URI found")
			}
			u := strings.SplitN(strings.TrimPrefix(completeJob.DriverOutputResourceUri, "gs://"), "/", 2)
			if len(u) != 2 {
				return fmt.Errorf("Job completed in ERROR state but no valid log URI found")
			}
			l, err := config.clientStorage.Objects.List(u[0]).Prefix(u[1]).Do()
			if err != nil {
				return errwrap.Wrapf("Job completed in ERROR state, found error when trying to list logs: {{err}}", err)
			}
			for _, item := range l.Items {
				resp, err := config.clientStorage.Objects.Get(item.Bucket, item.Name).Download()
				if err != nil {
					return errwrap.Wrapf("Job completed in ERROR state, found error when trying to read logs: {{err}}", err)
				}
				body, err := ioutil.ReadAll(resp.Body)
				resp.Body.Close()
				if err != nil {
					return errwrap.Wrapf("Job completed in ERROR state, found error when trying to read logs: {{err}}", err)
				}
				log.Printf("[ERROR] Job failed, driver logs:\n%s", body)
			}
			return fmt.Errorf("Job completed in ERROR state, check logs for details")
		} else if completeJob.Status.State != "DONE" {
			return fmt.Errorf("Job did not complete successfully; final state: %s", completeJob.Status.State)
		}

		return nil
	}
}
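
// testAccCheckDataprocJobExists fetches the job from the Dataproc API and
// copies it into job for later attribute checks.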
func testAccCheckDataprocJobExists(n string, job *dataproc.Job) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, ok := s.RootModule().Resources[n]
		if !ok {
			return fmt.Errorf("Terraform resource not found: %s", n)
		}

		if rs.Primary.ID == "" {
			return fmt.Errorf("No ID is set for Dataproc job")
		}

		config := testAccProvider.Meta().(*Config)
		jobId := rs.Primary.ID
		project, err := getTestProject(rs.Primary, config)
		if err != nil {
			return err
		}

		found, err := config.clientDataproc.Projects.Regions.Jobs.Get(
			project, rs.Primary.Attributes["region"], jobId).Do()
		if err != nil {
			return err
		}

		*job = *found

		return nil
	}
}
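
// testAccCheckDataprocJobAttrMatch compares every attribute of the given
// xxx_config block in Terraform state against the value the Dataproc API
// reports for the job.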
func testAccCheckDataprocJobAttrMatch(n, jobType string, job *dataproc.Job) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		attributes, err := getResourceAttributes(n, s)
		if err != nil {
			return err
		}

		jobTests := []jobTestField{}
		if jobType == "pyspark_config" {
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.main_python_file_uri", job.PysparkJob.MainPythonFileUri})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.args", job.PysparkJob.Args})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.python_file_uris", job.PysparkJob.PythonFileUris})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.jar_file_uris", job.PysparkJob.JarFileUris})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.file_uris", job.PysparkJob.FileUris})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.archive_uris", job.PysparkJob.ArchiveUris})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.properties", job.PysparkJob.Properties})
			jobTests = append(jobTests, jobTestField{"pyspark_config.0.logging_config.0.driver_log_levels", job.PysparkJob.LoggingConfig.DriverLogLevels})
		}
		if jobType == "spark_config" {
			jobTests = append(jobTests, jobTestField{"spark_config.0.main_class", job.SparkJob.MainClass})
			jobTests = append(jobTests, jobTestField{"spark_config.0.main_jar_file_uri", job.SparkJob.MainJarFileUri})
			jobTests = append(jobTests, jobTestField{"spark_config.0.args", job.SparkJob.Args})
			jobTests = append(jobTests, jobTestField{"spark_config.0.jar_file_uris", job.SparkJob.JarFileUris})
			jobTests = append(jobTests, jobTestField{"spark_config.0.file_uris", job.SparkJob.FileUris})
			jobTests = append(jobTests, jobTestField{"spark_config.0.archive_uris", job.SparkJob.ArchiveUris})
			jobTests = append(jobTests, jobTestField{"spark_config.0.properties", job.SparkJob.Properties})
			jobTests = append(jobTests, jobTestField{"spark_config.0.logging_config.0.driver_log_levels", job.SparkJob.LoggingConfig.DriverLogLevels})
		}
		if jobType == "hadoop_config" {
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.main_class", job.HadoopJob.MainClass})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.main_jar_file_uri", job.HadoopJob.MainJarFileUri})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.args", job.HadoopJob.Args})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.jar_file_uris", job.HadoopJob.JarFileUris})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.file_uris", job.HadoopJob.FileUris})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.archive_uris", job.HadoopJob.ArchiveUris})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.properties", job.HadoopJob.Properties})
			jobTests = append(jobTests, jobTestField{"hadoop_config.0.logging_config.0.driver_log_levels", job.HadoopJob.LoggingConfig.DriverLogLevels})
		}
		if jobType == "hive_config" {
			queries := []string{}
			if job.HiveJob.QueryList != nil {
				queries = job.HiveJob.QueryList.Queries
			}
			jobTests = append(jobTests, jobTestField{"hive_config.0.query_list", queries})
			jobTests = append(jobTests, jobTestField{"hive_config.0.query_file_uri", job.HiveJob.QueryFileUri})
			jobTests = append(jobTests, jobTestField{"hive_config.0.continue_on_failure", job.HiveJob.ContinueOnFailure})
			jobTests = append(jobTests, jobTestField{"hive_config.0.script_variables", job.HiveJob.ScriptVariables})
			jobTests = append(jobTests, jobTestField{"hive_config.0.properties", job.HiveJob.Properties})
			jobTests = append(jobTests, jobTestField{"hive_config.0.jar_file_uris", job.HiveJob.JarFileUris})
		}
		if jobType == "pig_config" {
			queries := []string{}
			if job.PigJob.QueryList != nil {
				queries = job.PigJob.QueryList.Queries
			}
			jobTests = append(jobTests, jobTestField{"pig_config.0.query_list", queries})
			jobTests = append(jobTests, jobTestField{"pig_config.0.query_file_uri", job.PigJob.QueryFileUri})
			jobTests = append(jobTests, jobTestField{"pig_config.0.continue_on_failure", job.PigJob.ContinueOnFailure})
			jobTests = append(jobTests, jobTestField{"pig_config.0.script_variables", job.PigJob.ScriptVariables})
			jobTests = append(jobTests, jobTestField{"pig_config.0.properties", job.PigJob.Properties})
			jobTests = append(jobTests, jobTestField{"pig_config.0.jar_file_uris", job.PigJob.JarFileUris})
		}
		if jobType == "sparksql_config" {
			queries := []string{}
			if job.SparkSqlJob.QueryList != nil {
				queries = job.SparkSqlJob.QueryList.Queries
			}
			jobTests = append(jobTests, jobTestField{"sparksql_config.0.query_list", queries})
			jobTests = append(jobTests, jobTestField{"sparksql_config.0.query_file_uri", job.SparkSqlJob.QueryFileUri})
			jobTests = append(jobTests, jobTestField{"sparksql_config.0.script_variables", job.SparkSqlJob.ScriptVariables})
			jobTests = append(jobTests, jobTestField{"sparksql_config.0.properties", job.SparkSqlJob.Properties})
			jobTests = append(jobTests, jobTestField{"sparksql_config.0.jar_file_uris", job.SparkSqlJob.JarFileUris})
		}

		for _, attrs := range jobTests {
			if c := checkMatch(attributes, attrs.tf_attr, attrs.gcp_attr); c != "" {
				return fmt.Errorf("%s", c)
			}
		}

		return nil
	}
}
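
// A config with placement but no xxx_config block, expected to fail validation.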
func testAccDataprocJob_missingJobConf() string {
	return `
resource "google_dataproc_job" "missing_config" {
	placement {
		cluster_name = "na"
	}

	force_delete = true
}`
}
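
// singleNodeClusterConfig is the shared cluster that every job config below
// is prefixed with; the %s placeholder takes the random test suffix.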
var singleNodeClusterConfig = `
resource "google_dataproc_cluster" "basic" {
	name   = "dproc-job-test-%s"
	region = "us-central1"

	cluster_config {
		# Keep the costs down with smallest config we can get away with
		software_config {
			override_properties = {
				"dataproc:dataproc.allow.zero.workers" = "true"
			}
		}

		master_config {
			num_instances = 1
			machine_type  = "n1-standard-1"
			disk_config {
				boot_disk_size_gb = 10
			}
		}
	}
}
`
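
// The del parameter is interpolated into force_delete so the update test can
// flip it between "false" and "true".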
func testAccDataprocJob_updatable(rnd, jobId, del string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "updatable" {

	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}
	reference {
		job_id = "%s"
	}

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = %s

	pyspark_config {
		main_python_file_uri = "gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py"
	}
}
`, rnd, jobId, del)
}
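
// testAccDataprocJob_pySpark supplies its own job_id (reusing the random
// suffix) plus properties, logging, scheduling, and label settings.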
func testAccDataprocJob_pySpark(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "pyspark" {

	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}
	reference {
		job_id = "dproc-custom-job-id-%s"
	}

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true

	pyspark_config {
		main_python_file_uri = "gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py"
		properties = {
			"spark.logConf" = "true"
		}
		logging_config {
			driver_log_levels {
				"root" = "INFO"
			}
		}
	}

	scheduling {
		max_failures_per_hour = 1
	}

	labels {
		one = "1"
	}
}
`, rnd, rnd)
}
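
// SparkPi from the examples jar bundled on the cluster image.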
func testAccDataprocJob_spark(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "spark" {

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true
	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}

	spark_config {
		main_class    = "org.apache.spark.examples.SparkPi"
		jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
		args          = ["1000"]
		properties = {
			"spark.logConf" = "true"
		}
	}
}
`, rnd)
}
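
// Wordcount over the Spark NOTICE file, writing into the cluster's
// auto-created staging bucket.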
func testAccDataprocJob_hadoop(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "hadoop" {

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true
	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}

	hadoop_config {
		main_jar_file_uri = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
		args = [
			"wordcount",
			"file:///usr/lib/spark/NOTICE",
			"gs://${google_dataproc_cluster.basic.cluster_config.0.bucket}/hadoopjob_output"
		]
	}
}
`, rnd)
}
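
// Hive job that creates an external table backed by the cluster's bucket and
// queries it.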
func testAccDataprocJob_hive(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "hive" {

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true
	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}

	hive_config {
		query_list = [
			"DROP TABLE IF EXISTS dprocjob_test",
			"CREATE EXTERNAL TABLE dprocjob_test(bar int) LOCATION 'gs://${google_dataproc_cluster.basic.cluster_config.0.bucket}/hive_dprocjob_test/'",
			"SELECT * FROM dprocjob_test WHERE bar > 2",
		]
	}
}
`, rnd)
}
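
// Pig job running an inline word-count script over the Pig LICENSE file.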
func testAccDataprocJob_pig(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "pig" {

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true
	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}

	pig_config {
		query_list = [
			"LNS = LOAD 'file:///usr/lib/pig/LICENSE.txt ' AS (line)",
			"WORDS = FOREACH LNS GENERATE FLATTEN(TOKENIZE(line)) AS word",
			"GROUPS = GROUP WORDS BY word",
			"WORD_COUNTS = FOREACH GROUPS GENERATE group, COUNT(WORDS)",
			"DUMP WORD_COUNTS"
		]
	}
}
`, rnd)
}
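
// Spark SQL job running an inline query list against a scratch table.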
func testAccDataprocJob_sparksql(rnd string) string {
	return fmt.Sprintf(
		singleNodeClusterConfig+`

resource "google_dataproc_job" "sparksql" {

	region       = "${google_dataproc_cluster.basic.region}"
	force_delete = true
	placement {
		cluster_name = "${google_dataproc_cluster.basic.name}"
	}

	sparksql_config {
		query_list = [
			"DROP TABLE IF EXISTS dprocjob_test",
			"CREATE TABLE dprocjob_test(bar int)",
			"SELECT * FROM dprocjob_test WHERE bar > 2",
		]
	}
}
`, rnd)
}