Add Cloud Composer Environments resource (#2001)

Adds Cloud Composer Environment resource and test sweeper
This commit is contained in:
emily 2018-09-11 13:13:11 -07:00 committed by GitHub
parent afcbb859ca
commit 017f2f9b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1763 additions and 20 deletions

View File

@ -0,0 +1,71 @@
package google
import (
"fmt"
"log"
"time"
"github.com/hashicorp/terraform/helper/resource"
composer "google.golang.org/api/composer/v1"
)
// ComposerOperationWaiter polls a Cloud Composer long-running operation
// until it reports Done.
type ComposerOperationWaiter struct {
	// Service is the locations-scoped Composer API client used to re-fetch
	// the operation's status.
	Service *composer.ProjectsLocationsService
	// Op is the long-running operation being waited on.
	Op *composer.Operation
}
// RefreshFunc returns a StateRefreshFunc that re-fetches the operation and
// reports its Done flag (stringified as "true"/"false") as the state.
func (w *ComposerOperationWaiter) RefreshFunc() resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		latest, err := w.Service.Operations.Get(w.Op.Name).Do()
		if err != nil {
			return nil, "", err
		}
		log.Printf("[DEBUG] Got %v while polling for operation %s's 'done' status", latest.Done, w.Op.Name)
		return latest, fmt.Sprint(latest.Done), nil
	}
}
// Conf builds a StateChangeConf over the operation's Done flag: pending
// while "false", complete once "true".
func (w *ComposerOperationWaiter) Conf() *resource.StateChangeConf {
	conf := &resource.StateChangeConf{
		Pending: []string{"false"},
		Target:  []string{"true"},
	}
	conf.Refresh = w.RefreshFunc()
	return conf
}
// composerOperationWait waits for op to finish using the default ten-minute
// timeout; activity is a human-readable description used in error messages.
func composerOperationWait(service *composer.Service, op *composer.Operation, project, activity string) error {
	const defaultTimeoutMin = 10
	return composerOperationWaitTime(service, op, project, activity, defaultTimeoutMin)
}
// composerOperationWaitTime polls op until it completes or timeoutMin
// minutes elapse. activity is a human-readable description used in error
// messages. Returns the operation-level error, if any, once done.
func composerOperationWaitTime(service *composer.Service, op *composer.Operation, project, activity string, timeoutMin int) error {
	// Already finished — just surface any operation error.
	if op.Done {
		return composerOperationError(op)
	}
	w := &ComposerOperationWaiter{
		Service: service.Projects.Locations,
		Op:      op,
	}
	state := w.Conf()
	state.Delay = 10 * time.Second
	state.Timeout = time.Duration(timeoutMin) * time.Minute
	state.MinTimeout = 2 * time.Second
	opRaw, err := state.WaitForState()
	if err != nil {
		return fmt.Errorf("Error waiting for %s: %s", activity, err)
	}
	return composerOperationError(opRaw.(*composer.Operation))
}

// composerOperationError converts a completed operation's error payload,
// if present, into a Go error. (Extracted to remove the duplicated check
// that previously appeared both before and after waiting.)
func composerOperationError(op *composer.Operation) error {
	if op.Error != nil {
		return fmt.Errorf("Error code %v, message: %s", op.Error.Code, op.Error.Message)
	}
	return nil
}

View File

