limit the number of parallel operations via the proxmox API

Grant Gongaware 2017-07-20 18:35:01 -07:00
parent 2f2c5dff98
commit 365b131264
2 changed files with 91 additions and 19 deletions
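The provider configuration now carries a MaxParallel limit, set through the new pm_parallel provider argument (default 4), plus a CurrentParallel counter guarded by a shared mutex and condition variable. Every resource operation calls pmParallelBegin before its first Proxmox API request, blocking while the limit is saturated, and pmParallelEnd as soon as its API work finishes or an error forces an early return.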

provider.go

@@ -10,7 +10,12 @@ import (
 )
 
 type providerConfiguration struct {
     Client *pxapi.Client
+    MaxParallel     int
+    CurrentParallel int
+    MaxVmId         int
+    Mutex           *sync.Mutex
+    Cond            *sync.Cond
 }
 
 func Provider() *schema.Provider {
@@ -36,6 +41,11 @@ func Provider() *schema.Provider {
                DefaultFunc: schema.EnvDefaultFunc("PM_API_URL", nil),
                Description: "https://host.fqdn:8006/api2/json",
            },
+           "pm_parallel": {
+               Type:     schema.TypeInt,
+               Optional: true,
+               Default:  4,
+           },
        },
        ResourcesMap: map[string]*schema.Resource{
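With the new argument in place, a provider block can, for example, set pm_parallel = 2 to go easier on a small cluster; leaving it unset keeps the default of 4 concurrent operations.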
@@ -54,8 +64,14 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
    if err != nil {
        return nil, err
    }
+   var mut sync.Mutex
    return &providerConfiguration{
        Client: client,
+       MaxParallel:     d.Get("pm_parallel").(int),
+       CurrentParallel: 0,
+       MaxVmId:         0,
+       Mutex:           &mut,
+       Cond:            sync.NewCond(&mut),
    }, nil
 }
@@ -68,23 +84,36 @@ func getClient(pm_api_url string, pm_user string, pm_password string) (*pxapi.Client, error) {
    return client, nil
 }
 
-var mutex = &sync.Mutex{}
-var maxVmId = 0
-
-func nextVmId(client *pxapi.Client) (nextId int, err error) {
-   mutex.Lock()
-   if maxVmId == 0 {
-       maxVmId, err = pxapi.MaxVmId(client)
+func nextVmId(pconf *providerConfiguration) (nextId int, err error) {
+   pconf.Mutex.Lock()
+   if pconf.MaxVmId == 0 {
+       pconf.MaxVmId, err = pxapi.MaxVmId(pconf.Client)
        if err != nil {
            return 0, err
        }
    }
-   maxVmId++
-   nextId = maxVmId
-   mutex.Unlock()
+   pconf.MaxVmId++
+   nextId = pconf.MaxVmId
+   pconf.Mutex.Unlock()
    return nextId, nil
 }
+
+func pmParallelBegin(pconf *providerConfiguration) {
+   pconf.Mutex.Lock()
+   for pconf.CurrentParallel >= pconf.MaxParallel {
+       pconf.Cond.Wait()
+   }
+   pconf.CurrentParallel++
+   pconf.Mutex.Unlock()
+}
+
+func pmParallelEnd(pconf *providerConfiguration) {
+   pconf.Mutex.Lock()
+   pconf.CurrentParallel--
+   pconf.Cond.Signal()
+   pconf.Mutex.Unlock()
+}
 
 func resourceId(targetNode string, resType string, vmId int) string {
    return fmt.Sprintf("%s/%s/%d", targetNode, resType, vmId)
 }

resource_vm_qemu.go

@@ -123,7 +123,9 @@ func resourceVmQemu() *schema.Resource {
 }
 
 func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
-   client := meta.(*providerConfiguration).Client
+   pconf := meta.(*providerConfiguration)
+   pmParallelBegin(pconf)
+   client := pconf.Client
    vmName := d.Get("name").(string)
    disk_gb := d.Get("disk_gb").(float64)
    config := pxapi.ConfigQemu{
@@ -146,8 +148,10 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    targetNode := d.Get("target_node").(string)
 
    if dupVmr != nil && forceCreate {
+       pmParallelEnd(pconf)
        return fmt.Errorf("Duplicate VM name (%s) with vmId: %d. Set force_create=false to recycle", vmName, dupVmr.VmId())
    } else if dupVmr != nil && dupVmr.Node() != targetNode {
+       pmParallelEnd(pconf)
        return fmt.Errorf("Duplicate VM name (%s) with vmId: %d on different target_node=%s", vmName, dupVmr.VmId(), dupVmr.Node())
    }
@@ -155,8 +159,9 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    if vmr == nil {
        // get unique id
-       nextid, err := nextVmId(client)
+       nextid, err := nextVmId(pconf)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
        vmr = pxapi.NewVmRef(nextid)
@@ -166,16 +171,22 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    if d.Get("clone").(string) != "" {
        sourceVmr, err := client.GetVmRefByName(d.Get("clone").(string))
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
        log.Print("[DEBUG] cloning VM")
        err = config.CloneVm(sourceVmr, vmr, client)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
+       // give proxmox some time to catch up
+       time.Sleep(5 * time.Second)
        err = prepareDiskSize(client, vmr, disk_gb)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
@@ -183,6 +194,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
        config.QemuIso = d.Get("iso").(string)
        err := config.CreateVm(vmr, client)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
    }
@@ -193,26 +205,40 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
        err := config.UpdateConfig(vmr, client)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
+       // give proxmox some time to catch up
+       time.Sleep(5 * time.Second)
        err = prepareDiskSize(client, vmr, disk_gb)
        if err != nil {
+           pmParallelEnd(pconf)
            return err
        }
    }
    d.SetId(resourceId(targetNode, "qemu", vmr.VmId()))
+   // give proxmox some time to catch up
+   time.Sleep(5 * time.Second)
    log.Print("[DEBUG] starting VM")
    _, err := client.StartVm(vmr)
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
    log.Print("[DEBUG] setting up SSH forward")
    sshPort, err := pxapi.SshForwardUsernet(vmr, client)
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
+   // done with the proxmox API; release the parallel slot before the SSH steps
+   pmParallelEnd(pconf)
    d.SetConnInfo(map[string]string{
        "type": "ssh",
        "host": d.Get("ssh_forward_ip").(string),
@@ -228,7 +254,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    case "ubuntu":
        // give the VM some time to boot up
-       time.Sleep(5 * time.Second)
+       time.Sleep(9 * time.Second)
        err = preProvisionUbuntu(d)
        if err != nil {
            return err
        }
@@ -236,7 +262,7 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    case "centos":
        // give the VM some time to boot up
-       time.Sleep(8 * time.Second)
+       time.Sleep(9 * time.Second)
        err = preProvisionCentos(d)
        if err != nil {
            return err
        }
@@ -245,13 +271,17 @@ func resourceVmQemuCreate(d *schema.ResourceData, meta interface{}) error {
    default:
        return fmt.Errorf("Unknown os_type: %s", d.Get("os_type").(string))
    }
 
    return nil
 }
 
 func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {
-   client := meta.(*providerConfiguration).Client
+   pconf := meta.(*providerConfiguration)
+   pmParallelBegin(pconf)
+   client := pconf.Client
    vmr, err := client.GetVmRefByName(d.Get("name").(string))
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
    vmName := d.Get("name").(string)
@@ -276,6 +306,7 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {
    sshPort, err := pxapi.SshForwardUsernet(vmr, client)
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
    d.SetConnInfo(map[string]string{
@@ -285,17 +316,21 @@ func resourceVmQemuUpdate(d *schema.ResourceData, meta interface{}) error {
        "user":        d.Get("ssh_user").(string),
        "private_key": d.Get("ssh_private_key").(string),
    })
+   pmParallelEnd(pconf)
    return nil
 }
 
 func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error {
-   client := meta.(*providerConfiguration).Client
+   pconf := meta.(*providerConfiguration)
+   pmParallelBegin(pconf)
+   client := pconf.Client
    vmr, err := client.GetVmRefByName(d.Get("name").(string))
    if err != nil {
        return err
    }
    config, err := pxapi.NewConfigQemuFromApi(vmr, client)
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
    d.SetId(resourceId(vmr.Node(), "qemu", vmr.VmId()))
@@ -311,23 +346,31 @@ func resourceVmQemuRead(d *schema.ResourceData, meta interface{}) error {
    d.Set("nic", config.QemuNicModel)
    d.Set("bridge", config.QemuBrige)
    d.Set("vlan", config.QemuVlanTag)
+   pmParallelEnd(pconf)
    return nil
 }
 
 func resourceVmQemuImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
+   // TODO: research proper import
    err := resourceVmQemuRead(d, meta)
    return []*schema.ResourceData{d}, err
 }
 
 func resourceVmQemuDelete(d *schema.ResourceData, meta interface{}) error {
-   client := meta.(*providerConfiguration).Client
+   pconf := meta.(*providerConfiguration)
+   pmParallelBegin(pconf)
+   client := pconf.Client
    vmId, _ := strconv.Atoi(path.Base(d.Id()))
    vmr := pxapi.NewVmRef(vmId)
    _, err := client.StopVm(vmr)
    if err != nil {
+       pmParallelEnd(pconf)
        return err
    }
+   // give proxmox some time to catch up
+   time.Sleep(2 * time.Second)
    _, err = client.DeleteVm(vmr)
+   pmParallelEnd(pconf)
    return err
 }