340 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			340 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
| package storage
 | |
| 
 | |
| import (
 | |
| 	"encoding/xml"
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| )
 | |
| 
 | |
const (
	// approximateMessagesCountHeader is the response header whose value is
	// parsed into QueueMetadataResponse.ApproximateMessageCount. The casing
	// follows the canonical form Go's http.Header applies to header names,
	// so it can be compared directly against response header keys.
	approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"

	// userDefinedMetadataHeaderPrefix is the prefix shared by all
	// user-defined metadata headers, also in http.Header canonical casing.
	userDefinedMetadataHeaderPrefix = "X-Ms-Meta-"
)
 | |
| 
 | |
// pathForQueue returns the URL path of the named queue: "/<queue>".
func pathForQueue(queue string) string {
	return "/" + queue
}
 | |
// pathForQueueMessages returns the URL path of the message collection of the
// named queue: "/<queue>/messages".
func pathForQueueMessages(queue string) string {
	return "/" + queue + "/messages"
}
 | |
// pathForMessage returns the URL path of a single message in the named
// queue: "/<queue>/messages/<name>".
func pathForMessage(queue, name string) string {
	return "/" + queue + "/messages/" + name
}
 | |
| 
 | |
// putMessageRequest is the XML request body used by PutMessage and
// UpdateMessage. It marshals to a <QueueMessage> root element containing a
// single <MessageText> child.
type putMessageRequest struct {
	// XMLName fixes the root element name to "QueueMessage".
	XMLName xml.Name `xml:"QueueMessage"`
	// MessageText is the message content, emitted as <MessageText>.
	MessageText string `xml:"MessageText"`
}
 | |
| 
 | |
// PutMessageParameters is the set of options that can be specified for the
// Put Message operation. The zero value expresses no preferences: no query
// parameters are added to the request.
type PutMessageParameters struct {
	// VisibilityTimeout is sent as the "visibilitytimeout" query parameter
	// when non-zero.
	VisibilityTimeout int
	// MessageTTL is sent as the "messagettl" query parameter when non-zero.
	MessageTTL int
}

// getParameters converts the options into URL query values. Fields left at
// zero are omitted.
func (p PutMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query["visibilitytimeout"] = []string{strconv.Itoa(timeout)}
	}
	if ttl := p.MessageTTL; ttl != 0 {
		query["messagettl"] = []string{strconv.Itoa(ttl)}
	}
	return query
}
 | |
| 
 | |
// GetMessagesParameters is the set of options that can be specified for the
// Get Messages operation. The zero value expresses no preferences: no query
// parameters are added to the request.
type GetMessagesParameters struct {
	// NumOfMessages is sent as the "numofmessages" query parameter when
	// non-zero.
	NumOfMessages int
	// VisibilityTimeout is sent as the "visibilitytimeout" query parameter
	// when non-zero.
	VisibilityTimeout int
}

// getParameters converts the options into URL query values. Fields left at
// zero are omitted.
func (p GetMessagesParameters) getParameters() url.Values {
	query := make(url.Values)
	if count := p.NumOfMessages; count != 0 {
		query["numofmessages"] = []string{strconv.Itoa(count)}
	}
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query["visibilitytimeout"] = []string{strconv.Itoa(timeout)}
	}
	return query
}
 | |
| 
 | |
// PeekMessagesParameters is the set of options that can be specified for the
// Peek Messages operation. The zero value expresses no preferences beyond
// the mandatory "peekonly" flag.
type PeekMessagesParameters struct {
	// NumOfMessages is sent as the "numofmessages" query parameter when
	// non-zero.
	NumOfMessages int
}

// getParameters converts the options into URL query values. "peekonly=true"
// is always present because it is what makes the request a peek.
func (p PeekMessagesParameters) getParameters() url.Values {
	query := make(url.Values)
	query.Set("peekonly", "true")
	if count := p.NumOfMessages; count != 0 {
		query["numofmessages"] = []string{strconv.Itoa(count)}
	}
	return query
}
 | |
| 
 | |
// UpdateMessageParameters is the set of options that can be specified for
// the Update Message operation. The zero value expresses no preferences: no
// query parameters are added to the request.
type UpdateMessageParameters struct {
	// PopReceipt is sent as the "popreceipt" query parameter when non-empty.
	PopReceipt string
	// VisibilityTimeout is sent as the "visibilitytimeout" query parameter
	// when non-zero.
	VisibilityTimeout int
}