@ -103,7 +103,6 @@ func Provider() terraform.ResourceProvider {
GeneratedComputeResourcesMap,
GeneratedContainerAnalysisResourcesMap,
GeneratedRedisResourcesMap,
GeneratedComposerResourcesMap,
GeneratedResourceManagerResourcesMap,
map[string]*schema.Resource{
"google_bigquery_dataset": resourceBigQueryDataset(),
@ -113,6 +112,7 @@ func Provider() terraform.ResourceProvider {
"google_cloudbuild_trigger": resourceCloudBuildTrigger(),
"google_cloudfunctions_function": resourceCloudFunctionsFunction(),
"google_cloudiot_registry": resourceCloudIoTRegistry(),
"google_composer_environment": resourceComposerEnvironment(),
"google_compute_autoscaler": resourceComputeAutoscaler(),
"google_compute_address": resourceComputeAddress(),
"google_compute_attached_disk": resourceComputeAttachedDisk(),

View File

@ -1,19 +0,0 @@
// ----------------------------------------------------------------------------
//
// *** AUTO GENERATED CODE *** AUTO GENERATED CODE ***
//
// ----------------------------------------------------------------------------
//
// This file is automatically generated by Magic Modules and manual
// changes will be clobbered when the file is regenerated.
//
// Please read more about how to change this file in
// .github/CONTRIBUTING.md.
//
// ----------------------------------------------------------------------------
package google
import "github.com/hashicorp/terraform/helper/schema"
// GeneratedComposerResourcesMap holds the Magic Modules-generated Composer
// resources; currently empty.
var GeneratedComposerResourcesMap = map[string]*schema.Resource{}

View File

@ -0,0 +1,884 @@
package google
import (
"fmt"
"log"
"regexp"
"strings"
"time"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/helper/validation"
"google.golang.org/api/composer/v1"
)
const (
composerEnvironmentEnvVariablesRegexp = "[a-zA-Z_][a-zA-Z0-9_]*."
composerEnvironmentReservedAirflowEnvVarRegexp = "AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+"
)
// composerEnvironmentReservedEnvVar is the set of environment-variable names
// reserved by Cloud Composer that users may not set; enforced in
// validateComposerEnvironmentEnvVariables.
var composerEnvironmentReservedEnvVar = map[string]struct{}{
	"AIRFLOW_HOME":     {},
	"C_FORCE_ROOT":     {},
	"CONTAINER_NAME":   {},
	"DAGS_FOLDER":      {},
	"GCP_PROJECT":      {},
	"GCS_BUCKET":       {},
	"GKE_CLUSTER_NAME": {},
	"SQL_DATABASE":     {},
	"SQL_INSTANCE":     {},
	"SQL_PASSWORD":     {},
	"SQL_PROJECT":      {},
	"SQL_REGION":       {},
	"SQL_USER":         {},
}
// resourceComposerEnvironment defines the google_composer_environment
// resource: schema, CRUD handlers, importer, and operation timeouts.
func resourceComposerEnvironment() *schema.Resource {
	return &schema.Resource{
		Create: resourceComposerEnvironmentCreate,
		Read:   resourceComposerEnvironmentRead,
		Update: resourceComposerEnvironmentUpdate,
		Delete: resourceComposerEnvironmentDelete,
		Importer: &schema.ResourceImporter{
			State: resourceComposerEnvironmentImport,
		},
		Timeouts: &schema.ResourceTimeout{
			// Composer takes <= 1 hr for create/update.
			Create: schema.DefaultTimeout(3600 * time.Second),
			Update: schema.DefaultTimeout(3600 * time.Second),
			Delete: schema.DefaultTimeout(360 * time.Second),
		},
		Schema: map[string]*schema.Schema{
			// Environment name; immutable after creation.
			"name": {
				Type:         schema.TypeString,
				Required:     true,
				ForceNew:     true,
				ValidateFunc: validateGCPName,
			},
			// Region; falls back to the provider default when unset (see
			// resourceComposerEnvironmentName / getRegion).
			"region": {
				Type:     schema.TypeString,
				Optional: true,
				ForceNew: true,
			},
			// Project; falls back to the provider default when unset.
			"project": {
				Type:     schema.TypeString,
				Optional: true,
				Computed: true,
				ForceNew: true,
			},
			// Environment configuration, modeled as a one-element list.
			"config": {
				Type:     schema.TypeList,
				Optional: true,
				Computed: true,
				MaxItems: 1,
				Elem: &schema.Resource{
					Schema: map[string]*schema.Schema{
						// Number of GKE nodes; updatable via PATCH
						// (config.nodeCount).
						"node_count": {
							Type:         schema.TypeInt,
							Computed:     true,
							Optional:     true,
							ValidateFunc: validation.IntAtLeast(3),
						},
						// GKE node configuration; all fields are ForceNew.
						"node_config": {
							Type:     schema.TypeList,
							Computed: true,
							Optional: true,
							MaxItems: 1,
							Elem: &schema.Resource{
								Schema: map[string]*schema.Schema{
									"zone": {
										Type:             schema.TypeString,
										Computed:         true,
										Optional:         true,
										ForceNew:         true,
										DiffSuppressFunc: compareSelfLinkOrResourceName,
									},
									"machine_type": {
										Type:             schema.TypeString,
										Computed:         true,
										Optional:         true,
										ForceNew:         true,
										DiffSuppressFunc: compareSelfLinkOrResourceName,
									},
									"network": {
										Type:             schema.TypeString,
										Computed:         true,
										Optional:         true,
										ForceNew:         true,
										DiffSuppressFunc: compareSelfLinkOrResourceName,
									},
									// NOTE(review): unlike its siblings this
									// field is not Computed — confirm intended.
									"subnetwork": {
										Type:             schema.TypeString,
										Optional:         true,
										ForceNew:         true,
										DiffSuppressFunc: compareSelfLinkOrResourceName,
									},
									"disk_size_gb": {
										Type:     schema.TypeInt,
										Computed: true,
										Optional: true,
										ForceNew: true,
									},
									"oauth_scopes": {
										Type:     schema.TypeSet,
										Computed: true,
										Optional: true,
										ForceNew: true,
										Elem: &schema.Schema{
											Type: schema.TypeString,
										},
										Set: schema.HashString,
									},
									// Accepts either an email or a relative
									// resource name; see the validator and
									// diff-suppress helpers below.
									"service_account": {
										Type:             schema.TypeString,
										Computed:         true,
										Optional:         true,
										ForceNew:         true,
										ValidateFunc:     validateServiceAccountRelativeNameOrEmail,
										DiffSuppressFunc: compareServiceAccountEmailToLink,
									},
									"tags": {
										Type:     schema.TypeSet,
										Optional: true,
										ForceNew: true,
										Elem: &schema.Schema{
											Type: schema.TypeString,
										},
										Set: schema.HashString,
									},
								},
							},
						},
						// Airflow software configuration; the three maps are
						// individually updatable via PATCH.
						"software_config": {
							Type:     schema.TypeList,
							Optional: true,
							Computed: true,
							MaxItems: 1,
							Elem: &schema.Resource{
								Schema: map[string]*schema.Schema{
									"airflow_config_overrides": {
										Type:     schema.TypeMap,
										Optional: true,
										Elem:     &schema.Schema{Type: schema.TypeString},
									},
									"pypi_packages": {
										Type:         schema.TypeMap,
										Optional:     true,
										Elem:         &schema.Schema{Type: schema.TypeString},
										ValidateFunc: validateComposerEnvironmentPypiPackages,
									},
									"env_variables": {
										Type:         schema.TypeMap,
										Optional:     true,
										Elem:         &schema.Schema{Type: schema.TypeString},
										ValidateFunc: validateComposerEnvironmentEnvVariables,
									},
									"image_version": {
										Type:     schema.TypeString,
										Computed: true,
									},
								},
							},
						},
						// Output-only attributes populated by the API.
						"airflow_uri": {
							Type:     schema.TypeString,
							Computed: true,
						},
						"dag_gcs_prefix": {
							Type:     schema.TypeString,
							Computed: true,
						},
						"gke_cluster": {
							Type:     schema.TypeString,
							Computed: true,
						},
					},
				},
			},
			"labels": {
				Type:     schema.TypeMap,
				Optional: true,
				Elem:     &schema.Schema{Type: schema.TypeString},
			},
		},
	}
}
// resourceComposerEnvironmentCreate creates the environment, waits for the
// create operation to finish, then applies any fields that can only be set
// via a post-create PATCH (currently pypi_packages).
func resourceComposerEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)
	envName, err := resourceComposerEnvironmentName(d, config)
	if err != nil {
		return err
	}
	transformedConfig, err := expandComposerEnvironmentConfig(d.Get("config"), d, config)
	if err != nil {
		return err
	}
	env := &composer.Environment{
		Name:   envName.resourceName(),
		Labels: expandLabels(d),
		Config: transformedConfig,
	}
	// Some fields cannot be specified during create and must be updated post-creation.
	// Note: this mutates env, stripping the update-only fields from the
	// create request.
	updateOnlyEnv := getComposerEnvironmentPostCreateUpdateObj(env)
	log.Printf("[DEBUG] Creating new Environment %q", envName.parentName())
	op, err := config.clientComposer.Projects.Locations.Environments.Create(envName.parentName(), env).Do()
	if err != nil {
		return err
	}
	// Store the ID now
	id, err := replaceVars(d, config, "{{project}}/{{region}}/{{name}}")
	if err != nil {
		return fmt.Errorf("Error constructing id: %s", err)
	}
	d.SetId(id)
	waitErr := composerOperationWaitTime(
		config.clientComposer, op, envName.Project, "Creating Environment",
		int(d.Timeout(schema.TimeoutCreate).Minutes()))
	if waitErr != nil {
		// The resource didn't actually get created, remove from state.
		d.SetId("")
		errMsg := fmt.Sprintf("Error waiting to create Environment: %s", waitErr)
		// Best-effort cleanup of a partially-created environment; if cleanup
		// itself fails, surface both errors.
		if err := handleComposerEnvironmentCreationOpFailure(id, envName, d, config); err != nil {
			return fmt.Errorf("Error waiting to create Environment: %s. An initial "+
				"environment was or is still being created, and clean up failed with "+
				"error: %s.", errMsg, err)
		}
		return fmt.Errorf("Error waiting to create Environment: %s", waitErr)
	}
	log.Printf("[DEBUG] Finished creating Environment %q: %#v", d.Id(), op)
	if err := resourceComposerEnvironmentPostCreateUpdate(updateOnlyEnv, d, config); err != nil {
		return err
	}
	return resourceComposerEnvironmentRead(d, meta)
}
// resourceComposerEnvironmentRead refreshes Terraform state from the
// Composer API, handling a missing environment via handleNotFoundError.
func resourceComposerEnvironmentRead(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)
	envName, err := resourceComposerEnvironmentName(d, config)
	if err != nil {
		return err
	}
	res, err := config.clientComposer.Projects.Locations.Environments.Get(envName.resourceName()).Do()
	if err != nil {
		return handleNotFoundError(err, d, fmt.Sprintf("ComposerEnvironment %q", d.Id()))
	}
	// Set from getProject(d)
	if err := d.Set("project", envName.Project); err != nil {
		return fmt.Errorf("Error reading Environment: %s", err)
	}
	// Set from getRegion(d)
	if err := d.Set("region", envName.Region); err != nil {
		return fmt.Errorf("Error reading Environment: %s", err)
	}
	if err := d.Set("name", GetResourceNameFromSelfLink(res.Name)); err != nil {
		return fmt.Errorf("Error reading Environment: %s", err)
	}
	if err := d.Set("config", flattenComposerEnvironmentConfig(res.Config)); err != nil {
		return fmt.Errorf("Error reading Environment: %s", err)
	}
	if err := d.Set("labels", res.Labels); err != nil {
		return fmt.Errorf("Error reading Environment: %s", err)
	}
	return nil
}
// resourceComposerEnvironmentUpdate applies changes field-by-field, since the
// Composer API only accepts a single field per PATCH request. Partial state
// tracking (d.Partial/SetPartial) records which fields have been applied in
// case one of the sequential PATCHes fails.
func resourceComposerEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error {
	tfConfig := meta.(*Config)
	d.Partial(true)
	// Composer only allows PATCHing one field at a time, so for each updatable field, we
	// 1. determine if it needs to be updated
	// 2. construct a PATCH object with only that field populated
	// 3. call resourceComposerEnvironmentPatchField(...)to update that single field.
	if d.HasChange("config") {
		config, err := expandComposerEnvironmentConfig(d.Get("config"), d, tfConfig)
		if err != nil {
			return err
		}
		if d.HasChange("config.0.software_config.0.airflow_config_overrides") {
			// Default to an empty (non-nil) map so a removed block clears the
			// server-side overrides rather than leaving them untouched.
			patchObj := &composer.Environment{
				Config: &composer.EnvironmentConfig{
					SoftwareConfig: &composer.SoftwareConfig{
						AirflowConfigOverrides: make(map[string]string),
					},
				},
			}
			if config != nil && config.SoftwareConfig != nil && len(config.SoftwareConfig.AirflowConfigOverrides) > 0 {
				patchObj.Config.SoftwareConfig.AirflowConfigOverrides = config.SoftwareConfig.AirflowConfigOverrides
			}
			err = resourceComposerEnvironmentPatchField("config.softwareConfig.airflowConfigOverrides", patchObj, d, tfConfig)
			if err != nil {
				return err
			}
			d.SetPartial("config")
		}
		if d.HasChange("config.0.software_config.0.env_variables") {
			patchObj := &composer.Environment{
				Config: &composer.EnvironmentConfig{
					SoftwareConfig: &composer.SoftwareConfig{
						EnvVariables: make(map[string]string),
					},
				},
			}
			if config != nil && config.SoftwareConfig != nil && len(config.SoftwareConfig.EnvVariables) > 0 {
				patchObj.Config.SoftwareConfig.EnvVariables = config.SoftwareConfig.EnvVariables
			}
			err = resourceComposerEnvironmentPatchField("config.softwareConfig.envVariables", patchObj, d, tfConfig)
			if err != nil {
				return err
			}
			d.SetPartial("config")
		}
		if d.HasChange("config.0.software_config.0.pypi_packages") {
			patchObj := &composer.Environment{
				Config: &composer.EnvironmentConfig{
					SoftwareConfig: &composer.SoftwareConfig{
						PypiPackages: make(map[string]string),
					},
				},
			}
			if config != nil && config.SoftwareConfig != nil && config.SoftwareConfig.PypiPackages != nil {
				patchObj.Config.SoftwareConfig.PypiPackages = config.SoftwareConfig.PypiPackages
			}
			err = resourceComposerEnvironmentPatchField("config.softwareConfig.pypiPackages", patchObj, d, tfConfig)
			if err != nil {
				return err
			}
			d.SetPartial("config")
		}
		if d.HasChange("config.0.node_count") {
			patchObj := &composer.Environment{Config: &composer.EnvironmentConfig{}}
			if config != nil {
				patchObj.Config.NodeCount = config.NodeCount
			}
			err = resourceComposerEnvironmentPatchField("config.nodeCount", patchObj, d, tfConfig)
			if err != nil {
				return err
			}
			d.SetPartial("config")
		}
	}
	if d.HasChange("labels") {
		patchEnv := &composer.Environment{Labels: expandLabels(d)}
		err := resourceComposerEnvironmentPatchField("labels", patchEnv, d, tfConfig)
		if err != nil {
			return err
		}
		d.SetPartial("labels")
	}
	d.Partial(false)
	return resourceComposerEnvironmentRead(d, tfConfig)
}
// resourceComposerEnvironmentPostCreateUpdate applies fields that cannot be
// sent on create (currently only pypiPackages) via a PATCH immediately after
// creation. A nil updateEnv means there is nothing to apply.
func resourceComposerEnvironmentPostCreateUpdate(updateEnv *composer.Environment, d *schema.ResourceData, cfg *Config) error {
	if updateEnv == nil {
		return nil
	}
	d.Partial(true)
	if updateEnv.Config != nil && updateEnv.Config.SoftwareConfig != nil && len(updateEnv.Config.SoftwareConfig.PypiPackages) > 0 {
		log.Printf("[DEBUG] Running post-create update for Environment %q", d.Id())
		err := resourceComposerEnvironmentPatchField("config.softwareConfig.pypiPackages", updateEnv, d, cfg)
		if err != nil {
			return err
		}
		log.Printf("[DEBUG] Finish update to Environment %q post create for update only fields", d.Id())
		d.SetPartial("config")
	}
	d.Partial(false)
	return resourceComposerEnvironmentRead(d, cfg)
}
// resourceComposerEnvironmentPatchField PATCHes the single field identified
// by updateMask (Composer accepts one field per PATCH) and waits for the
// resulting operation to complete.
func resourceComposerEnvironmentPatchField(updateMask string, env *composer.Environment, d *schema.ResourceData, config *Config) error {
	// Marshal error deliberately ignored: the JSON is only used for debug logging.
	envJson, _ := env.MarshalJSON()
	log.Printf("[DEBUG] Updating Environment %q (updateMask = %q): %s", d.Id(), updateMask, string(envJson))
	envName, err := resourceComposerEnvironmentName(d, config)
	if err != nil {
		return err
	}
	op, err := config.clientComposer.Projects.Locations.Environments.
		Patch(envName.resourceName(), env).
		UpdateMask(updateMask).Do()
	if err != nil {
		return err
	}
	// NOTE(review): always waits using the create timeout, even when invoked
	// from Update — confirm this is intended.
	waitErr := composerOperationWaitTime(
		config.clientComposer, op, envName.Project, "Updating newly created Environment",
		int(d.Timeout(schema.TimeoutCreate).Minutes()))
	if waitErr != nil {
		// The resource didn't actually update.
		return fmt.Errorf("Error waiting to update Environment: %s", waitErr)
	}
	log.Printf("[DEBUG] Finished updating Environment %q (updateMask = %q)", d.Id(), updateMask)
	return nil
}
// resourceComposerEnvironmentDelete deletes the environment and blocks until
// the delete operation completes (bounded by the resource's delete timeout).
func resourceComposerEnvironmentDelete(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)
	envName, err := resourceComposerEnvironmentName(d, config)
	if err != nil {
		return err
	}
	log.Printf("[DEBUG] Deleting Environment %q", d.Id())
	op, err := config.clientComposer.Projects.Locations.Environments.Delete(envName.resourceName()).Do()
	if err != nil {
		return err
	}
	timeoutMin := int(d.Timeout(schema.TimeoutDelete).Minutes())
	if err := composerOperationWaitTime(
		config.clientComposer, op, envName.Project, "Deleting Environment", timeoutMin); err != nil {
		return err
	}
	log.Printf("[DEBUG] Finished deleting Environment %q: %#v", d.Id(), op)
	return nil
}
// resourceComposerEnvironmentImport supports importing by full resource name
// (projects/{p}/locations/{r}/environments/{n}), by "{p}/{r}/{n}", or by bare
// environment name (project/region then come from the provider config).
func resourceComposerEnvironmentImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
	config := meta.(*Config)
	// BUGFIX: the error returned by parseImportId was previously discarded,
	// so a malformed import id would silently fall through.
	if err := parseImportId([]string{
		"projects/(?P<project>[^/]+)/locations/(?P<region>[^/]+)/environments/(?P<name>[^/]+)",
		"(?P<project>[^/]+)/(?P<region>[^/]+)/(?P<name>[^/]+)",
		"(?P<name>[^/]+)",
	}, d, config); err != nil {
		return nil, err
	}
	// Replace import id for the resource id
	id, err := replaceVars(d, config, "{{project}}/{{region}}/{{name}}")
	if err != nil {
		return nil, fmt.Errorf("Error constructing id: %s", err)
	}
	d.SetId(id)
	return []*schema.ResourceData{d}, nil
}
// flattenComposerEnvironmentConfig converts an API EnvironmentConfig into
// the single-element list form used by the "config" schema attribute.
func flattenComposerEnvironmentConfig(envCfg *composer.EnvironmentConfig) interface{} {
	if envCfg == nil {
		return nil
	}
	return []interface{}{map[string]interface{}{
		"gke_cluster":     envCfg.GkeCluster,
		"dag_gcs_prefix":  envCfg.DagGcsPrefix,
		"node_count":      envCfg.NodeCount,
		"airflow_uri":     envCfg.AirflowUri,
		"node_config":     flattenComposerEnvironmentConfigNodeConfig(envCfg.NodeConfig),
		"software_config": flattenComposerEnvironmentConfigSoftwareConfig(envCfg.SoftwareConfig),
	}}
}
// flattenComposerEnvironmentConfigNodeConfig converts an API NodeConfig into
// the single-element list form used by the "node_config" schema attribute.
// The API's Location field maps to the schema's "zone".
func flattenComposerEnvironmentConfigNodeConfig(nodeCfg *composer.NodeConfig) interface{} {
	if nodeCfg == nil {
		return nil
	}
	return []interface{}{map[string]interface{}{
		"zone":            nodeCfg.Location,
		"machine_type":    nodeCfg.MachineType,
		"network":         nodeCfg.Network,
		"subnetwork":      nodeCfg.Subnetwork,
		"disk_size_gb":    nodeCfg.DiskSizeGb,
		"service_account": nodeCfg.ServiceAccount,
		"oauth_scopes":    flattenComposerEnvironmentConfigNodeConfigOauthScopes(nodeCfg.OauthScopes),
		"tags":            flattenComposerEnvironmentConfigNodeConfigTags(nodeCfg.Tags),
	}}
}
func flattenComposerEnvironmentConfigNodeConfigOauthScopes(v interface{}) interface{} {
if v == nil {
return v
}
return schema.NewSet(schema.HashString, convertStringArrToInterface(v.([]string)))
}
func flattenComposerEnvironmentConfigNodeConfigTags(v interface{}) interface{} {
if v == nil {
return v
}
return schema.NewSet(schema.HashString, convertStringArrToInterface(v.([]string)))
}
// flattenComposerEnvironmentConfigSoftwareConfig converts an API
// SoftwareConfig into the single-element list form used by the
// "software_config" schema attribute.
func flattenComposerEnvironmentConfigSoftwareConfig(softwareCfg *composer.SoftwareConfig) interface{} {
	if softwareCfg == nil {
		return nil
	}
	return []interface{}{map[string]interface{}{
		"image_version":            softwareCfg.ImageVersion,
		"airflow_config_overrides": softwareCfg.AirflowConfigOverrides,
		"pypi_packages":            softwareCfg.PypiPackages,
		"env_variables":            softwareCfg.EnvVariables,
	}}
}
// expandComposerEnvironmentConfig converts the single-element "config" list
// from state into an API EnvironmentConfig; an empty list yields nil.
func expandComposerEnvironmentConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.EnvironmentConfig, error) {
	list := v.([]interface{})
	if len(list) == 0 {
		return nil, nil
	}
	original := list[0].(map[string]interface{})
	transformed := &composer.EnvironmentConfig{}
	if nodeCountRaw, ok := original["node_count"]; ok {
		nodeCount, err := expandComposerEnvironmentConfigNodeCount(nodeCountRaw, d, config)
		if err != nil {
			return nil, err
		}
		transformed.NodeCount = nodeCount
	}
	nodeConfig, err := expandComposerEnvironmentConfigNodeConfig(original["node_config"], d, config)
	if err != nil {
		return nil, err
	}
	transformed.NodeConfig = nodeConfig
	softwareConfig, err := expandComposerEnvironmentConfigSoftwareConfig(original["software_config"], d, config)
	if err != nil {
		return nil, err
	}
	transformed.SoftwareConfig = softwareConfig
	return transformed, nil
}
// expandComposerEnvironmentConfigNodeCount converts the schema's node_count
// value (an int, possibly nil) into the API's int64 form.
func expandComposerEnvironmentConfigNodeCount(v interface{}, d *schema.ResourceData, config *Config) (int64, error) {
	if v == nil {
		return 0, nil
	}
	count := v.(int)
	return int64(count), nil
}
// expandComposerEnvironmentConfigNodeConfig converts the single-element
// "node_config" list from state into an API NodeConfig, resolving zone,
// machine type, network, and subnetwork names to relative links.
func expandComposerEnvironmentConfigNodeConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.NodeConfig, error) {
	l := v.([]interface{})
	if len(l) == 0 {
		return nil, nil
	}
	raw := l[0]
	original := raw.(map[string]interface{})
	transformed := &composer.NodeConfig{}
	if transformedDiskSizeGb, ok := original["disk_size_gb"]; ok {
		transformed.DiskSizeGb = int64(transformedDiskSizeGb.(int))
	}
	if v, ok := original["service_account"]; ok {
		transformedServiceAccount, err := expandComposerEnvironmentServiceAccount(v, d, config)
		if err != nil {
			return nil, err
		}
		transformed.ServiceAccount = transformedServiceAccount
	}
	// The zone is expanded before the machine type so the resolved value can
	// be passed along to expandComposerEnvironmentMachineType.
	var nodeConfigZone string
	if v, ok := original["zone"]; ok {
		transformedZone, err := expandComposerEnvironmentZone(v, d, config)
		if err != nil {
			return nil, err
		}
		// The schema's "zone" maps to the API's Location field.
		transformed.Location = transformedZone
		nodeConfigZone = transformedZone
	}
	if v, ok := original["machine_type"]; ok {
		transformedMachineType, err := expandComposerEnvironmentMachineType(v, d, config, nodeConfigZone)
		if err != nil {
			return nil, err
		}
		transformed.MachineType = transformedMachineType
	}
	if v, ok := original["network"]; ok {
		transformedNetwork, err := expandComposerEnvironmentNetwork(v, d, config)
		if err != nil {
			return nil, err
		}
		transformed.Network = transformedNetwork
	}
	if v, ok := original["subnetwork"]; ok {
		transformedSubnetwork, err := expandComposerEnvironmentSubnetwork(v, d, config)
		if err != nil {
			return nil, err
		}
		transformed.Subnetwork = transformedSubnetwork
	}
	transformedOauthScopes, err := expandComposerEnvironmentSetList(original["oauth_scopes"], d, config)
	if err != nil {
		return nil, err
	}
	transformed.OauthScopes = transformedOauthScopes
	transformedTags, err := expandComposerEnvironmentSetList(original["tags"], d, config)
	if err != nil {
		return nil, err
	}
	transformed.Tags = transformedTags
	return transformed, nil
}
// expandComposerEnvironmentServiceAccount reduces a service account value
// (email or relative name) to its final path component; empty stays empty.
func expandComposerEnvironmentServiceAccount(v interface{}, d *schema.ResourceData, config *Config) (string, error) {
	sa := v.(string)
	if sa == "" {
		return "", nil
	}
	return GetResourceNameFromSelfLink(sa), nil
}
// expandComposerEnvironmentZone normalizes a zone value: a bare zone name is
// qualified as projects/{project}/zones/{zone}; anything containing "/" is
// reduced to its relative path; empty stays empty.
func expandComposerEnvironmentZone(v interface{}, d *schema.ResourceData, config *Config) (string, error) {
	zone := v.(string)
	if zone == "" {
		return zone, nil
	}
	if strings.Contains(zone, "/") {
		return getRelativePath(zone)
	}
	project, err := getProject(d, config)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("projects/%s/zones/%s", project, zone), nil
}
// expandComposerEnvironmentMachineType resolves a machine type name or
// self-link to its relative link. nodeCfgZone is currently unused by this
// implementation but kept for interface compatibility with callers.
func expandComposerEnvironmentMachineType(v interface{}, d *schema.ResourceData, config *Config, nodeCfgZone interface{}) (string, error) {
	fv, err := ParseMachineTypesFieldValue(v.(string), d, config)
	if err != nil {
		// BUGFIX: this previously returned a nil error (`return "", nil`),
		// silently discarding the parse failure and yielding an empty
		// machine type; propagate the error instead.
		return "", err
	}
	return fv.RelativeLink(), nil
}
// expandComposerEnvironmentNetwork resolves a network name, partial URI, or
// self-link to its relative link form.
func expandComposerEnvironmentNetwork(v interface{}, d *schema.ResourceData, config *Config) (string, error) {
	network := v.(string)
	fieldValue, err := ParseNetworkFieldValue(network, d, config)
	if err != nil {
		return "", err
	}
	return fieldValue.RelativeLink(), nil
}
// expandComposerEnvironmentSubnetwork resolves a subnetwork name, partial
// URI, or self-link to its relative link form.
func expandComposerEnvironmentSubnetwork(v interface{}, d *schema.ResourceData, config *Config) (string, error) {
	subnetwork := v.(string)
	fieldValue, err := ParseSubnetworkFieldValue(subnetwork, d, config)
	if err != nil {
		return "", err
	}
	return fieldValue.RelativeLink(), nil
}
// expandComposerEnvironmentSetList converts a schema.Set into a []string for
// the API; a nil input yields a nil slice.
func expandComposerEnvironmentSetList(v interface{}, d *schema.ResourceData, config *Config) ([]string, error) {
	if v == nil {
		return nil, nil
	}
	set := v.(*schema.Set)
	return convertStringArr(set.List()), nil
}
// expandComposerEnvironmentConfigSoftwareConfig converts the single-element
// "software_config" list from state into an API SoftwareConfig; an empty
// list yields nil.
func expandComposerEnvironmentConfigSoftwareConfig(v interface{}, d *schema.ResourceData, config *Config) (*composer.SoftwareConfig, error) {
	list := v.([]interface{})
	if len(list) == 0 {
		return nil, nil
	}
	original := list[0].(map[string]interface{})
	return &composer.SoftwareConfig{
		ImageVersion:           original["image_version"].(string),
		AirflowConfigOverrides: expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "airflow_config_overrides"),
		PypiPackages:           expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "pypi_packages"),
		EnvVariables:           expandComposerEnvironmentConfigSoftwareConfigStringMap(original, "env_variables"),
	}, nil
}
// expandComposerEnvironmentConfigSoftwareConfigStringMap reads key k from the
// software_config map and converts it to a map[string]string, returning an
// empty (non-nil) map when the key is absent or nil.
func expandComposerEnvironmentConfigSoftwareConfigStringMap(softwareConfig map[string]interface{}, k string) map[string]string {
	if v, ok := softwareConfig[k]; ok && v != nil {
		return convertStringMap(v.(map[string]interface{}))
	}
	return map[string]string{}
}
// validateComposerEnvironmentPypiPackages checks that every PyPI package
// name in the map is entirely lowercase, accumulating one error per
// violation. A nil input produces no errors.
func validateComposerEnvironmentPypiPackages(v interface{}, k string) (ws []string, errors []error) {
	if v == nil {
		return ws, errors
	}
	for pkgName := range v.(map[string]interface{}) {
		if strings.ToLower(pkgName) == pkgName {
			continue
		}
		errors = append(errors, fmt.Errorf("PYPI package %q can only contain lowercase characters", pkgName))
	}
	return ws, errors
}
// validateComposerEnvironmentEnvVariables checks user-supplied env variable
// names: each must match the env-var name pattern, must not be one of the
// Composer-reserved names, and must not fall in Airflow's reserved
// AIRFLOW__SECTION__KEY namespace. One error is accumulated per bad name.
func validateComposerEnvironmentEnvVariables(v interface{}, k string) (ws []string, errors []error) {
	if v == nil {
		return ws, errors
	}
	reEnvVarName := regexp.MustCompile(composerEnvironmentEnvVariablesRegexp)
	reAirflowReserved := regexp.MustCompile(composerEnvironmentReservedAirflowEnvVarRegexp)
	for name := range v.(map[string]interface{}) {
		if !reEnvVarName.MatchString(name) {
			errors = append(errors,
				fmt.Errorf("env_variable %q must match regexp %q", name, composerEnvironmentEnvVariablesRegexp))
			continue
		}
		if _, reserved := composerEnvironmentReservedEnvVar[name]; reserved {
			errors = append(errors,
				fmt.Errorf("env_variable %q is a reserved name and cannot be used", name))
			continue
		}
		if reAirflowReserved.MatchString(name) {
			errors = append(errors,
				fmt.Errorf("env_variable %q cannot match reserved Airflow variable names with regexp %q",
					name, composerEnvironmentReservedAirflowEnvVarRegexp))
		}
	}
	return ws, errors
}
// handleComposerEnvironmentCreationOpFailure cleans up after a failed create
// operation: if an environment was nonetheless created and is no longer in
// the CREATING state, it is deleted so a later apply can retry cleanly. An
// environment still in CREATING cannot be deleted, so an error instructing
// the user is returned instead.
func handleComposerEnvironmentCreationOpFailure(id string, envName *composerEnvironmentName, d *schema.ResourceData, config *Config) error {
	log.Printf("[WARNING] Creation operation for Composer Environment %q failed, check Environment isn't still running", id)
	// Try to get possible created but invalid environment.
	env, err := config.clientComposer.Projects.Locations.Environments.Get(envName.resourceName()).Do()
	if err != nil {
		// If error is 401, we don't have to clean up environment, return nil.
		// Otherwise, we encountered another error.
		// NOTE(review): handleNotFoundError suggests not-found handling — the
		// "401" above looks like it should read "404"; confirm.
		return handleNotFoundError(err, d, fmt.Sprintf("Composer Environment %q", envName.resourceName()))
	}
	if env.State == "CREATING" {
		return fmt.Errorf(
			"Getting creation operation state failed while waiting for environment to finish creating, "+
				"but environment seems to still be in 'CREATING' state. Wait for operation to finish and either "+
				"manually delete environment or import %q into your state", id)
	}
	log.Printf("[WARNING] Environment %q from failed creation operation was created, deleting.", id)
	op, err := config.clientComposer.Projects.Locations.Environments.Delete(envName.resourceName()).Do()
	if err != nil {
		return fmt.Errorf("Could not delete the invalid created environment with state %q: %s", env.State, err)
	}
	waitErr := composerOperationWaitTime(
		config.clientComposer, op, envName.Project,
		fmt.Sprintf("Deleting invalid created Environment with state %q", env.State),
		int(d.Timeout(schema.TimeoutCreate).Minutes()))
	if waitErr != nil {
		return fmt.Errorf("Error waiting to delete invalid Environment with state %q: %s", env.State, waitErr)
	}
	return nil
}
// getComposerEnvironmentPostCreateUpdateObj extracts the fields of env that
// cannot be sent on create and must instead be applied via a post-create
// PATCH. Currently that is only pypiPackages: when present, they are moved
// into a minimal returned Environment and cleared from env (mutating it),
// since the create API rejects requests that include them. Returns nil when
// no post-create update is needed.
func getComposerEnvironmentPostCreateUpdateObj(env *composer.Environment) (updateEnv *composer.Environment) {
	if env == nil || env.Config == nil || env.Config.SoftwareConfig == nil {
		return nil
	}
	pkgs := env.Config.SoftwareConfig.PypiPackages
	if len(pkgs) == 0 {
		return nil
	}
	updateEnv = &composer.Environment{
		Config: &composer.EnvironmentConfig{
			SoftwareConfig: &composer.SoftwareConfig{
				PypiPackages: pkgs,
			},
		},
	}
	// Clear PYPI packages - otherwise, API will return error
	// that the create request is invalid.
	env.Config.SoftwareConfig.PypiPackages = make(map[string]string)
	return updateEnv
}
// resourceComposerEnvironmentName assembles the environment's identifying
// parts from resource data, falling back to provider-level project/region
// defaults via getProject/getRegion.
func resourceComposerEnvironmentName(d *schema.ResourceData, config *Config) (*composerEnvironmentName, error) {
	project, err := getProject(d, config)
	if err != nil {
		return nil, err
	}
	region, err := getRegion(d, config)
	if err != nil {
		return nil, err
	}
	name := &composerEnvironmentName{
		Project:     project,
		Region:      region,
		Environment: d.Get("name").(string),
	}
	return name, nil
}
// composerEnvironmentName holds the identifying components of a Composer
// environment.
type composerEnvironmentName struct {
	Project     string
	Region      string
	Environment string
}

