From 365b13126428df3d5f0028b3e6e9c982d47cc1a4 Mon Sep 17 00:00:00 2001 From: Grant Gongaware Date: Thu, 20 Jul 2017 18:35:01 -0700 Subject: [PATCH] limit the number of parallel things happening via proxmox API --- proxmox/provider.go | 53 ++++++++++++++++++++++++++-------- proxmox/resource_vm_qemu.go | 57 ++++++++++++++++++++++++++++++++----- 2 files changed, 91 insertions(+), 19 deletions(-) diff --git a/proxmox/provider.go b/proxmox/provider.go index 8eb35c0..3933300 100644 --- a/proxmox/provider.go +++ b/proxmox/provider.go @@ -10,7 +10,12 @@ import ( ) type providerConfiguration struct { - Client *pxapi.Client + Client *pxapi.Client + MaxParallel int + CurrentParallel int + MaxVmId int + Mutex *sync.Mutex + Cond *sync.Cond } func Provider() *schema.Provider { @@ -36,6 +41,11 @@ func Provider() *schema.Provider { DefaultFunc: schema.EnvDefaultFunc("PM_API_URL", nil), Description: "https://host.fqdn:8006/api2/json", }, + "pm_parallel": { + Type: schema.TypeInt, + Optional: true, + Default: 4, + }, }, ResourcesMap: map[string]*schema.Resource{ @@ -54,8 +64,14 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { if err != nil { return nil, err } + var mut sync.Mutex return &providerConfiguration{ - Client: client, + Client: client, + MaxParallel: d.Get("pm_parallel").(int), + CurrentParallel: 0, + MaxVmId: 0, + Mutex: &mut, + Cond: sync.NewCond(&mut), }, nil } @@ -68,23 +84,36 @@ func getClient(pm_api_url string, pm_user string, pm_password string) (*pxapi.Cl return client, nil } -var mutex = &sync.Mutex{} -var maxVmId = 0 - -func nextVmId(client *pxapi.Client) (nextId int, err error) { - mutex.Lock() - if maxVmId == 0 { - maxVmId, err = pxapi.MaxVmId(client) +func nextVmId(pconf *providerConfiguration) (nextId int, err error) { + pconf.Mutex.Lock() + if pconf.MaxVmId == 0 { + pconf.MaxVmId, err = pxapi.MaxVmId(pconf.Client) if err != nil { return 0, err } } - maxVmId++ - nextId = maxVmId - mutex.Unlock() + pconf.MaxVmId++ + nextId = pconf.MaxVmId + pconf.Mutex.Unlock() return nextId, nil } +func pmParallelBegin(pconf *providerConfiguration) { + pconf.Mutex.Lock() + for pconf.CurrentParallel >= pconf.MaxParallel { + pconf.Cond.Wait() + } + pconf.CurrentParallel++ + pconf.Mutex.Unlock() +} + +func pmParallelEnd(pconf *providerConfiguration) { + pconf.Mutex.Lock() + pconf.CurrentParallel-- + pconf.Cond.Signal() + pconf.Mutex.Unlock() +} + func resourceId(targetNode string, resType string, vmId int) string { return fmt.Sprintf("%s/%s/%d", targetNode, resType, vmId) } diff --git a/proxmox/resource_vm_qemu.go b/proxmox/resource_vm_qemu.go index f1a2203..8b408db 100644 --- a/proxmox/resource_vm_qemu.go +++ b/proxmox/resource_vm_qemu.go @@ -123,7 +123,9 @@ func resourceVmQemu() *schema.Resource { } func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { - client := meta.(*providerConfiguration).Client + pconf := meta.(*providerConfiguration) + pmParallelBegin(pconf) + client := pconf.Client vmName := d.Get("name").(string) disk_gb := d.Get("disk_gb").(float64) config := pxapi.ConfigQemu{ @@ -146,8 +148,10 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { targetNode := d.Get("target_node").(string) if dupVmr != nil && forceCreate { + pmParallelEnd(pconf) return fmt.Errorf("Duplicate VM name (%s) with vmId: %d. Set force_create=false to recycle", vmName, dupVmr.VmId()) } else if dupVmr != nil && dupVmr.Node() != targetNode { + pmParallelEnd(pconf) return fmt.Errorf("Duplicate VM name (%s) with vmId: %d on different target_node=%s", vmName, dupVmr.VmId(), dupVmr.Node()) } @@ -155,8 +159,9 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { if vmr == nil { // get unique id - nextid, err := nextVmId(client) + nextid, err := nextVmId(pconf) if err != nil { + pmParallelEnd(pconf) return err } vmr = pxapi.NewVmRef(nextid) @@ -166,16 +171,22 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { if d.Get("clone").(string) != "" { sourceVmr, err := client.GetVmRefByName(d.Get("clone").(string)) if err != nil { + pmParallelEnd(pconf) return err } log.Print("[DEBUG] cloning VM") err = config.CloneVm(sourceVmr, vmr, client) if err != nil { + pmParallelEnd(pconf) return err } + // give sometime to proxmox to catchup + time.Sleep(5 * time.Second) + err = prepareDiskSize(client, vmr, disk_gb) if err != nil { + pmParallelEnd(pconf) return err } @@ -183,6 +194,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { config.QemuIso = d.Get("iso").(string) err := config.CreateVm(vmr, client) if err != nil { + pmParallelEnd(pconf) return err } } @@ -193,26 +205,40 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { err := config.UpdateConfig(vmr, client) if err != nil { + pmParallelEnd(pconf) return err } + + // give sometime to proxmox to catchup + time.Sleep(5 * time.Second) + err = prepareDiskSize(client, vmr, disk_gb) if err != nil { + pmParallelEnd(pconf) return err } } d.SetId(resourceId(targetNode, "qemu", vmr.VmId())) + // give sometime to proxmox to catchup + time.Sleep(5 * time.Second) + log.Print("[DEBUG] starting VM") _, err := client.StartVm(vmr) if err != nil { + pmParallelEnd(pconf) return err } log.Print("[DEBUG] setting up SSH forward") sshPort, err := pxapi.SshForwardUsernet(vmr, client) if err != nil { + pmParallelEnd(pconf) return err } + // Done with proxmox API, end parallel and do the SSH things + pmParallelEnd(pconf) + d.SetConnInfo(map[string]string{ "type": "ssh", "host": d.Get("ssh_forward_ip").(string), @@ -228,7 +254,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { case "ubuntu": // give sometime to bootup - time.Sleep(5 * time.Second) + time.Sleep(9 * time.Second) err = preProvisionUbuntu(d) if err != nil { return err @@ -236,7 +262,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { case "centos": // give sometime to bootup - time.Sleep(8 * time.Second) + time.Sleep(9 * time.Second) err = preProvisionCentos(d) if err != nil { return err @@ -245,13 +271,17 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error { default: return fmt.Errorf("Unknown os_type: %s", d.Get("os_type").(string)) } + return nil } func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error { - client := meta.(*providerConfiguration).Client + pconf := meta.(*providerConfiguration) + pmParallelBegin(pconf) + client := pconf.Client vmr, err := client.GetVmRefByName(d.Get("name").(string)) if err != nil { + pmParallelEnd(pconf) return err } vmName := d.Get("name").(string) @@ -276,6 +306,7 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error { sshPort, err := pxapi.SshForwardUsernet(vmr, client) if err != nil { + pmParallelEnd(pconf) return err } d.SetConnInfo(map[string]string{ @@ -285,17 +316,21 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error { "user": d.Get("ssh_user").(string), "private_key": d.Get("ssh_private_key").(string), }) + pmParallelEnd(pconf) return nil } func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error { - client := meta.(*providerConfiguration).Client + pconf := meta.(*providerConfiguration) + pmParallelBegin(pconf) + client := pconf.Client vmr, err := client.GetVmRefByName(d.Get("name").(string)) if err != nil { return err } config, err := pxapi.NewConfigQemuFromApi(vmr, client) if err != nil { + pmParallelEnd(pconf) return err } d.SetId(resourceId(vmr.Node(), "qemu", vmr.VmId())) @@ -311,23 +346,31 @@ func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error { d.Set("nic", config.QemuNicModel) d.Set("bridge", config.QemuBrige) d.Set("vlan", config.QemuVlanTag) + pmParallelEnd(pconf) return nil } func resourceVmQemuImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { + // TODO: research proper import err := resourceVmQemuRead(d, meta) return []*schema.ResourceData{d}, err } func resourceVmQemuDelete(d *schema.ResourceData, meta interface{}) error { - client := meta.(*providerConfiguration).Client + pconf := meta.(*providerConfiguration) + pmParallelBegin(pconf) + client := pconf.Client vmId, _ := strconv.Atoi(path.Base(d.Id())) vmr := pxapi.NewVmRef(vmId) _, err := client.StopVm(vmr) if err != nil { + pmParallelEnd(pconf) return err } + // give sometime to proxmox to catchup + time.Sleep(2 * time.Second) _, err = client.DeleteVm(vmr) + pmParallelEnd(pconf) return err }