437 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			437 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
| package storage
 | |
| 
 | |
| // Copyright 2017 Microsoft Corporation
 | |
| //
 | |
| //  Licensed under the Apache License, Version 2.0 (the "License");
 | |
| //  you may not use this file except in compliance with the License.
 | |
| //  You may obtain a copy of the License at
 | |
| //
 | |
| //      http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| //  Unless required by applicable law or agreed to in writing, software
 | |
| //  distributed under the License is distributed on an "AS IS" BASIS,
 | |
| //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| //  See the License for the specific language governing permissions and
 | |
| //  limitations under the License.
 | |
| 
 | |
| import (
 | |
| 	"encoding/xml"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"strconv"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// casing is per Golang's http.Header canonicalizing the header names.
 | |
| 	approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"
 | |
| )
 | |
| 
 | |
| // QueueAccessPolicy represents each access policy in the queue ACL.
 | |
| type QueueAccessPolicy struct {
 | |
| 	ID         string
 | |
| 	StartTime  time.Time
 | |
| 	ExpiryTime time.Time
 | |
| 	CanRead    bool
 | |
| 	CanAdd     bool
 | |
| 	CanUpdate  bool
 | |
| 	CanProcess bool
 | |
| }
 | |
| 
 | |
| // QueuePermissions represents the queue ACLs.
 | |
| type QueuePermissions struct {
 | |
| 	AccessPolicies []QueueAccessPolicy
 | |
| }
 | |
| 
 | |
| // SetQueuePermissionOptions includes options for a set queue permissions operation
 | |
| type SetQueuePermissionOptions struct {
 | |
| 	Timeout   uint
 | |
| 	RequestID string `header:"x-ms-client-request-id"`
 | |
| }
 | |
| 
 | |
| // Queue represents an Azure queue.
 | |
| type Queue struct {
 | |
| 	qsc               *QueueServiceClient
 | |
| 	Name              string
 | |
| 	Metadata          map[string]string
 | |
| 	AproxMessageCount uint64
 | |
| }
 | |
| 
 | |
| func (q *Queue) buildPath() string {
 | |
| 	return fmt.Sprintf("/%s", q.Name)
 | |
| }
 | |
| 
 | |
| func (q *Queue) buildPathMessages() string {
 | |
| 	return fmt.Sprintf("%s/messages", q.buildPath())
 | |
| }
 | |
| 
 | |
| // QueueServiceOptions includes options for some queue service operations
 | |
| type QueueServiceOptions struct {
 | |
| 	Timeout   uint
 | |
| 	RequestID string `header:"x-ms-client-request-id"`
 | |
| }
 | |
| 
 | |
| // Create operation creates a queue under the given account.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Create-Queue4
 | |
| func (q *Queue) Create(options *QueueServiceOptions) error {
 | |
| 	params := url.Values{}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 	headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 	return checkRespCode(resp, []int{http.StatusCreated})
 | |
| }
 | |
| 
 | |
| // Delete operation permanently deletes the specified queue.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Delete-Queue3
 | |