// resourceName returns the environment's full API resource name:
// projects/{project}/locations/{region}/environments/{environment}.
func (n *composerEnvironmentName) resourceName() string {
	return fmt.Sprintf("projects/%s/locations/%s/environments/%s", n.Project, n.Region, n.Environment)
}

// parentName returns the parent collection name used for create calls:
// projects/{project}/locations/{region}.
func (n *composerEnvironmentName) parentName() string {
	return fmt.Sprintf("projects/%s/locations/%s", n.Project, n.Region)
}
// The value we store (i.e. `old` in this method), might be only the service account email,
// but we expect either the email or the name (projects/.../serviceAccounts/...)
func compareServiceAccountEmailToLink(_, old, new string, _ *schema.ResourceData) bool {
	// old is the service account email returned from the server.
	// BUGFIX: the arguments to strings.HasPrefix were swapped
	// (HasPrefix("projects/", old)), so a stored relative name was never
	// detected and always fell into the bare-email comparison. Check whether
	// old itself starts with "projects/".
	if !strings.HasPrefix(old, "projects/") {
		return old == GetResourceNameFromSelfLink(new)
	}
	return compareSelfLinkRelativePaths("", old, new, nil)
}
// validateServiceAccountRelativeNameOrEmail accepts either a bare service
// account email or a relative resource name of the form
// projects/{project}/serviceAccounts/{email}.
func validateServiceAccountRelativeNameOrEmail(v interface{}, k string) (ws []string, errors []error) {
	value := v.(string)
	serviceAccountRe := "(" + strings.Join(PossibleServiceAccountNames, "|") + ")"
	// When the value looks like a relative name, require the full
	// projects/.../serviceAccounts/... shape around the email pattern.
	if strings.HasPrefix(value, "projects/") {
		serviceAccountRe = fmt.Sprintf("projects/(.+)/serviceAccounts/%s", serviceAccountRe)
	}
	if !regexp.MustCompile(serviceAccountRe).MatchString(value) {
		errors = append(errors, fmt.Errorf(
			"%q (%q) doesn't match regexp %q", k, value, serviceAccountRe))
	}
	return
}