// getParameters converts the options into URL query values. An empty
// PopReceipt and a zero VisibilityTimeout are omitted.
func (p UpdateMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	if receipt := p.PopReceipt; receipt != "" {
		query["popreceipt"] = []string{receipt}
	}
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query["visibilitytimeout"] = []string{strconv.Itoa(timeout)}
	}
	return query
}
 | |
| 
 | |
// GetMessagesResponse represents the response body returned by the Get
// Messages operation: a <QueueMessagesList> root element holding zero or
// more <QueueMessage> children.
type GetMessagesResponse struct {
	XMLName xml.Name `xml:"QueueMessagesList"`
	// QueueMessagesList collects every <QueueMessage> element in document
	// order.
	QueueMessagesList []GetMessageResponse `xml:"QueueMessage"`
}

// GetMessageResponse represents a single QueueMessage object inside a Get
// Messages response. Timestamp fields are kept as the raw strings found in
// the XML; no time parsing is performed here.
type GetMessageResponse struct {
	MessageID       string `xml:"MessageId"`
	InsertionTime   string `xml:"InsertionTime"`
	ExpirationTime  string `xml:"ExpirationTime"`
	PopReceipt      string `xml:"PopReceipt"`
	TimeNextVisible string `xml:"TimeNextVisible"`
	DequeueCount    int    `xml:"DequeueCount"`
	MessageText     string `xml:"MessageText"`
}
 | |
| 
 | |
// PeekMessagesResponse represents the response body returned by the Peek
// Messages operation: a <QueueMessagesList> root element holding zero or
// more <QueueMessage> children.
type PeekMessagesResponse struct {
	XMLName xml.Name `xml:"QueueMessagesList"`
	// QueueMessagesList collects every <QueueMessage> element in document
	// order.
	QueueMessagesList []PeekMessageResponse `xml:"QueueMessage"`
}

// PeekMessageResponse represents a single QueueMessage object inside a Peek
// Messages response. Unlike GetMessageResponse it carries no PopReceipt or
// TimeNextVisible field. Timestamp fields are kept as the raw strings found
// in the XML; no time parsing is performed here.
type PeekMessageResponse struct {
	MessageID      string `xml:"MessageId"`
	InsertionTime  string `xml:"InsertionTime"`
	ExpirationTime string `xml:"ExpirationTime"`
	DequeueCount   int    `xml:"DequeueCount"`
	MessageText    string `xml:"MessageText"`
}
 | |
| 
 | |
// QueueMetadataResponse holds the queue properties and user-defined
// metadata reported for a specific queue.
//
// See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
type QueueMetadataResponse struct {
	// ApproximateMessageCount is the integer parsed from the
	// X-Ms-Approximate-Messages-Count response header.
	ApproximateMessageCount int
	// UserDefinedMetadata maps metadata names (with the X-Ms-Meta- header
	// prefix removed) to their values.
	UserDefinedMetadata map[string]string
}
 | |
| 
 | |
| // SetMetadata operation sets user-defined metadata on the specified queue.
 | |
| // Metadata is associated with the queue as name-value pairs.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179348.aspx
 | |
| func (c QueueServiceClient) SetMetadata(name string, metadata map[string]string) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
 | |
| 	metadata = c.client.protectUserAgent(metadata)
 | |
| 	headers := c.client.getStandardHeaders()
 | |
| 	for k, v := range metadata {
 | |
| 		headers[userDefinedMetadataHeaderPrefix+k] = v
 | |
| 	}
 | |
| 
 | |
| 	resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // GetMetadata operation retrieves user-defined metadata and queue
 | |
| // properties on the specified queue. Metadata is associated with
 | |
| // the queue as name-values pairs.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
 | |
| //
 | |
| // Because the way Golang's http client (and http.Header in particular)
 | |
| // canonicalize header names, the returned metadata names would always
 | |
| // be all lower case.
 | |