| func (q *Queue) Delete(options *QueueServiceOptions) error {
 | |
| 	params := url.Values{}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
 | |
| 	resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 	return checkRespCode(resp, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // Exists returns true if a queue with given name exists.
 | |
| func (q *Queue) Exists() (bool, error) {
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
 | |
| 	resp, err := q.qsc.client.exec(http.MethodGet, uri, q.qsc.client.getStandardHeaders(), nil, q.qsc.auth)
 | |
| 	if resp != nil {
 | |
| 		defer drainRespBody(resp)
 | |
| 		if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound {
 | |
| 			return resp.StatusCode == http.StatusOK, nil
 | |
| 		}
 | |
| 		err = getErrorFromResponse(resp)
 | |
| 	}
 | |
| 	return false, err
 | |
| }
 | |
| 
 | |
| // SetMetadata operation sets user-defined metadata on the specified queue.
 | |
| // Metadata is associated with the queue as name-value pairs.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
 | |
| func (q *Queue) SetMetadata(options *QueueServiceOptions) error {
 | |
| 	params := url.Values{"comp": {"metadata"}}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 	headers = q.qsc.client.addMetadataToHeaders(headers, q.Metadata)
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 	return checkRespCode(resp, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // GetMetadata operation retrieves user-defined metadata and queue
 | |
| // properties on the specified queue. Metadata is associated with
 | |
| // the queue as name-values pairs.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Set-Queue-Metadata
 | |
| //
 | |
| // Because the way Golang's http client (and http.Header in particular)
 | |
| // canonicalize header names, the returned metadata names would always
 | |
| // be all lower case.
 | |
| func (q *Queue) GetMetadata(options *QueueServiceOptions) error {
 | |
| 	params := url.Values{"comp": {"metadata"}}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), url.Values{"comp": {"metadata"}})
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 
 | |
| 	if err := checkRespCode(resp, []int{http.StatusOK}); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	aproxMessagesStr := resp.Header.Get(http.CanonicalHeaderKey(approximateMessagesCountHeader))
 | |
| 	if aproxMessagesStr != "" {
 | |
| 		aproxMessages, err := strconv.ParseUint(aproxMessagesStr, 10, 64)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		q.AproxMessageCount = aproxMessages
 | |
| 	}
 | |
| 
 | |
| 	q.Metadata = getMetadataFromHeaders(resp.Header)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetMessageReference returns a message object with the specified text.
 | |
| func (q *Queue) GetMessageReference(text string) *Message {
 | |
| 	return &Message{
 | |
| 		Queue: q,
 | |
| 		Text:  text,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetMessagesOptions is the set of options can be specified for Get
 | |
| // Messsages operation. A zero struct does not use any preferences for the
 | |
| // request.
 | |
| type GetMessagesOptions struct {
 | |
| 	Timeout           uint
 | |
| 	NumOfMessages     int
 | |
| 	VisibilityTimeout int
 | |
| 	RequestID         string `header:"x-ms-client-request-id"`
 | |
| }
 | |
| 
 | |
| type messages struct {
 | |
| 	XMLName  xml.Name  `xml:"QueueMessagesList"`
 | |
| 	Messages []Message `xml:"QueueMessage"`
 | |
| }
 | |
| 
 | |
| // GetMessages operation retrieves one or more messages from the front of the
 | |
| // queue.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Get-Messages
 | |
| func (q *Queue) GetMessages(options *GetMessagesOptions) ([]Message, error) {
 | |
| 	query := url.Values{}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		if options.NumOfMessages != 0 {
 | |
| 			query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
 | |
| 		}
 | |
| 		if options.VisibilityTimeout != 0 {
 | |
| 			query.Set("visibilitytimeout", strconv.Itoa(options.VisibilityTimeout))
 | |
| 		}
 | |
| 		query = addTimeout(query, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return []Message{}, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	var out messages
 | |
| 	err = xmlUnmarshal(resp.Body, &out)
 | |
| 	if err != nil {
 | |
| 		return []Message{}, err
 | |
| 	}
 | |
| 	for i := range out.Messages {
 | |
| 		out.Messages[i].Queue = q
 | |
| 	}
 | |
| 	return out.Messages, err
 | |
| }
 | |
| 
 | |
| // PeekMessagesOptions is the set of options can be specified for Peek
 | |
| // Messsage operation. A zero struct does not use any preferences for the
 | |
| // request.
 | |
| type PeekMessagesOptions struct {
 | |
| 	Timeout       uint
 | |
| 	NumOfMessages int
 | |
| 	RequestID     string `header:"x-ms-client-request-id"`
 | |
| }
 | |
| 
 | |
| // PeekMessages retrieves one or more messages from the front of the queue, but
 | |
| // does not alter the visibility of the message.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Peek-Messages
 | |
| func (q *Queue) PeekMessages(options *PeekMessagesOptions) ([]Message, error) {
 | |
| 	query := url.Values{"peekonly": {"true"}} // Required for peek operation
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		if options.NumOfMessages != 0 {
 | |
| 			query.Set("numofmessages", strconv.Itoa(options.NumOfMessages))
 | |
| 		}
 | |
| 		query = addTimeout(query, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), query)
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return []Message{}, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	var out messages
 | |
| 	err = xmlUnmarshal(resp.Body, &out)
 | |
| 	if err != nil {
 | |
| 		return []Message{}, err
 | |
| 	}
 | |
| 	for i := range out.Messages {
 | |
| 		out.Messages[i].Queue = q
 | |
| 	}
 | |
| 	return out.Messages, err
 | |
| }
 | |
| 
 | |
| // ClearMessages operation deletes all messages from the specified queue.
 | |
| //
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/Clear-Messages
 | |
| func (q *Queue) ClearMessages(options *QueueServiceOptions) error {
 | |
| 	params := url.Values{}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPathMessages(), params)
 | |
| 
 | |
| 	resp, err := q.qsc.client.exec(http.MethodDelete, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 	return checkRespCode(resp, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // SetPermissions sets up queue permissions
 | |
| // See https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/set-queue-acl
 | |
| func (q *Queue) SetPermissions(permissions QueuePermissions, options *SetQueuePermissionOptions) error {
 | |
| 	body, length, err := generateQueueACLpayload(permissions.AccessPolicies)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	params := url.Values{
 | |
| 		"comp": {"acl"},
 | |
| 	}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 	headers["Content-Length"] = strconv.Itoa(length)
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
 | |
| 	resp, err := q.qsc.client.exec(http.MethodPut, uri, headers, body, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer drainRespBody(resp)
 | |
| 	return checkRespCode(resp, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| func generateQueueACLpayload(policies []QueueAccessPolicy) (io.Reader, int, error) {
 | |
| 	sil := SignedIdentifiers{
 | |
| 		SignedIdentifiers: []SignedIdentifier{},
 | |
| 	}
 | |
| 	for _, qapd := range policies {
 | |
| 		permission := qapd.generateQueuePermissions()
 | |
| 		signedIdentifier := convertAccessPolicyToXMLStructs(qapd.ID, qapd.StartTime, qapd.ExpiryTime, permission)
 | |
| 		sil.SignedIdentifiers = append(sil.SignedIdentifiers, signedIdentifier)
 | |
| 	}
 | |
| 	return xmlMarshal(sil)
 | |
| }
 | |
| 
 | |
| func (qapd *QueueAccessPolicy) generateQueuePermissions() (permissions string) {
 | |
| 	// generate the permissions string (raup).
 | |
| 	// still want the end user API to have bool flags.
 | |
| 	permissions = ""
 | |
| 
 | |
| 	if qapd.CanRead {
 | |
| 		permissions += "r"
 | |
| 	}
 | |
| 
 | |
| 	if qapd.CanAdd {
 | |
| 		permissions += "a"
 | |
| 	}
 | |
| 
 | |
| 	if qapd.CanUpdate {
 | |
| 		permissions += "u"
 | |
| 	}
 | |
| 
 | |
| 	if qapd.CanProcess {
 | |
| 		permissions += "p"
 | |
| 	}
 | |
| 
 | |
| 	return permissions
 | |
| }
 | |
| 
 | |
| // GetQueuePermissionOptions includes options for a get queue permissions operation
 | |
| type GetQueuePermissionOptions struct {
 | |
| 	Timeout   uint
 | |
| 	RequestID string `header:"x-ms-client-request-id"`
 | |
| }
 | |
| 
 | |
| // GetPermissions gets the queue permissions as per https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/get-queue-acl
 | |
| // If timeout is 0 then it will not be passed to Azure
 | |
| func (q *Queue) GetPermissions(options *GetQueuePermissionOptions) (*QueuePermissions, error) {
 | |
| 	params := url.Values{
 | |
| 		"comp": {"acl"},
 | |
| 	}
 | |
| 	headers := q.qsc.client.getStandardHeaders()
 | |
| 
 | |
| 	if options != nil {
 | |
| 		params = addTimeout(params, options.Timeout)
 | |
| 		headers = mergeHeaders(headers, headersFromStruct(*options))
 | |
| 	}
 | |
| 	uri := q.qsc.client.getEndpoint(queueServiceName, q.buildPath(), params)
 | |
| 	resp, err := q.qsc.client.exec(http.MethodGet, uri, headers, nil, q.qsc.auth)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	var ap AccessPolicy
 | |
| 	err = xmlUnmarshal(resp.Body, &ap.SignedIdentifiersList)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return buildQueueAccessPolicy(ap, &resp.Header), nil
 | |
| }
 | |
| 
 | |
| func buildQueueAccessPolicy(ap AccessPolicy, headers *http.Header) *QueuePermissions {
 | |
| 	permissions := QueuePermissions{
 | |
| 		AccessPolicies: []QueueAccessPolicy{},
 | |
| 	}
 | |
| 
 | |
| 	for _, policy := range ap.SignedIdentifiersList.SignedIdentifiers {
 | |
| 		qapd := QueueAccessPolicy{
 | |
| 			ID:         policy.ID,
 | |
| 			StartTime:  policy.AccessPolicy.StartTime,
 | |
| 			ExpiryTime: policy.AccessPolicy.ExpiryTime,
 | |
| 		}
 | |
| 		qapd.CanRead = updatePermissions(policy.AccessPolicy.Permission, "r")
 | |
| 		qapd.CanAdd = updatePermissions(policy.AccessPolicy.Permission, "a")
 | |
| 		qapd.CanUpdate = updatePermissions(policy.AccessPolicy.Permission, "u")
 | |
| 		qapd.CanProcess = updatePermissions(policy.AccessPolicy.Permission, "p")
 | |
| 
 | |
| 		permissions.AccessPolicies = append(permissions.AccessPolicies, qapd)
 | |
| 	}
 | |
| 	return &permissions
 | |
| }
 |