View File

@ -0,0 +1,491 @@
package google
import (
"fmt"
"testing"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
"google.golang.org/api/composer/v1"
"google.golang.org/api/storage/v1"
"log"
"strings"
)
// Name prefixes used for test environments and networks, matched by the
// sweeper when cleaning up leaked test resources.
const testComposerEnvironmentPrefix = "tf-cc-testenv"
const testComposerNetworkPrefix = "tf-cc-testnet"
// init registers a sweeper that removes leaked test Composer resources via
// testSweepComposerResources.
func init() {
	resource.AddTestSweepers("gcp_composer_environment", &resource.Sweeper{
		Name: "gcp_composer_environment",
		F:    testSweepComposerResources,
	})
}
// Checks environment creation with minimum required information.
func TestAccComposerEnvironment_basic(t *testing.T) {
	t.Parallel()
	envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix)
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccComposerEnvironmentDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccComposerEnvironment_basic(envName),
				// Verify server-computed attributes were populated.
				Check: resource.ComposeTestCheckFunc(
					resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.airflow_uri"),
					resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.gke_cluster"),
					resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_count"),
					resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_config.0.zone"),
					resource.TestCheckResourceAttrSet("google_composer_environment.test", "config.0.node_config.0.machine_type")),
			},
			// Import using the resource's default id.
			{
				ResourceName:      "google_composer_environment.test",
				ImportState:       true,
				ImportStateVerify: true,
			},
			// Import using the full API resource name.
			{
				ResourceName:      "google_composer_environment.test",
				ImportState:       true,
				ImportStateId:     fmt.Sprintf("projects/%s/locations/%s/environments/%s", getTestProjectFromEnv(), "us-central1", envName),
				ImportStateVerify: true,
			},
		},
	})
}
// Checks that all updatable fields can be updated in one apply
// (PATCH for Environments only is per-field)
func TestAccComposerEnvironment_update(t *testing.T) {
	t.Parallel()
	envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix)
	var env composer.Environment
	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccComposerEnvironmentDestroy,
		Steps: []resource.TestStep{
			// Create with the basic config, then apply the updated config.
			{
				Config: testAccComposerEnvironment_basic(envName),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env),
				),
			},
			{
				Config: testAccComposerEnvironment_update(envName),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env),
				),
			},
			{
				ResourceName:      "google_composer_environment.test",
				ImportState:       true,
				ImportStateVerify: true,
			},
		},
	})
}
// Checks behavior of node config, including dependencies on Compute resources.
func TestAccComposerEnvironment_withNodeConfig(t *testing.T) {
	t.Parallel()

	envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix)
	network := acctest.RandomWithPrefix(testComposerNetworkPrefix)
	subnetwork := network + "-1"
	serviceAccount := acctest.RandomWithPrefix("tf-test")
	var env composer.Environment

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccComposerEnvironmentDestroy,
		Steps: []resource.TestStep{
			{
				Config: testAccComposerEnvironment_nodeCfg(envName, network, subnetwork, serviceAccount),
				Check:  testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env),
			},
			{
				ResourceName:      "google_composer_environment.test",
				ImportState:       true,
				ImportStateVerify: true,
			},
			// This is a terrible clean-up step in order to get destroy to succeed,
			// due to dangling firewall rules left by the Composer Environment blocking network deletion.
			// TODO(emilyye): Remove this check if firewall rules bug gets fixed by Composer.
			{
				PlanOnly:           true,
				ExpectNonEmptyPlan: false,
				Config:             testAccComposerEnvironment_nodeCfg(envName, network, subnetwork, serviceAccount),
				Check:              testAccCheckClearComposerEnvironmentFirewalls(network),
			},
		},
	})
}
// Checks behavior of config for creation for attributes that must
// be updated during create.
func TestAccComposerEnvironment_withUpdateOnCreate(t *testing.T) {
	t.Parallel()

	envName := acctest.RandomWithPrefix(testComposerEnvironmentPrefix)
	var env composer.Environment

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccComposerEnvironmentDestroy,
		Steps: []resource.TestStep{
			{
				// pypi_packages can only be set via update, so the resource must
				// internally perform an update immediately after create.
				Config: testAccComposerEnvironment_updateOnlyFields(envName),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckComposerEnvironmentExists("google_composer_environment.test", &env),
				),
			},
			{
				ResourceName:      "google_composer_environment.test",
				ImportState:       true,
				ImportStateVerify: true,
			},
		},
	})
}
// testAccCheckComposerEnvironmentExists returns a check that verifies the
// named resource in state corresponds to a live Composer environment in the
// API, and copies the fetched environment into the given struct so later
// checks can inspect it.
func testAccCheckComposerEnvironmentExists(n string, environment *composer.Environment) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		rs, ok := s.RootModule().Resources[n]
		if !ok {
			return fmt.Errorf("Not found: %s", n)
		}
		if rs.Primary.ID == "" {
			return fmt.Errorf("No ID is set")
		}

		// The resource ID is expected to be {project}/{region}/{environment}.
		parts := strings.Split(rs.Primary.ID, "/")
		if len(parts) != 3 {
			return fmt.Errorf("Invalid ID %q, expected format {project}/{region}/{environment}", rs.Primary.ID)
		}

		expectedName := (&composerEnvironmentName{
			Project:     parts[0],
			Region:      parts[1],
			Environment: parts[2],
		}).resourceName()

		config := testAccProvider.Meta().(*Config)
		found, err := config.clientComposer.Projects.Locations.Environments.Get(expectedName).Do()
		if err != nil {
			return err
		}
		if found.Name != expectedName {
			return fmt.Errorf("Environment not found")
		}

		*environment = *found
		return nil
	}
}
// testAccComposerEnvironmentDestroy verifies that every Composer environment
// tracked in state has actually been deleted from the API.
func testAccComposerEnvironmentDestroy(s *terraform.State) error {
	config := testAccProvider.Meta().(*Config)

	for _, rs := range s.RootModule().Resources {
		if rs.Type != "google_composer_environment" {
			continue
		}

		// IDs are stored as {project}/{region}/{environment}.
		parts := strings.Split(rs.Primary.ID, "/")
		if len(parts) != 3 {
			return fmt.Errorf("Invalid ID %q, expected format {project}/{region}/{environment}", rs.Primary.ID)
		}

		name := (&composerEnvironmentName{
			Project:     parts[0],
			Region:      parts[1],
			Environment: parts[2],
		}).resourceName()

		// A successful Get means the environment survived destroy.
		if _, err := config.clientComposer.Projects.Locations.Environments.Get(name).Do(); err == nil {
			return fmt.Errorf("environment %s still exists", name)
		}
	}
	return nil
}
// testAccComposerEnvironment_basic returns a minimal environment config:
// only name and region, with every other field left to service defaults.
func testAccComposerEnvironment_basic(name string) string {
	return fmt.Sprintf(`
resource "google_composer_environment" "test" {
	name = "%s"
	region = "us-central1"
}
`, name)
}
// testAccComposerEnvironment_update returns a config that sets every
// updatable field (node count, all three software_config maps, labels)
// so a single apply exercises the per-field PATCH logic.
func testAccComposerEnvironment_update(name string) string {
	return fmt.Sprintf(`
resource "google_composer_environment" "test" {
	name = "%s"
	region = "us-central1"

	config {
		node_count = 4

		software_config {
			airflow_config_overrides {
				core-load_example = "True"
			}

			pypi_packages {
				numpy = ""
			}

			env_variables {
				FOO = "bar"
			}
		}
	}

	labels {
		foo = "bar"
		anotherlabel = "boo"
	}
}
`, name)
}
// testAccComposerEnvironment_nodeCfg returns a config with an explicit
// node_config wired to freshly created network, subnetwork, and service
// account resources. The depends_on is required so the service account has
// roles/composer.worker before the environment tries to use it.
func testAccComposerEnvironment_nodeCfg(environment, network, subnetwork, serviceAccount string) string {
	return fmt.Sprintf(`
resource "google_composer_environment" "test" {
	name = "%s"
	region = "us-central1"
	config {
		node_config {
			network = "${google_compute_network.test.self_link}"
			subnetwork =  "${google_compute_subnetwork.test.self_link}"

			service_account = "${google_service_account.test.name}"
		}
	}

	depends_on = ["google_project_iam_member.composer-worker"]
}

resource "google_compute_network" "test" {
	name 					= "%s"
	auto_create_subnetworks = false
}

resource "google_compute_subnetwork" "test" {
	name          = "%s"
	ip_cidr_range = "10.2.0.0/16"
	region        = "us-central1"
	network       = "${google_compute_network.test.self_link}"
}

resource "google_service_account" "test" {
	account_id   = "%s"
	display_name = "Test Service Account for Composer Environment"
}

resource "google_project_iam_member" "composer-worker" {
	role   = "roles/composer.worker"
	member = "serviceAccount:${google_service_account.test.email}"
}
`, environment, network, subnetwork, serviceAccount)
}
// testAccComposerEnvironment_updateOnlyFields returns a config that sets
// only pypi_packages, a field the API accepts exclusively via update, to
// exercise the update-during-create path.
func testAccComposerEnvironment_updateOnlyFields(name string) string {
	return fmt.Sprintf(`
resource "google_composer_environment" "test" {
	name = "%s"
	region = "us-central1"
	config {
		software_config {
			pypi_packages {
			  scipy = "==1.1.0"
			}
		}
	}
}
`, name)
}
/**
* CLEAN UP HELPER FUNCTIONS
*/
// testSweepComposerResources cleans up stale Composer test environments and
// the Cloud Storage buckets they leave behind in the given region.
func testSweepComposerResources(region string) error {
	config, err := sharedConfigForRegion(region)
	if err != nil {
		return fmt.Errorf("error getting shared config for region: %s", err)
	}

	// Return the error instead of log.Fatalf: Fatalf would kill the whole
	// test binary, aborting every other sweeper in the run.
	if err := config.loadAndValidate(); err != nil {
		return fmt.Errorf("error loading: %s", err)
	}

	// Environments need to be cleaned up because the service is flaky.
	if err := testSweepComposerEnvironments(config); err != nil {
		return err
	}

	// Buckets need to be cleaned up because they just don't get deleted on purpose.
	if err := testSweepComposerEnvironmentBuckets(config); err != nil {
		return err
	}

	return nil
}
// testSweepComposerEnvironments deletes leftover test environments in the
// sweeper project/region, waiting on each delete operation. Failures are
// accumulated so one bad environment does not stop the sweep.
func testSweepComposerEnvironments(config *Config) error {
	found, err := config.clientComposer.Projects.Locations.Environments.List(
		fmt.Sprintf("projects/%s/locations/%s", config.Project, config.Region)).Do()
	if err != nil {
		return fmt.Errorf("error listing storage buckets for composer environment: %s", err)
	}

	if len(found.Environments) == 0 {
		log.Printf("No environment need to be cleaned up")
		return nil
	}

	var allErrors error
	for _, e := range found.Environments {
		// Go switch arms do not fall through, so states that share handling
		// must be listed in the same case. The previous empty
		// `case "CREATING":` arm silently skipped CREATING environments, and
		// the empty `case "RUNNING":`/`case "ERROR":` arms meant those
		// environments were never deleted at all.
		switch e.State {
		case "CREATING", "UPDATING":
			// Pending environments cannot be deleted; report them so the
			// sweep is retried later.
			allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete pending Environment %q with state %q", e.Name, e.State))
		case "DELETING":
			log.Printf("Environment %q is currently deleting", e.Name)
		default:
			// RUNNING, ERROR, and any unrecognized state: attempt deletion.
			op, deleteErr := config.clientComposer.Projects.Locations.Environments.Delete(e.Name).Do()
			if deleteErr != nil {
				allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete environment %q: %s", e.Name, deleteErr))
				continue
			}

			waitErr := composerOperationWaitTime(config.clientComposer, op, config.Project, "Sweeping old test environments", 10)
			if waitErr != nil {
				allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete environment %q: %s", e.Name, waitErr))
			}
		}
	}
	return allErrors
}
// testSweepComposerEnvironmentBuckets cleans up the project's App Engine
// artifacts bucket and any region-prefixed buckets left behind by Composer
// test environments.
func testSweepComposerEnvironmentBuckets(config *Config) error {
	artifactsBName := fmt.Sprintf("artifacts.%s.appspot.com", config.Project)
	artifactBucket, err := config.clientStorage.Buckets.Get(artifactsBName).Do()
	switch {
	case err == nil:
		if cleanErr := testSweepComposerEnvironmentCleanUpBucket(config, artifactBucket); cleanErr != nil {
			return cleanErr
		}
	case isGoogleApiErrorWithCode(err, 404):
		// Nothing to clean if the bucket was never created.
		log.Printf("could not find bucket %q to clean up", artifactsBName)
	default:
		return err
	}

	found, err := config.clientStorage.Buckets.List(config.Project).Prefix(config.Region).Do()
	if err != nil {
		return fmt.Errorf("error listing storage buckets created when testing composer environment: %s", err)
	}
	if len(found.Items) == 0 {
		log.Printf("No environment buckets need to be cleaned up")
		return nil
	}

	for _, b := range found.Items {
		// NOTE(review): buckets carrying the goog-composer-environment label
		// are skipped — presumably they belong to environments the sweeper
		// handles separately; confirm against testSweepComposerEnvironments.
		if _, labeled := b.Labels["goog-composer-environment"]; labeled {
			continue
		}
		if err := testSweepComposerEnvironmentCleanUpBucket(config, b); err != nil {
			return err
		}
	}
	return nil
}
// testSweepComposerEnvironmentCleanUpBucket deletes every object in the given
// bucket and then the bucket itself, accumulating individual failures rather
// than aborting on the first one.
func testSweepComposerEnvironmentCleanUpBucket(config *Config, bucket *storage.Bucket) error {
	var allErrors error

	objList, err := config.clientStorage.Objects.List(bucket.Name).Do()
	if err != nil {
		allErrors = multierror.Append(allErrors,
			fmt.Errorf("Unable to list objects to delete for bucket %q: %s", bucket.Name, err))
	}
	// Guard against a nil list: the original dereferenced objList.Items even
	// when the List call failed, which would panic.
	if objList != nil {
		for _, o := range objList.Items {
			if err := config.clientStorage.Objects.Delete(bucket.Name, o.Name).Do(); err != nil {
				allErrors = multierror.Append(allErrors,
					fmt.Errorf("Unable to delete object %q from bucket %q: %s", o.Name, bucket.Name, err))
			}
		}
	}

	// Delete the bucket exactly once. The original issued this call twice; the
	// second call would 404 after a successful first delete, so every
	// successful cleanup still reported an error.
	if err := config.clientStorage.Buckets.Delete(bucket.Name).Do(); err != nil {
		allErrors = multierror.Append(allErrors, fmt.Errorf("Unable to delete bucket %q: %s", bucket.Name, err))
	}

	if allErrors != nil {
		return fmt.Errorf("Unable to clean up bucket %q: %v", bucket.Name, allErrors)
	}
	return nil
}
// WARNING: This is not actually a check and is a terrible clean-up step because Composer Environments
// have a bug that hasn't been fixed. Composer will add firewalls to non-default networks for environments
// but will not remove them when the Environment is deleted.
//
// Destroy test step for config with a network will fail unless we clean up the firewalls before.
func testAccCheckClearComposerEnvironmentFirewalls(networkName string) resource.TestCheckFunc {
	return func(s *terraform.State) error {
		config := testAccProvider.Meta().(*Config)
		config.Project = getTestProjectFromEnv()

		network, err := config.clientCompute.Networks.Get(getTestProjectFromEnv(), networkName).Do()
		if err != nil {
			return err
		}

		firewallList, err := config.clientCompute.Firewalls.List(config.Project).Do()
		if err != nil {
			return fmt.Errorf("Unable to list firewalls for network %q: %s", network.Name, err)
		}

		var allErrors error
		for _, fw := range firewallList.Items {
			// Only touch firewalls created against test networks.
			if !strings.HasPrefix(fw.Name, testComposerNetworkPrefix) {
				continue
			}

			log.Printf("[DEBUG] Deleting firewall %q for test-resource network %q", fw.Name, network.Name)
			op, deleteErr := config.clientCompute.Firewalls.Delete(config.Project, fw.Name).Do()
			if deleteErr != nil {
				allErrors = multierror.Append(allErrors,
					fmt.Errorf("Unable to delete firewalls for network %q: %s", network.Name, deleteErr))
				continue
			}

			if waitErr := computeOperationWait(config.clientCompute, op, config.Project,
				"Sweeping test composer environment firewalls"); waitErr != nil {
				allErrors = multierror.Append(allErrors,
					fmt.Errorf("Error while waiting to delete firewall %q: %s", fw.Name, waitErr))
			}
		}
		return allErrors
	}
}

