From 017f2f9b54fc1cd1dab59fb90495b48bd62b582e Mon Sep 17 00:00:00 2001 From: emily Date: Tue, 11 Sep 2018 13:13:11 -0700 Subject: [PATCH] Add Cloud Composer Environments resource (#2001) Adds Cloud Composer Environment resource and test sweeper --- google/composer_operation.go | 71 ++ google/provider.go | 2 +- google/provider_composer_gen.go | 19 - google/resource_composer_environment.go | 884 ++++++++++++++++++ google/resource_composer_environment_test.go | 491 ++++++++++ .../docs/r/composer_environment.html.markdown | 316 +++++++ 6 files changed, 1763 insertions(+), 20 deletions(-) create mode 100644 google/composer_operation.go delete mode 100644 google/provider_composer_gen.go create mode 100644 google/resource_composer_environment.go create mode 100644 google/resource_composer_environment_test.go create mode 100644 website/docs/r/composer_environment.html.markdown diff --git a/google/composer_operation.go b/google/composer_operation.go new file mode 100644 index 00000000..7d838c5e --- /dev/null +++ b/google/composer_operation.go @@ -0,0 +1,71 @@ +package google + +import ( + "fmt" + "log" + "time" + + "github.com/hashicorp/terraform/helper/resource" + composer "google.golang.org/api/composer/v1" +) + +type ComposerOperationWaiter struct { + Service *composer.ProjectsLocationsService + Op *composer.Operation +} + +func (w *ComposerOperationWaiter) RefreshFunc() resource.StateRefreshFunc { + return func() (interface{}, string, error) { + op, err := w.Service.Operations.Get(w.Op.Name).Do() + + if err != nil { + return nil, "", err + } + + log.Printf("[DEBUG] Got %v while polling for operation %s's 'done' status", op.Done, w.Op.Name) + + return op, fmt.Sprint(op.Done), nil + } +} + +func (w *ComposerOperationWaiter) Conf() *resource.StateChangeConf { + return &resource.StateChangeConf{ + Pending: []string{"false"}, + Target: []string{"true"}, + Refresh: w.RefreshFunc(), + } +} + +func composerOperationWait(service *composer.Service, op *composer.Operation, project, activity string) error { + return composerOperationWaitTime(service, op, project, activity, 10) +} + +func composerOperationWaitTime(service *composer.Service, op *composer.Operation, project, activity string, timeoutMin int) error { + if op.Done { + if op.Error != nil { + return fmt.Errorf("Error code %v, message: %s", op.Error.Code, op.Error.Message) + } + return nil + } + + w := &ComposerOperationWaiter{ + Service: service.Projects.Locations, + Op: op, + } + + state := w.Conf() + state.Delay = 10 * time.Second + state.Timeout = time.Duration(timeoutMin) * time.Minute + state.MinTimeout = 2 * time.Second + opRaw, err := state.WaitForState() + if err != nil { + return fmt.Errorf("Error waiting for %s: %s", activity, err) + } + + op = opRaw.(*composer.Operation) + if op.Error != nil { + return fmt.Errorf("Error code %v, message: %s", op.Error.Code, op.Error.Message) + } + + return nil +} diff --git a/google/provider.go b/google/provider.go index 64524c8f..7fe1d64b 100644 --- a/google/provider.go +++ b/google/provider.go @@ -103,7 +103,6 @@ func Provider() terraform.ResourceProvider { GeneratedComputeResourcesMap, GeneratedContainerAnalysisResourcesMap, GeneratedRedisResourcesMap, - GeneratedComposerResourcesMap, GeneratedResourceManagerResourcesMap, map[string]*schema.Resource{ "google_bigquery_dataset": resourceBigQueryDataset(), @@ -113,6 +112,7 @@ func Provider() terraform.ResourceProvider { "google_cloudbuild_trigger": resourceCloudBuildTrigger(), "google_cloudfunctions_function": resourceCloudFunctionsFunction(), "google_cloudiot_registry": resourceCloudIoTRegistry(), + "google_composer_environment": resourceComposerEnvironment(), "google_compute_autoscaler": resourceComputeAutoscaler(), "google_compute_address": resourceComputeAddress(), "google_compute_attached_disk": resourceComputeAttachedDisk(), diff --git a/google/provider_composer_gen.go b/google/provider_composer_gen.go deleted file mode 100644 index a53d579e..00000000 --- a/google/provider_composer_gen.go +++ /dev/null @@ -1,19 +0,0 @@ -// ---------------------------------------------------------------------------- -// -// *** AUTO GENERATED CODE *** AUTO GENERATED CODE *** -// -// ---------------------------------------------------------------------------- -// -// This file is automatically generated by Magic Modules and manual -// changes will be clobbered when the file is regenerated. -// -// Please read more about how to change this file in -// .github/CONTRIBUTING.md. -// -// ---------------------------------------------------------------------------- - -package google - -import "github.com/hashicorp/terraform/helper/schema" - -var GeneratedComposerResourcesMap = map[string]*schema.Resource{} diff --git a/google/resource_composer_environment.go b/google/resource_composer_environment.go new file mode 100644 index 00000000..dc220fbe --- /dev/null +++ b/google/resource_composer_environment.go @@ -0,0 +1,884 @@ +package google + +import ( + "fmt" + "log" + "regexp" + "strings" + "time" + + "github.com/hashicorp/terraform/helper/schema" + "github.com/hashicorp/terraform/helper/validation" + "google.golang.org/api/composer/v1" +) + +const ( + composerEnvironmentEnvVariablesRegexp = "[a-zA-Z_][a-zA-Z0-9_]*." + composerEnvironmentReservedAirflowEnvVarRegexp = "AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+" +) + +var composerEnvironmentReservedEnvVar = map[string]struct{}{ + "AIRFLOW_HOME": {}, + "C_FORCE_ROOT": {}, + "CONTAINER_NAME": {}, + "DAGS_FOLDER": {}, + "GCP_PROJECT": {}, + "GCS_BUCKET": {}, + "GKE_CLUSTER_NAME": {}, + "SQL_DATABASE": {}, + "SQL_INSTANCE": {}, + "SQL_PASSWORD": {}, + "SQL_PROJECT": {}, + "SQL_REGION": {}, + "SQL_USER": {}, +} + +func resourceComposerEnvironment() *schema.Resource { + return &schema.Resource{ + Create: resourceComposerEnvironmentCreate, + Read: resourceComposerEnvironmentRead, + Update: resourceComposerEnvironmentUpdate, + Delete: resourceComposerEnvironmentDelete, + + Importer: &schema.ResourceImporter{ + State: resourceComposerEnvironmentImport, + }, + + Timeouts: &schema.ResourceTimeout{ + // Composer takes <= 1 hr for create/update. + Create: schema.DefaultTimeout(3600 * time.Second), + Update: schema.DefaultTimeout(3600 * time.Second), + Delete: schema.DefaultTimeout(360 * time.Second), + }, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validateGCPName, + }, + "region": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "project": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ForceNew: true, + }, + "config": { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "node_count": { + Type: schema.TypeInt, + Computed: true, + Optional: true, + ValidateFunc: validation.IntAtLeast(3), + }, + "node_config": { + Type: schema.TypeList, + Computed: true, + Optional: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "zone": { + Type: schema.TypeString, + Computed: true, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, + "machine_type": { + Type: schema.TypeString, + Computed: true, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, + "network": { + Type: schema.TypeString, + Computed: true, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, + "subnetwork": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + DiffSuppressFunc: compareSelfLinkOrResourceName, + }, + "disk_size_gb": { + Type: schema.TypeInt, + Computed: true, + Optional: true, + ForceNew: true, + }, + "oauth_scopes": { + Type: schema.TypeSet, + Computed: true, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + Set: schema.HashString, + }, + "service_account": { + Type: schema.TypeString, + Computed: true, + Optional: true, + ForceNew: true, + ValidateFunc: validateServiceAccountRelativeNameOrEmail, + DiffSuppressFunc: compareServiceAccountEmailToLink, + }, + "tags": { + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + Set: schema.HashString, + }, + }, + }, + }, + "software_config": { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "airflow_config_overrides": { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "pypi_packages": { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + ValidateFunc: validateComposerEnvironmentPypiPackages, + }, + "env_variables": { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + ValidateFunc: validateComposerEnvironmentEnvVariables, + }, + "image_version": { + Type: schema.TypeString, + Computed: true, + }, + }, + }, + }, + "airflow_uri": { + Type: schema.TypeString, + Computed: true, + }, + "dag_gcs_prefix": { + Type: schema.TypeString, + Computed: true, + }, + "gke_cluster": { + Type: schema.TypeString, + Computed: true, + }, + }, + }, + }, + "labels": { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + } +} + +func resourceComposerEnvironmentCreate(d *schema.ResourceData, meta interface{}) error { + config := meta.(*Config) + + envName, err := resourceComposerEnvironmentName(d, config) + if err != nil { + return err + } + + transformedConfig, err := expandComposerEnvironmentConfig(d.Get("config"), d, config) + if err != nil { + return err + } + + env := &composer.Environment{ + Name: envName.resourceName(), + Labels: expandLabels(d), + Config: transformedConfig, + } + + // Some fields cannot be specified during create and must be updated post-creation. + updateOnlyEnv := getComposerEnvironmentPostCreateUpdateObj(env) + + log.Printf("[DEBUG] Creating new Environment %q", envName.parentName()) + op, err := config.clientComposer.Projects.Locations.Environments.Create(envName.parentName(), env).Do() + if err != nil { + return err + } + + // Store the ID now + id, err := replaceVars(d, config, "{{project}}/{{region}}/{{name}}") + if err != nil { + return fmt.Errorf("Error constructing id: %s", err) + } + d.SetId(id) + + waitErr := composerOperationWaitTime( + config.clientComposer, op, envName.Project, "Creating Environment", + int(d.Timeout(schema.TimeoutCreate).Minutes())) + + if waitErr != nil { + // The resource didn't actually get created, remove from state. + d.SetId("") + + errMsg := fmt.Sprintf("Error waiting to create Environment: %s", waitErr) + if err := handleComposerEnvironmentCreationOpFailure(id, envName, d, config); err != nil { + return fmt.Errorf("Error waiting to create Environment: %s. An initial "+ + "environment was or is still being created, and clean up failed with "+ + "error: %s.", errMsg, err) + } + + return fmt.Errorf("Error waiting to create Environment: %s", waitErr) + } + + log.Printf("[DEBUG] Finished creating Environment %q: %#v", d.Id(), op) + + if err := resourceComposerEnvironmentPostCreateUpdate(updateOnlyEnv, d, config); err != nil { + return err + } + + return resourceComposerEnvironmentRead(d, meta) +} + +func resourceComposerEnvironmentRead(d *schema.ResourceData, meta interface{}) error { + config := meta.(*Config) + + envName, err := resourceComposerEnvironmentName(d, config) + if err != nil { + return err + } + + res, err := config.clientComposer.Projects.Locations.Environments.Get(envName.resourceName()).Do() + if err != nil { + return handleNotFoundError(err, d, fmt.Sprintf("ComposerEnvironment %q", d.Id())) + } + + // Set from getProject(d) + if err := d.Set("project", envName.Project); err != nil { + return fmt.Errorf("Error reading Environment: %s", err) + } + // Set from getRegion(d) + if err := d.Set("region", envName.Region); err != nil { + return fmt.Errorf("Error reading Environment: %s", err) + } + if err := d.Set("name", GetResourceNameFromSelfLink(res.Name)); err != nil { + return fmt.Errorf("Error reading Environment: %s", err) + } + if err := d.Set("config", flattenComposerEnvironmentConfig(res.Config)); err != nil { + return fmt.Errorf("Error reading Environment: %s", err) + } + if err := d.Set("labels", res.Labels); err != nil { + return fmt.Errorf("Error reading Environment: %s", err) + } + return nil +} + +func resourceComposerEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error { + tfConfig := meta.(*Config) + + d.Partial(true) + + // Composer only allows PATCHing one field at a time, so for each updatable field, we + // 1. determine if it needs to be updated + // 2. construct a PATCH object with only that field populated + // 3. call resourceComposerEnvironmentPatchField(...)to update that single field. + if d.HasChange("config") { + config, err := expandComposerEnvironmentConfig(d.Get("config"), d, tfConfig) + if err != nil { + return err + } + + if d.HasChange("config.0.software_config.0.airflow_config_overrides") { + + patchObj := &composer.Environment{ + Config: &composer.EnvironmentConfig{ + SoftwareConfig: &composer.SoftwareConfig{ + AirflowConfigOverrides: make(map[string]string), + }, + }, + } + + if config != nil && config.SoftwareConfig != nil && len(config.SoftwareConfig.AirflowConfigOverrides) > 0 { + patchObj.Config.SoftwareConfig.AirflowConfigOverrides = config.SoftwareConfig.AirflowConfigOverrides + } + + err = resourceComposerEnvironmentPatchField("config.softwareConfig.airflowConfigOverrides", patchObj, d, tfConfig) + if err != nil { + return err + } + d.SetPartial("config") + } + + if d.HasChange("config.0.software_config.0.env_variables") { + patchObj := &composer.Environment{ + Config: &composer.EnvironmentConfig{ + SoftwareConfig: &composer.SoftwareConfig{ + EnvVariables: make(map[string]string), + }, + }, + } + if config != nil && config.SoftwareConfig != nil && len(config.SoftwareConfig.EnvVariables) > 0 { + patchObj.Config.SoftwareConfig.EnvVariables = config.SoftwareConfig.EnvVariables + } + + err = resourceComposerEnvironmentPatchField("config.softwareConfig.envVariables", patchObj, d, tfConfig) + if err != nil { + return err + } + d.SetPartial("config") + } + + if d.HasChange("config.0.software_config.0.pypi_packages") { + patchObj := &composer.Environment{ + Config: &composer.EnvironmentConfig{ + SoftwareConfig: &composer.SoftwareConfig{ + PypiPackages: make(map[string]string), + }, + }, + } + if config != nil && config.SoftwareConfig != nil && config.SoftwareConfig.PypiPackages != nil { + patchObj.Config.SoftwareConfig.PypiPackages = config.SoftwareConfig.PypiPackages + } + + err = resourceComposerEnvironmentPatchField("config.softwareConfig.pypiPackages", patchObj, d, tfConfig) + if err != nil { + return err + } + d.SetPartial("config") + } + + if d.HasChange("config.0.node_count") { + patchObj := &composer.Environment{Config: &composer.EnvironmentConfig{}} + if config != nil { + patchObj.Config.NodeCount = config.NodeCount + } + err = resourceComposerEnvironmentPatchField("config.nodeCount", patchObj, d, tfConfig) + if err != nil { + return err + } + d.SetPartial("config") + } + } + + if d.HasChange("labels") { + patchEnv := &composer.Environment{Labels: expandLabels(d)} + err := resourceComposerEnvironmentPatchField("labels", patchEnv, d, tfConfig) + if err != nil { + return err + } + d.SetPartial("labels") + } + + d.Partial(false) + return resourceComposerEnvironmentRead(d, tfConfig) +} + +func resourceComposerEnvironmentPostCreateUpdate(updateEnv *composer.Environment, d *schema.ResourceData, cfg *Config) error { + if updateEnv == nil { + return nil + } + + d.Partial(true) + + if updateEnv.Config != nil && updateEnv.Config.SoftwareConfig != nil && len(updateEnv.Config.SoftwareConfig.PypiPackages) > 0 { + log.Printf("[DEBUG] Running post-create update for Environment %q", d.Id()) + err := resourceComposerEnvironmentPatchField("config.softwareConfig.pypiPackages", updateEnv, d, cfg) + if err != nil { + return err + } + + log.Printf("[DEBUG] Finish update to Environment %q post create for update only fields", d.Id()) + d.SetPartial("config") + } + d.Partial(false) + return resourceComposerEnvironmentRead(d, cfg) +} + +func resourceComposerEnvironmentPatchField(updateMask string, env *composer.Environment, d *schema.ResourceData, config *Config) error { + envJson, _ := env.MarshalJSON() + log.Printf("[DEBUG] Updating Environment %q (updateMask = %q): %s", d.Id(), updateMask, string(envJson)) + envName, err := resourceComposerEnvironmentName(d, config) + if err != nil { + return err + } + + op, err := config.clientComposer.Projects.Locations.Environments. + Patch(envName.resourceName(), env). + UpdateMask(updateMask).Do() + if err != nil { + return err + } + + waitErr := composerOperationWaitTime( + config.clientComposer, op, envName.Project, "Updating newly created Environment", + int(d.Timeout(schema.TimeoutCreate).Minutes())) + if waitErr != nil { + // The resource didn't actually update. + return fmt.Errorf("Error waiting to update Environment: %s", waitErr) + } + + log.Printf("[DEBUG] Finished updating Environment %q (updateMask = %q)", d.Id(), updateMask) + return nil +} + +func resourceComposerEnvironmentDelete(d *schema.ResourceData, meta interface{}) error { + config := meta.(*Config) + + envName, err := resourceComposerEnvironmentName(d, config) + if err != nil { + return err + } + + log.Printf("[DEBUG] Deleting Environment %q", d.Id()) + op, err := config.clientComposer.Projects.Locations.Environments.Delete(envName.resourceName()).Do() + if err != nil { + return err + } + + err = composerOperationWaitTime( + config.clientComposer, op, envName.Project, "Deleting Environment", + int(d.Timeout(schema.TimeoutDelete).Minutes())) + if err != nil { + return err + } + + log.Printf("[DEBUG] Finished deleting Environment %q: %#v", d.Id(), op) + return nil +} + +func resourceComposerEnvironmentImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { + config := meta.(*Config) + parseImportId([]string{"projects/(?P[^/]+)/locations/(?P[^/]+)/environments/(?P[^/]+)", "(?P[^/]+)/(?P[^/]+)/(?P[^/]+)", "(?P[^/]+)"}, d, config) + + // Replace import id for the resource id + id, err := replaceVars(d, config, "{{project}}/{{region}}/{{name}}") + if err != nil { + return nil, fmt.Errorf("Error constructing id: %s", err) + } + d.SetId(id) + + return []*schema.ResourceData{d}, nil +} + +func flattenComposerEnvironmentConfig(envCfg *composer.EnvironmentConfig) interface{} { + if envCfg == nil { + return nil + } + transformed := make(map[string]interface{}) + transformed["gke_cluster"] = envCfg.GkeCluster + transformed["dag_gcs_prefix"] = envCfg.DagGcsPrefix + transformed["node_count"] = envCfg.NodeCount + transformed["airflow_uri"] = envCfg.AirflowUri + transformed["node_config"] = flattenComposerEnvironmentConfigNodeConfig(envCfg.NodeConfig) + transformed["software_config"] = flattenComposerEnvironmentConfigSoftwareConfig(envCfg.SoftwareConfig) + + return []interface{}{transformed} +} + +func flattenComposerEnvironmentConfigNodeConfig(nodeCfg *composer.NodeConfig) interface{} { + if nodeCfg == nil { + return nil + } + transformed := make(map[string]interface{}) + transformed["zone"] = nodeCfg.Location + transformed["machine_type"] = nodeCfg.MachineType + transformed["network"] = nodeCfg.Network + transformed["subnetwork"] = nodeCfg.Subnetwork + transformed["disk_size_gb"] = nodeCfg.DiskSizeGb + transformed["service_account"] = nodeCfg.ServiceAccount + transformed["oauth_scopes"] = flattenComposerEnvironmentConfigNodeConfigOauthScopes(nodeCfg.OauthScopes) + transformed["tags"] = flattenComposerEnvironmentConfigNodeConfigTags(nodeCfg.Tags) + return []interface{}{transformed} +} + +func flattenComposerEnvironmentConfigNodeConfigOauthScopes(v interface{}) interface{} { + if v == nil { + return v + } + return schema.NewSet(schema.HashString, convertStringArrToInterface(v.([]string))) +} + +func flattenComposerEnvironmentConfigNodeConfigTags(v interface{}) interface{} { + if v == nil { + return v + } + return schema.NewSet(schema.HashString, convertStringArrToInterface(v.([]string))) +} + +func flattenComposerEnvironmentConfigSoftwareConfig(softwareCfg *composer.SoftwareConfig) interface{} { + if softwareCfg == nil { + return nil + } + transformed := make(map[string]interface{}) + transformed["image_version"] = softwareCfg.ImageVersion + transformed["airflow_config_overrides"] = softwareCfg.AirflowConfigOverrides + transformed["pypi_packages"] = softwareCfg.PypiPackages + transformed["env_variables"] = softwareCfg.EnvVariables + return []interface{}{transformed} +} + +func expandComposerEnvironmentConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.EnvironmentConfig, error) { + l := v.([]interface{}) + if len(l) == 0 { + return nil, nil + } + original := l[0].(map[string]interface{}) + transformed := &composer.EnvironmentConfig{} + + if nodeCountRaw, ok := original["node_count"]; ok { + transformedNodeCount, err := expandComposerEnvironmentConfigNodeCount(nodeCountRaw, d, config) + if err != nil { + return nil, err + } + transformed.NodeCount = transformedNodeCount + } + + transformedNodeConfig, err := expandComposerEnvironmentConfigNodeConfig(original["node_config"], d, config) + if err != nil { + return nil, err + } + transformed.NodeConfig = transformedNodeConfig + + transformedSoftwareConfig, err := expandComposerEnvironmentConfigSoftwareConfig(original["software_config"], d, config) + if err != nil { + return nil, err + } + transformed.SoftwareConfig = transformedSoftwareConfig + return transformed, nil +} + +func expandComposerEnvironmentConfigNodeCount(v interface{}, d *schema.ResourceData, config *Config) (int64, error) { + if v == nil { + return 0, nil + } + return int64(v.(int)), nil +} + +func expandComposerEnvironmentConfigNodeConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.NodeConfig, error) { + l := v.([]interface{}) + if len(l) == 0 { + return nil, nil + } + raw := l[0] + original := raw.(map[string]interface{}) + transformed := &composer.NodeConfig{} + + if transformedDiskSizeGb, ok := original["disk_size_gb"]; ok { + transformed.DiskSizeGb = int64(transformedDiskSizeGb.(int)) + } + + if v, ok := original["service_account"]; ok { + transformedServiceAccount, err := expandComposerEnvironmentServiceAccount(v, d, config) + if err != nil { + return nil, err + } + transformed.ServiceAccount = transformedServiceAccount + } + + var nodeConfigZone string + if v, ok := original["zone"]; ok { + transformedZone, err := expandComposerEnvironmentZone(v, d, config) + if err != nil { + return nil, err + } + transformed.Location = transformedZone + nodeConfigZone = transformedZone + } + + if v, ok := original["machine_type"]; ok { + transformedMachineType, err := expandComposerEnvironmentMachineType(v, d, config, nodeConfigZone) + if err != nil { + return nil, err + } + transformed.MachineType = transformedMachineType + } + + if v, ok := original["network"]; ok { + transformedNetwork, err := expandComposerEnvironmentNetwork(v, d, config) + if err != nil { + return nil, err + } + transformed.Network = transformedNetwork + } + + if v, ok := original["subnetwork"]; ok { + transformedSubnetwork, err := expandComposerEnvironmentSubnetwork(v, d, config) + if err != nil { + return nil, err + } + transformed.Subnetwork = transformedSubnetwork + } + + transformedOauthScopes, err := expandComposerEnvironmentSetList(original["oauth_scopes"], d, config) + if err != nil { + return nil, err + } + transformed.OauthScopes = transformedOauthScopes + + transformedTags, err := expandComposerEnvironmentSetList(original["tags"], d, config) + if err != nil { + return nil, err + } + transformed.Tags = transformedTags + return transformed, nil +} + +func expandComposerEnvironmentServiceAccount(v interface{}, d *schema.ResourceData, config *Config) (string, error) { + serviceAccount := v.(string) + if len(serviceAccount) == 0 { + return "", nil + } + + return GetResourceNameFromSelfLink(serviceAccount), nil +} + +func expandComposerEnvironmentZone(v interface{}, d *schema.ResourceData, config *Config) (string, error) { + zone := v.(string) + if len(zone) == 0 { + return zone, nil + } + if !strings.Contains(zone, "/") { + project, err := getProject(d, config) + if err != nil { + return "", err + } + return fmt.Sprintf("projects/%s/zones/%s", project, zone), nil + } + + return getRelativePath(zone) +} + +func expandComposerEnvironmentMachineType(v interface{}, d *schema.ResourceData, config *Config, nodeCfgZone interface{}) (string, error) { + fv, err := ParseMachineTypesFieldValue(v.(string), d, config) + if err != nil { + return "", nil + } + return fv.RelativeLink(), nil +} + +func expandComposerEnvironmentNetwork(v interface{}, d *schema.ResourceData, config *Config) (string, error) { + fv, err := ParseNetworkFieldValue(v.(string), d, config) + if err != nil { + return "", err + } + return fv.RelativeLink(), nil +} + +func expandComposerEnvironmentSubnetwork(v interface{}, d *schema.ResourceData, config *Config) (string, error) { + fv, err := ParseSubnetworkFieldValue(v.(string), d, config) + if err != nil { + return "", err + } + return fv.RelativeLink(), nil +} + +func expandComposerEnvironmentSetList(v interface{}, d *schema.ResourceData, config *Config) ([]string, error) { + if v == nil { + return nil, nil + } + return convertStringArr(v.(*schema.Set).List()), nil +} + +func expandComposerEnvironmentConfigSoftwareConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.SoftwareConfig, error) { + l := v.([]interface{}) + if len(l) == 0 { + return nil, nil + } + raw := l[0] + original := raw.(map[string]interface{}) + transformed := &composer.SoftwareConfig{} + + transformed.ImageVersion = original["image_version"].(string) + transformed.AirflowConfigOverrides = expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "airflow_config_overrides") + transformed.PypiPackages = expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "pypi_packages") + transformed.EnvVariables = expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "env_variables") + return transformed, nil +} + +func expandComposerEnvironmentConfigSoftwareConfigStringMap(softwareConfig map[string]interface{}, k string) map[string]string { + v, ok := softwareConfig[k] + if ok && v != nil { + return convertStringMap(v.(map[string]interface{})) + } + return map[string]string{} +} + +func validateComposerEnvironmentPypiPackages(v interface{}, k string) (ws []string, errors []error) { + if v == nil { + return ws, errors + } + for pkgName := range v.(map[string]interface{}) { + if pkgName != strings.ToLower(pkgName) { + errors = append(errors, + fmt.Errorf("PYPI package %q can only contain lowercase characters", pkgName)) + } + } + + return ws, errors +} + +func validateComposerEnvironmentEnvVariables(v interface{}, k string) (ws []string, errors []error) { + if v == nil { + return ws, errors + } + + reEnvVarName := regexp.MustCompile(composerEnvironmentEnvVariablesRegexp) + reAirflowReserved := regexp.MustCompile(composerEnvironmentReservedAirflowEnvVarRegexp) + + for envVarName := range v.(map[string]interface{}) { + if !reEnvVarName.MatchString(envVarName) { + errors = append(errors, + fmt.Errorf("env_variable %q must match regexp %q", envVarName, composerEnvironmentEnvVariablesRegexp)) + } else if _, ok := composerEnvironmentReservedEnvVar[envVarName]; ok { + errors = append(errors, + fmt.Errorf("env_variable %q is a reserved name and cannot be used", envVarName)) + } else if reAirflowReserved.MatchString(envVarName) { + errors = append(errors, + fmt.Errorf("env_variable %q cannot match reserved Airflow variable names with regexp %q", + envVarName, composerEnvironmentReservedAirflowEnvVarRegexp)) + } + } + + return ws, errors +} + +func handleComposerEnvironmentCreationOpFailure(id string, envName *composerEnvironmentName, d *schema.ResourceData, config *Config) error { + log.Printf("[WARNING] Creation operation for Composer Environment %q failed, check Environment isn't still running", id) + // Try to get possible created but invalid environment. + env, err := config.clientComposer.Projects.Locations.Environments.Get(envName.resourceName()).Do() + if err != nil { + // If error is 401, we don't have to clean up environment, return nil. + // Otherwise, we encountered another error. + return handleNotFoundError(err, d, fmt.Sprintf("Composer Environment %q", envName.resourceName())) + } + + if env.State == "CREATING" { + return fmt.Errorf( + "Getting creation operation state failed while waiting for environment to finish creating, "+ + "but environment seems to still be in 'CREATING' state. Wait for operation to finish and either "+ + "manually delete environment or import %q into your state", id) + } + + log.Printf("[WARNING] Environment %q from failed creation operation was created, deleting.", id) + op, err := config.clientComposer.Projects.Locations.Environments.Delete(envName.resourceName()).Do() + if err != nil { + return fmt.Errorf("Could not delete the invalid created environment with state %q: %s", env.State, err) + } + + waitErr := composerOperationWaitTime( + config.clientComposer, op, envName.Project, + fmt.Sprintf("Deleting invalid created Environment with state %q", env.State), + int(d.Timeout(schema.TimeoutCreate).Minutes())) + if waitErr != nil { + return fmt.Errorf("Error waiting to delete invalid Environment with state %q: %s", env.State, waitErr) + } + + return nil +} + +func getComposerEnvironmentPostCreateUpdateObj(env *composer.Environment) (updateEnv *composer.Environment) { + // pypiPackages can only be added via update + if env != nil && env.Config != nil && env.Config.SoftwareConfig != nil { + if len(env.Config.SoftwareConfig.PypiPackages) > 0 { + updateEnv = &composer.Environment{ + Config: &composer.EnvironmentConfig{ + SoftwareConfig: &composer.SoftwareConfig{ + PypiPackages: env.Config.SoftwareConfig.PypiPackages, + }, + }, + } + // Clear PYPI packages - otherwise, API will return error + // that the create request is invalid. + env.Config.SoftwareConfig.PypiPackages = make(map[string]string) + } + } + + return updateEnv +} + +func resourceComposerEnvironmentName(d *schema.ResourceData, config *Config) (*composerEnvironmentName, error) { + project, err := getProject(d, config) + if err != nil { + return nil, err + } + + region, err := getRegion(d, config) + if err != nil { + return nil, err + } + + return &composerEnvironmentName{ + Project: project, + Region: region, + Environment: d.Get("name").(string), + }, nil +} + +type composerEnvironmentName struct { + Project string + Region string + Environment string +} + +func (n *composerEnvironmentName) resourceName() string { + return fmt.Sprintf("projects/%s/locations/%s/environments/%s", n.Project, n.Region, n.Environment) +} + +func (n *composerEnvironmentName) parentName() string { + return fmt.Sprintf("projects/%s/locations/%s", n.Project, n.Region) +} + +// The value we store (i.e. `old` in this method), might be only the service account email, +// but we expect either the email or the name (projects/.../serviceAccounts/...) +func compareServiceAccountEmailToLink(_, old, new string, _ *schema.ResourceData) bool { + // old is the service account email returned from the server. + if !strings.HasPrefix("projects/", old) { + return old == GetResourceNameFromSelfLink(new) + } + return compareSelfLinkRelativePaths("", old, new, nil) +} + +func validateServiceAccountRelativeNameOrEmail(v interface{}, k string) (ws []string, errors []error) { + value := v.(string) + + serviceAccountRe := "(" + strings.Join(PossibleServiceAccountNames, "|") + ")" + if strings.HasPrefix(value, "projects/") { + serviceAccountRe = fmt.Sprintf("projects/(.+)/serviceAccounts/%s", serviceAccountRe) + } + r := regexp.MustCompile(serviceAccountRe) + if !r.MatchString(value) { + errors = append(errors, fmt.Errorf( + "%q (%q) doesn't match regexp %q", k, value, serviceAccountRe)) + } + + return +} diff --git a/google/resource_composer_environment_test.go b/google/resource_composer_environment_test.go new file mode 100644 index 00000000..8ca53ba7 --- /dev/null +++ b/google/resource_composer_environment_test.go @@ -0,0 +1,491 @@ +package google + +import ( + "fmt" + "testing" + + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" + "google.golang.org/api/composer/v1" + "google.golang.org/api/storage/v1" + "log" + "strings" +) + +const testComposerEnvironmentPrefix = "tf-cc-testenv" +const testComposerNetworkPrefix = "tf-cc-testnet" + +func init() { + resource.AddTestSweepers("gcp_composer_environment", &resource.Sweeper{ + Name: "gcp_composer_environment", + F: testSweepComposerResources, + }) +} + +// Checks environment creation with minimum required information. +func TestAccComposerEnvironment_basic(t *testing.T) { + t.Parallel() + + envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix) + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccComposerEnvironmentDestroy, + Steps: []resource.TestStep{ + { + Config: testAccComposerEnvironment_basic(envName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.airflow_uri"), + resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.gke_cluster"), + resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_count"), + resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_config.0.zone"), + resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_config.0.machine_type")), + }, + { + ResourceName: "google_composer_environment.test", + ImportState: true, + ImportStateVerify: true, + }, + { + ResourceName: "google_composer_environment.test", + ImportState: true, + ImportStateId: fmt.Sprintf("projects/%s/locations/%s/environments/%s", getTestProjectFromEnv(), "us-central1", envName), + ImportStateVerify: true, + }, + }, + }) +} + +// Checks that all updatable fields can be updated in one apply +// (PATCH for Environments only is per-field) +func TestAccComposerEnvironment_update(t *testing.T) { + t.Parallel() + + envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix) + var env composer.Environment + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccComposerEnvironmentDestroy, + Steps: []resource.TestStep{ + { + Config: testAccComposerEnvironment_basic(envName), + Check: resource.ComposeTestCheckFunc( + testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env), + ), + }, + { + Config: testAccComposerEnvironment_update(envName), + Check: resource.ComposeTestCheckFunc( + testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env), + ), + }, + { + ResourceName: "google_composer_environment.test", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +// Checks behavior of node config, including dependencies on Compute resources. +func TestAccComposerEnvironment_withNodeConfig(t *testing.T) { + t.Parallel() + + envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix) + network := acctest.RandomWithPrefix(testComposerNetworkPrefix) + subnetwork := network + "-1" + serviceAccount := acctest.RandomWithPrefix("tf-test") + + var env composer.Environment + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccComposerEnvironmentDestroy, + Steps: []resource.TestStep{ + { + Config: testAccComposerEnvironment_nodeCfg(envName, network, subnetwork, serviceAccount), + Check: testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env), + }, + { + ResourceName: "google_composer_environment.test", + ImportState: true, + ImportStateVerify: true, + }, + // This is a terrible clean-up step in order to get destroy to succeed, + // due to dangling firewall rules left by the Composer Environment blocking network deletion. + // TODO(emilyye): Remove this check if firewall rules bug gets fixed by Composer. + { + PlanOnly: true, + ExpectNonEmptyPlan: false, + Config: testAccComposerEnvironment_nodeCfg(envName, network, subnetwork, serviceAccount), + Check: testAccCheckClearComposerEnvironmentFirewalls(network), + }, + }, + }) +} + +// Checks behavior of config for creation for attributes that must +// be updated during create. +func TestAccComposerEnvironment_withUpdateOnCreate(t *testing.T) { + t.Parallel() + + envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix) + var env composer.Environment + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccComposerEnvironmentDestroy, + Steps: []resource.TestStep{ + { + Config: testAccComposerEnvironment_updateOnlyFields(envName), + Check: resource.ComposeTestCheckFunc( + testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env), + ), + }, + { + ResourceName: "google_composer_environment.test", + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccCheckComposerEnvironmentExists(n string, environment *composer.Environment) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + + idTokens := strings.Split(rs.Primary.ID, "/") + if len(idTokens) != 3 { + return fmt.Errorf("Invalid ID %q, expected format {project}/{region}/{environment}", rs.Primary.ID) + } + envName := &composerEnvironmentName{ + Project: idTokens[0], + Region: idTokens[1], + Environment: idTokens[2], + } + + nameFromId := envName.resourceName() + config := testAccProvider.Meta().(*Config) + + found, err := config.clientComposer.Projects.Locations.Environments.Get(nameFromId).Do() + if err != nil { + return err + } + + if found.Name != nameFromId { + return fmt.Errorf("Environment not found") + } + + *environment = *found + return nil + } +} + +func testAccComposerEnvironmentDestroy(s *terraform.State) error { + config := testAccProvider.Meta().(*Config) + + for _, rs := range s.RootModule().Resources { + if rs.Type != "google_composer_environment" { + continue + } + + idTokens := strings.Split(rs.Primary.ID, "/") + if len(idTokens) != 3 { + return fmt.Errorf("Invalid ID %q, expected format {project}/{region}/{environment}", rs.Primary.ID) + } + envName := &composerEnvironmentName{ + Project: idTokens[0], + Region: idTokens[1], + Environment: idTokens[2], + } + + _, err := config.clientComposer.Projects.Locations.Environments.Get(envName.resourceName()).Do() + if err == nil { + return fmt.Errorf("environment %s still exists", envName.resourceName()) + } + } + + return nil +} + +func testAccComposerEnvironment_basic(name string) string { + return fmt.Sprintf(` +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" +} +`, name) +} + +func testAccComposerEnvironment_update(name string) string { + return fmt.Sprintf(` +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" + + config { + node_count = 4 + + software_config { + airflow_config_overrides { + core-load_example = "True" + } + + pypi_packages { + numpy = "" + } + + env_variables { + FOO = "bar" + } + } + } + + labels { + foo = "bar" + anotherlabel = "boo" + } +} +`, name) +} + +func testAccComposerEnvironment_nodeCfg(environment, network, subnetwork, serviceAccount string) string { + return fmt.Sprintf(` +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" + config { + node_config { + network = "${google_compute_network.test.self_link}" + subnetwork = "${google_compute_subnetwork.test.self_link}" + + service_account = "${google_service_account.test.name}" + } + } + + depends_on = ["google_project_iam_member.composer-worker"] +} + +resource "google_compute_network" "test" { + name = "%s" + auto_create_subnetworks = false +} + +resource "google_compute_subnetwork" "test" { + name = "%s" + ip_cidr_range = "10.2.0.0/16" + region = "us-central1" + network = "${google_compute_network.test.self_link}" +} + +resource "google_service_account" "test" { + account_id = "%s" + display_name = "Test Service Account for Composer Environment" +} + +resource "google_project_iam_member" "composer-worker" { + role = "roles/composer.worker" + member = "serviceAccount:${google_service_account.test.email}" +} +`, environment, network, subnetwork, serviceAccount) +} + +func testAccComposerEnvironment_updateOnlyFields(name string) string { + return fmt.Sprintf(` +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" + config { + software_config { + pypi_packages { + scipy = "==1.1.0" + } + } + } +} +`, name) +} + +/** + * CLEAN UP HELPER FUNCTIONS + */ +func testSweepComposerResources(region string) error { + config, err := sharedConfigForRegion(region) + if err != nil { + return fmt.Errorf("error getting shared config for region: %s", err) + } + + err = config.loadAndValidate() + if err != nil { + log.Fatalf("error loading: %s", err) + } + + // Environments need to be cleaned up because the service is flaky. + if err := testSweepComposerEnvironments(config); err != nil { + return err + } + + // Buckets need to be cleaned up because they just don't get deleted on purpose. + if err := testSweepComposerEnvironmentBuckets(config); err != nil { + return err + } + + return nil +} + +func testSweepComposerEnvironments(config *Config) error { + found, err := config.clientComposer.Projects.Locations.Environments.List( + fmt.Sprintf("projects/%s/locations/%s", config.Project, config.Region)).Do() + if err != nil { + return fmt.Errorf("error listing storage buckets for composer environment: %s", err) + } + + if len(found.Environments) == 0 { + log.Printf("No environment need to be cleaned up") + return nil + } + + var allErrors error + for _, e := range found.Environments { + switch e.State { + case "CREATING": + case "UPDATING": + allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete pending Environment %q with state %q", e.Name, e.State)) + case "DELETING": + log.Printf("Environment %q is currently deleting", e.Name) + case "RUNNING": + case "ERROR": + default: + op, deleteErr := config.clientComposer.Projects.Locations.Environments.Delete(e.Name).Do() + if deleteErr != nil { + allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete environment %q: %s", e.Name, deleteErr)) + continue + } + waitErr := composerOperationWaitTime(config.clientComposer, op, config.Project, "Sweeping old test environments", 10) + if waitErr != nil { + allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete environment %q: %s", e.Name, waitErr)) + } + } + } + return allErrors +} + +func testSweepComposerEnvironmentBuckets(config *Config) error { + artifactsBName := fmt.Sprintf("artifacts.%s.appspot.com", config.Project) + artifactBucket, err := config.clientStorage.Buckets.Get(artifactsBName).Do() + if err == nil { + if err := testSweepComposerEnvironmentCleanUpBucket(config, artifactBucket); err != nil { + return err + } + } else if isGoogleApiErrorWithCode(err, 404) { + log.Printf("could not find bucket %q to clean up", artifactsBName) + } else { + return err + } + + found, err := config.clientStorage.Buckets.List(config.Project).Prefix(config.Region).Do() + if err != nil { + return fmt.Errorf("error listing storage buckets created when testing composer environment: %s", err) + } + if len(found.Items) == 0 { + log.Printf("No environment buckets need to be cleaned up") + return nil + } + + for _, bucket := range found.Items { + if _, ok := bucket.Labels["goog-composer-environment"]; ok { + continue + } + if err := testSweepComposerEnvironmentCleanUpBucket(config, bucket); err != nil { + return err + } + } + return nil +} + +func testSweepComposerEnvironmentCleanUpBucket(config *Config, bucket *storage.Bucket) error { + var allErrors error + objList, err := config.clientStorage.Objects.List(bucket.Name).Do() + if err != nil { + allErrors = multierror.Append(allErrors, + fmt.Errorf("Unable to list objects to delete for bucket %q: %s", bucket.Name, err)) + } + + for _, o := range objList.Items { + if err := config.clientStorage.Objects.Delete(bucket.Name, o.Name).Do(); err != nil { + allErrors = multierror.Append(allErrors, + fmt.Errorf("Unable to delete object %q from bucket %q: %s", o.Name, bucket.Name, err)) + } + } + + if err := config.clientStorage.Buckets.Delete(bucket.Name).Do(); err != nil { + allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete bucket %q: %s", bucket.Name, err)) + } + + if err := config.clientStorage.Buckets.Delete(bucket.Name).Do(); err != nil { + allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete bucket %q: %s", bucket.Name, err)) + } + + if allErrors != nil { + return fmt.Errorf("Unable to clean up bucket %q: %v", bucket.Name, allErrors) + } + return nil +} + +// WARNING: This is not actually a check and is a terrible clean-up step because Composer Environments +// have a bug that hasn't been fixed. Composer will add firewalls to non-default networks for environments +// but will not remove them when the Environment is deleted. +// +// Destroy test step for config with a network will fail unless we clean up the firewalls before. +func testAccCheckClearComposerEnvironmentFirewalls(networkName string) resource.TestCheckFunc { + return func(s *terraform.State) error { + config := testAccProvider.Meta().(*Config) + config.Project = getTestProjectFromEnv() + network, err := config.clientCompute.Networks.Get(getTestProjectFromEnv(), networkName).Do() + if err != nil { + return err + } + + foundFirewalls, err := config.clientCompute.Firewalls.List(config.Project).Do() + if err != nil { + return fmt.Errorf("Unable to list firewalls for network %q: %s", network.Name, err) + } + + var allErrors error + for _, firewall := range foundFirewalls.Items { + if !strings.HasPrefix(firewall.Name, testComposerNetworkPrefix) { + continue + } + log.Printf("[DEBUG] Deleting firewall %q for test-resource network %q", firewall.Name, network.Name) + op, err := config.clientCompute.Firewalls.Delete(config.Project, firewall.Name).Do() + if err != nil { + allErrors = multierror.Append(allErrors, + fmt.Errorf("Unable to delete firewalls for network %q: %s", network.Name, err)) + continue + } + + waitErr := computeOperationWait(config.clientCompute, op, config.Project, + "Sweeping test composer environment firewalls") + if waitErr != nil { + allErrors = multierror.Append(allErrors, + fmt.Errorf("Error while waiting to delete firewall %q: %s", firewall.Name, waitErr)) + } + } + return allErrors + } +} diff --git a/website/docs/r/composer_environment.html.markdown b/website/docs/r/composer_environment.html.markdown new file mode 100644 index 00000000..dbd50d41 --- /dev/null +++ b/website/docs/r/composer_environment.html.markdown @@ -0,0 +1,316 @@ +layout: "google" +page_title: "Google: google_composer_environment" +sidebar_current: "docs-google-composer-environment" +description: |- + An environment for running orchestration tasks. +--- + +# google\_composer\_environment + +An environment for running orchestration tasks. + +Environments run Apache Airflow software on Google infrastructure. + +To get more information about Environments, see: + +* [API documentation](https://cloud.google.com/composer/docs/reference/rest/) +* How-to Guides + * [Official Documentation](https://cloud.google.com/composer/docs) + * [Configuring Shared VPC for Composer Environments](https://cloud.google.com/composer/docs/how-to/managing/configuring-shared-vpc) +* [Apache Airflow Documentation](http://airflow.apache.org/) + +~> **Warning:** We **STRONGLY** recommend you read the [GCP guides](https://cloud.google.com/composer/docs/how-to) + as the Environment resource requires a long deployment process and involves several layers of GCP infrastructure, + including a Kubernetes Engine cluster, Cloud Storage, and Compute networking resources. Due to limitations of the API, + Terraform will not be able to automatically find or manage many of these underlying resources. In particular: + * It can take up to one hour to create or update an environment resource. In addition, GCP may only detect some + errors in configuration when they are used (e.g. ~40-50 minutes into the creation process), and is prone to limited + error reporting. If you encounter confusing or uninformative errors, please verify your configuration is valid + against GCP Cloud Composer before filing bugs against the Terraform provider. + * **Environments create Google Cloud Storage buckets that do not get cleaned up automatically** on environment + deletion. [More about Composer's use of Cloud Storage](https://cloud.google.com/composer/docs/concepts/cloud-storage). + +## Example Usage + +### Basic Usage +```hcl +resource "google_composer_environment" "test" { + name = "my-composer-env" + region = "us-central1" +} +``` + +### With GKE and Compute Resource Dependencies + +**NOTE** To use service accounts, you need to give `role/composer.worker` to the service account on any resources that may be created for the environment +(i.e. at a project level). This will probably require an explicit dependency +on the IAM policy binding (see `google_project_iam_member` below). + +```hcl +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" + config { + node_count = 4 + + node_config { + zone = "us-central1-a" + machine_type = "n1-standard-1" + + network = "${google_compute_network.test.self_link}" + subnetwork = "${google_compute_subnetwork.test.self_link}" + + service_account = "${google_service_account.test.name}" + } + } + + depends_on = ["google_project_iam_member.composer-worker"] +} + +resource "google_compute_network" "test" { + name = "composer-test-network" + auto_create_subnetworks = false +} + +resource "google_compute_subnetwork" "test" { + name = "composer-test-subnetwork" + ip_cidr_range = "10.2.0.0/16" + region = "us-central1" + network = "${google_compute_network.test.self_link}" +} + +resource "google_service_account" "test" { + account_id = "composer-env-account" + display_name = "Test Service Account for Composer Environment" +} + +resource "google_project_iam_member" "composer-worker" { + role = "roles/composer.worker" + member = "serviceAccount:${google_service_account.test.email}" +} +``` + +### With Software (Airflow) Config +```hcl +resource "google_composer_environment" "test" { + name = "%s" + region = "us-central1" + + config { + software_config { + airflow_config_overrides { + core-load_example = "True" + } + + pypi_packages { + numpy = "" + scipy = "==1.1.0" + } + + env_variables { + FOO = "bar" + } + } + } +} +``` + +## Argument Reference + +The following arguments are supported: + + +* `name` - + (Required) + Name of the environment + + +- - - +* `config` - + (Optional) + Configuration parameters for this environment Structure is documented below. + +* `labels` - + (Optional) + User-defined labels for this environment. The labels map can contain + no more than 64 entries. Entries of the labels map are UTF8 strings + that comply with the following restrictions: + Label keys must be between 1 and 63 characters long and must conform + to the following regular expression: `[a-z]([-a-z0-9]*[a-z0-9])?`. + Label values must be between 0 and 63 characters long and must + conform to the regular expression `([a-z]([-a-z0-9]*[a-z0-9])?)?`. + No more than 64 labels can be associated with a given environment. + Both keys and values must be <= 128 bytes in size. + +* `region` - + (Optional) + The location or Compute Engine region for the environment. +* `project` - (Optional) The ID of the project in which the resource belongs. + If it is not provided, the provider project is used. + +The `config` block supports: + +* `node_count` - + (Optional) + The number of nodes in the Kubernetes Engine cluster that + will be used to run this environment. + +* `node_config` - + (Optional) + The configuration used for the Kubernetes Engine cluster. Structure is documented below. + +* `software_config` - + (Optional) + The configuration settings for software inside the environment. Structure is documented below. + +The `node_config` block supports: + +* `zone` - + (Optional) + The Compute Engine zone in which to deploy the VMs running the + Apache Airflow software, specified as the zone name or + relative resource name (e.g. "projects/{project}/zones/{zone}"). Must belong to the enclosing environment's project + and region. + + If both zone and machineType are specified, machineType must belong to this zone. If neither is specified, the service + will pick default values in the specified resource's region. If only one of zone or machineType is specified, the + location information from the specified field will be used for the location-unspecified field. + +* `machine_type` - + (Optional) + The Compute Engine machine type used for cluster instances, + specified as a name or relative resource name. For example: + "projects/{project}/zones/{zone}/machineTypes/{machineType}". Must belong to the enclosing environment's project and + region/zone. + + If both zone and machineType are specified, machineType must belong to this zone. If neither is specified, the service + will pick default values in the specified resource's region. If only one of zone or machineType is specified, the + location information from the specified field will be used for the location-unspecified field. + +* `network` - + (Optional) + The Compute Engine network to be used for machine + communications, specified as a self-link, relative resource name + (e.g. "projects/{project}/global/networks/{network}"), by name. + + The network must belong to the environment's project. If unspecified, the "default" network ID in the environment's + project is used. If a Custom Subnet Network is provided, subnetwork must also be provided. + +* `subnetwork` - + (Optional) + The Compute Engine subnetwork to be used for machine + communications, , specified as a self-link, relative resource name (e.g. + "projects/{project}/regions/{region}/subnetworks/{subnetwork}"), or by name. If subnetwork is provided, + network must also be provided and the subnetwork must belong to the enclosing environment's project and region. + +* `disk_size_gb` - + (Optional) + The disk size in GB used for node VMs. Minimum size is 20GB. + If unspecified, defaults to 100GB. Cannot be updated. + +* `oauth_scopes` - + (Optional) + The set of Google API scopes to be made available on all node + VMs. Cannot be updated. If empty, defaults to + `["https://www.googleapis.com/auth/cloud-platform"]` + +* `service_account` - + (Optional) + The Google Cloud Platform Service Account to be used by the + node VMs. If a service account is not specified, the "default" + Compute Engine service account is used. Cannot be updated. If given, + note that the service account must have `roles/composer.worker` + for any GCP resources created under the Cloud Composer Environment. + +* `tags` - + (Optional) + The list of instance tags applied to all node VMs. Tags are + used to identify valid sources or targets for network + firewalls. Each tag within the list must comply with RFC1035. + Cannot be updated. + +The `software_config` block supports: + +* `airflow_config_overrides` - + (Optional) Apache Airflow configuration properties to override. Property keys contain the section and property names, + separated by a hyphen, for example "core-dags_are_paused_at_creation". + + Section names must not contain hyphens ("-"), opening square brackets ("["), or closing square brackets ("]"). + The property name must not be empty and cannot contain "=" or ";". Section and property names cannot contain + characters: "." Apache Airflow configuration property names must be written in snake_case. Property values can + contain any character, and can be written in any lower/upper case format. Certain Apache Airflow configuration + property values are [blacklisted](https://cloud.google.com/composer/docs/concepts/airflow-configurations#airflow_configuration_blacklists), + and cannot be overridden. + +* `pypi_packages` - + (Optional) + Custom Python Package Index (PyPI) packages to be installed + in the environment. Keys refer to the lowercase package name (e.g. "numpy"). Values are the lowercase extras and + version specifier (e.g. "==1.12.0", "[devel,gcp_api]", "[devel]>=1.8.2, <1.9.2"). To specify a package without + pinning it to a version specifier, use the empty string as the value. + +* `env_variables` - + (Optional) + Additional environment variables to provide to the Apache Airflow scheduler, worker, and webserver processes. + Environment variable names must match the regular expression `[a-zA-Z_][a-zA-Z0-9_]*`. + They cannot specify Apache Airflow software configuration overrides (they cannot match the regular expression + `AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+`), and they cannot match any of the following reserved names: + ``` + AIRFLOW_HOME + C_FORCE_ROOT + CONTAINER_NAME + DAGS_FOLDER + GCP_PROJECT + GCS_BUCKET + GKE_CLUSTER_NAME + SQL_DATABASE + SQL_INSTANCE + SQL_PASSWORD + SQL_PROJECT + SQL_REGION + SQL_USER + ``` + +## Attributes Reference + +In addition to the arguments listed above, the following computed attributes are exported: + +* `config.gke_cluster` - + The Kubernetes Engine cluster used to run this environment. + +* `config.dag_gcs_prefix` - + The Cloud Storage prefix of the DAGs for this environment. + Although Cloud Storage objects reside in a flat namespace, a + hierarchical file tree can be simulated using '/'-delimited + object name prefixes. DAG objects for this environment + reside in a simulated directory with this prefix. + +* `config.airflow_uri` - + The URI of the Apache Airflow Web UI hosted within this + environment. + +* `config.software_config.image_version` - + The version of the software running in the environment. This encapsulates both the version of Cloud Composer + functionality and the version of Apache Airflow. It must match the regular expression + `composer-[0-9]+\.[0-9]+(\.[0-9]+)?-airflow-[0-9]+\.[0-9]+(\.[0-9]+.*)?`. + The Cloud Composer portion of the version is a semantic version. + The portion of the image version following 'airflow-' is an official Apache Airflow repository release name. + +## Timeouts + +This resource provides the following +[Timeouts](/docs/configuration/resources.html#timeouts) configuration options: + +- `create` - Default is 60 minutes. +- `update` - Default is 60 minutes. +- `delete` - Default is 6 minutes. + +## Import + +Environment can be imported using any of these accepted formats: + +``` +$ terraform import google_composer_environment.default projects/{{project}}/locations/{{region}}/environments/{{name}} +$ terraform import google_composer_environment.default {{project}}/{{region}}/{{name}} +$ terraform import google_composer_environment.default {{name}} +```