| func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, error) {
 | |
| 	qm := QueueMetadataResponse{}
 | |
| 	qm.UserDefinedMetadata = make(map[string]string)
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
 | |
| 	headers := c.client.getStandardHeaders()
 | |
| 	resp, err := c.client.exec(http.MethodGet, uri, headers, nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return qm, err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 
 | |
| 	for k, v := range resp.headers {
 | |
| 		if len(v) != 1 {
 | |
| 			return qm, fmt.Errorf("Unexpected number of values (%d) in response header '%s'", len(v), k)
 | |
| 		}
 | |
| 
 | |
| 		value := v[0]
 | |
| 
 | |
| 		if k == approximateMessagesCountHeader {
 | |
| 			qm.ApproximateMessageCount, err = strconv.Atoi(value)
 | |
| 			if err != nil {
 | |
| 				return qm, fmt.Errorf("Unexpected value in response header '%s': '%s' ", k, value)
 | |
| 			}
 | |
| 		} else if strings.HasPrefix(k, userDefinedMetadataHeaderPrefix) {
 | |
| 			name := strings.TrimPrefix(k, userDefinedMetadataHeaderPrefix)
 | |
| 			qm.UserDefinedMetadata[strings.ToLower(name)] = value
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return qm, checkRespCode(resp.statusCode, []int{http.StatusOK})
 | |
| }
 | |
| 
 | |
| // CreateQueue operation creates a queue under the given account.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179342.aspx
 | |
| func (c QueueServiceClient) CreateQueue(name string) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
 | |
| 	headers := c.client.getStandardHeaders()
 | |
| 	resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusCreated})
 | |
| }
 | |
| 
 | |
| // DeleteQueue operation permanently deletes the specified queue.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179436.aspx
 | |
| func (c QueueServiceClient) DeleteQueue(name string) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
 | |
| 	resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // QueueExists returns true if a queue with given name exists.
 | |
| func (c QueueServiceClient) QueueExists(name string) (bool, error) {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": {"metadata"}})
 | |
| 	resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if resp != nil && (resp.statusCode == http.StatusOK || resp.statusCode == http.StatusNotFound) {
 | |
| 		return resp.statusCode == http.StatusOK, nil
 | |
| 	}
 | |
| 
 | |
| 	return false, err
 | |
| }
 | |
| 
 | |
| // PutMessage operation adds a new message to the back of the message queue.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179346.aspx
 | |
| func (c QueueServiceClient) PutMessage(queue string, message string, params PutMessageParameters) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
 | |
| 	req := putMessageRequest{MessageText: message}
 | |
| 	body, nn, err := xmlMarshal(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	headers := c.client.getStandardHeaders()
 | |
| 	headers["Content-Length"] = strconv.Itoa(nn)
 | |
| 	resp, err := c.client.exec(http.MethodPost, uri, headers, body, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusCreated})
 | |
| }
 | |
| 
 | |
| // ClearMessages operation deletes all messages from the specified queue.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179454.aspx
 | |
| func (c QueueServiceClient) ClearMessages(queue string) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), url.Values{})
 | |
| 	resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // GetMessages operation retrieves one or more messages from the front of the
 | |
| // queue.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179474.aspx
 | |
| func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParameters) (GetMessagesResponse, error) {
 | |
| 	var r GetMessagesResponse
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
 | |
| 	resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return r, err
 | |
| 	}
 | |
| 	defer resp.body.Close()
 | |
| 	err = xmlUnmarshal(resp.body, &r)
 | |
| 	return r, err
 | |
| }
 | |
| 
 | |
| // PeekMessages retrieves one or more messages from the front of the queue, but
 | |
| // does not alter the visibility of the message.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179472.aspx
 | |
| func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParameters) (PeekMessagesResponse, error) {
 | |
| 	var r PeekMessagesResponse
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
 | |
| 	resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return r, err
 | |
| 	}
 | |
| 	defer resp.body.Close()
 | |
| 	err = xmlUnmarshal(resp.body, &r)
 | |
| 	return r, err
 | |
| }
 | |
| 
 | |
| // DeleteMessage operation deletes the specified message.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
 | |
| func (c QueueServiceClient) DeleteMessage(queue, messageID, popReceipt string) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), url.Values{
 | |
| 		"popreceipt": {popReceipt}})
 | |
| 	resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
 | |
| }
 | |
| 
 | |
| // UpdateMessage operation deletes the specified message.
 | |
| //
 | |
| // See https://msdn.microsoft.com/en-us/library/azure/hh452234.aspx
 | |
| func (c QueueServiceClient) UpdateMessage(queue string, messageID string, message string, params UpdateMessageParameters) error {
 | |
| 	uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), params.getParameters())
 | |
| 	req := putMessageRequest{MessageText: message}
 | |
| 	body, nn, err := xmlMarshal(req)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	headers := c.client.getStandardHeaders()
 | |
| 	headers["Content-Length"] = fmt.Sprintf("%d", nn)
 | |
| 	resp, err := c.client.exec(http.MethodPut, uri, headers, body, c.auth)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer readAndCloseBody(resp.body)
 | |
| 	return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
 | |
| }
 |