View File

@ -0,0 +1,316 @@
---
layout: "google"
page_title: "Google: google_composer_environment"
sidebar_current: "docs-google-composer-environment"
description: |-
An environment for running orchestration tasks.
---
# google\_composer\_environment
An environment for running orchestration tasks.
Environments run Apache Airflow software on Google infrastructure.
To get more information about Environments, see:
* [API documentation](https://cloud.google.com/composer/docs/reference/rest/)
* How-to Guides
* [Official Documentation](https://cloud.google.com/composer/docs)
* [Configuring Shared VPC for Composer Environments](https://cloud.google.com/composer/docs/how-to/managing/configuring-shared-vpc)
* [Apache Airflow Documentation](http://airflow.apache.org/)
~> **Warning:** We **STRONGLY** recommend you read the [GCP guides](https://cloud.google.com/composer/docs/how-to)
as the Environment resource requires a long deployment process and involves several layers of GCP infrastructure,
including a Kubernetes Engine cluster, Cloud Storage, and Compute networking resources. Due to limitations of the API,
Terraform will not be able to automatically find or manage many of these underlying resources. In particular:
* It can take up to one hour to create or update an environment resource. In addition, GCP may only detect some
errors in configuration when they are used (e.g. ~40-50 minutes into the creation process), and is prone to limited
error reporting. If you encounter confusing or uninformative errors, please verify your configuration is valid
against GCP Cloud Composer before filing bugs against the Terraform provider.
* **Environments create Google Cloud Storage buckets that do not get cleaned up automatically** on environment
deletion. [More about Composer's use of Cloud Storage](https://cloud.google.com/composer/docs/concepts/cloud-storage).
## Example Usage
### Basic Usage
```hcl
resource "google_composer_environment" "test" {
name = "my-composer-env"
region = "us-central1"
}
```
### With GKE and Compute Resource Dependencies
**NOTE** To use service accounts, you need to grant `roles/composer.worker` to the service account on any resources that may be created for the environment
(i.e. at a project level). This will probably require an explicit dependency
on the IAM policy binding (see `google_project_iam_member` below).
```hcl
resource "google_composer_environment" "test" {
  name = "my-composer-env"
region = "us-central1"
config {
node_count = 4
node_config {
zone = "us-central1-a"
machine_type = "n1-standard-1"
network = "${google_compute_network.test.self_link}"
subnetwork = "${google_compute_subnetwork.test.self_link}"
service_account = "${google_service_account.test.name}"
}
}
depends_on = ["google_project_iam_member.composer-worker"]
}
resource "google_compute_network" "test" {
name = "composer-test-network"
auto_create_subnetworks = false
}
resource "google_compute_subnetwork" "test" {
name = "composer-test-subnetwork"
ip_cidr_range = "10.2.0.0/16"
region = "us-central1"
network = "${google_compute_network.test.self_link}"
}
resource "google_service_account" "test" {
account_id = "composer-env-account"
display_name = "Test Service Account for Composer Environment"
}
resource "google_project_iam_member" "composer-worker" {
role = "roles/composer.worker"
member = "serviceAccount:${google_service_account.test.email}"
}
```
### With Software (Airflow) Config
```hcl
resource "google_composer_environment" "test" {
  name = "my-composer-env"
region = "us-central1"
config {
software_config {
airflow_config_overrides {
core-load_example = "True"
}
pypi_packages {
numpy = ""
scipy = "==1.1.0"
}
env_variables {
FOO = "bar"
}
}
}
}
```
## Argument Reference
The following arguments are supported:
* `name` -
(Required)
Name of the environment
- - -
* `config` -
(Optional)
Configuration parameters for this environment Structure is documented below.
* `labels` -
(Optional)
User-defined labels for this environment. The labels map can contain
no more than 64 entries. Entries of the labels map are UTF8 strings
that comply with the following restrictions:
Label keys must be between 1 and 63 characters long and must conform
to the following regular expression: `[a-z]([-a-z0-9]*[a-z0-9])?`.
Label values must be between 0 and 63 characters long and must
conform to the regular expression `([a-z]([-a-z0-9]*[a-z0-9])?)?`.
No more than 64 labels can be associated with a given environment.
Both keys and values must be <= 128 bytes in size.
* `region` -
(Optional)
The location or Compute Engine region for the environment.
* `project` - (Optional) The ID of the project in which the resource belongs.
If it is not provided, the provider project is used.
The `config` block supports:
* `node_count` -
(Optional)
The number of nodes in the Kubernetes Engine cluster that
will be used to run this environment.
* `node_config` -
(Optional)
The configuration used for the Kubernetes Engine cluster. Structure is documented below.
* `software_config` -
(Optional)
The configuration settings for software inside the environment. Structure is documented below.
The `node_config` block supports:
* `zone` -
(Optional)
The Compute Engine zone in which to deploy the VMs running the
Apache Airflow software, specified as the zone name or
relative resource name (e.g. "projects/{project}/zones/{zone}"). Must belong to the enclosing environment's project
and region.
If both zone and machineType are specified, machineType must belong to this zone. If neither is specified, the service
will pick default values in the specified resource's region. If only one of zone or machineType is specified, the
location information from the specified field will be used for the location-unspecified field.
* `machine_type` -
(Optional)
The Compute Engine machine type used for cluster instances,
specified as a name or relative resource name. For example:
"projects/{project}/zones/{zone}/machineTypes/{machineType}". Must belong to the enclosing environment's project and
region/zone.
If both zone and machineType are specified, machineType must belong to this zone. If neither is specified, the service
will pick default values in the specified resource's region. If only one of zone or machineType is specified, the
location information from the specified field will be used for the location-unspecified field.
* `network` -
(Optional)
The Compute Engine network to be used for machine
communications, specified as a self-link, relative resource name
    (e.g. "projects/{project}/global/networks/{network}"), or by name.
The network must belong to the environment's project. If unspecified, the "default" network ID in the environment's
project is used. If a Custom Subnet Network is provided, subnetwork must also be provided.
* `subnetwork` -
(Optional)
The Compute Engine subnetwork to be used for machine
    communications, specified as a self-link, relative resource name (e.g.
"projects/{project}/regions/{region}/subnetworks/{subnetwork}"), or by name. If subnetwork is provided,
network must also be provided and the subnetwork must belong to the enclosing environment's project and region.
* `disk_size_gb` -
(Optional)
The disk size in GB used for node VMs. Minimum size is 20GB.
If unspecified, defaults to 100GB. Cannot be updated.
* `oauth_scopes` -
(Optional)
The set of Google API scopes to be made available on all node
VMs. Cannot be updated. If empty, defaults to
`["https://www.googleapis.com/auth/cloud-platform"]`
* `service_account` -
(Optional)
The Google Cloud Platform Service Account to be used by the
node VMs. If a service account is not specified, the "default"
Compute Engine service account is used. Cannot be updated. If given,
note that the service account must have `roles/composer.worker`
for any GCP resources created under the Cloud Composer Environment.
* `tags` -
(Optional)
The list of instance tags applied to all node VMs. Tags are
used to identify valid sources or targets for network
firewalls. Each tag within the list must comply with RFC1035.
Cannot be updated.
The `software_config` block supports:
* `airflow_config_overrides` -
(Optional) Apache Airflow configuration properties to override. Property keys contain the section and property names,
separated by a hyphen, for example "core-dags_are_paused_at_creation".
Section names must not contain hyphens ("-"), opening square brackets ("["), or closing square brackets ("]").
The property name must not be empty and cannot contain "=" or ";". Section and property names cannot contain
characters: "." Apache Airflow configuration property names must be written in snake_case. Property values can
contain any character, and can be written in any lower/upper case format. Certain Apache Airflow configuration
property values are [blacklisted](https://cloud.google.com/composer/docs/concepts/airflow-configurations#airflow_configuration_blacklists),
and cannot be overridden.
* `pypi_packages` -
(Optional)
Custom Python Package Index (PyPI) packages to be installed
in the environment. Keys refer to the lowercase package name (e.g. "numpy"). Values are the lowercase extras and
version specifier (e.g. "==1.12.0", "[devel,gcp_api]", "[devel]>=1.8.2, <1.9.2"). To specify a package without
pinning it to a version specifier, use the empty string as the value.
* `env_variables` -
(Optional)
Additional environment variables to provide to the Apache Airflow scheduler, worker, and webserver processes.
Environment variable names must match the regular expression `[a-zA-Z_][a-zA-Z0-9_]*`.
They cannot specify Apache Airflow software configuration overrides (they cannot match the regular expression
`AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+`), and they cannot match any of the following reserved names:
```
AIRFLOW_HOME
C_FORCE_ROOT
CONTAINER_NAME
DAGS_FOLDER
GCP_PROJECT
GCS_BUCKET
GKE_CLUSTER_NAME
SQL_DATABASE
SQL_INSTANCE
SQL_PASSWORD
SQL_PROJECT
SQL_REGION
SQL_USER
```
## Attributes Reference
In addition to the arguments listed above, the following computed attributes are exported:
* `config.gke_cluster` -
The Kubernetes Engine cluster used to run this environment.
* `config.dag_gcs_prefix` -
The Cloud Storage prefix of the DAGs for this environment.
Although Cloud Storage objects reside in a flat namespace, a
hierarchical file tree can be simulated using '/'-delimited
object name prefixes. DAG objects for this environment
reside in a simulated directory with this prefix.
* `config.airflow_uri` -
The URI of the Apache Airflow Web UI hosted within this
environment.
* `config.software_config.image_version` -
The version of the software running in the environment. This encapsulates both the version of Cloud Composer
functionality and the version of Apache Airflow. It must match the regular expression
`composer-[0-9]+\.[0-9]+(\.[0-9]+)?-airflow-[0-9]+\.[0-9]+(\.[0-9]+.*)?`.
The Cloud Composer portion of the version is a semantic version.
The portion of the image version following 'airflow-' is an official Apache Airflow repository release name.
## Timeouts
This resource provides the following
[Timeouts](/docs/configuration/resources.html#timeouts) configuration options:
- `create` - Default is 60 minutes.
- `update` - Default is 60 minutes.
- `delete` - Default is 6 minutes.
## Import
Environment can be imported using any of these accepted formats:
```
$ terraform import google_composer_environment.default projects/{{project}}/locations/{{region}}/environments/{{name}}
$ terraform import google_composer_environment.default {{project}}/{{region}}/{{name}}
$ terraform import google_composer_environment.default {{name}}
```