509 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			509 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
| package s3
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"crypto/md5"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/hex"
 | |
| 	"encoding/xml"
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"sort"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| )
 | |
| 
 | |
| // Multi represents an unfinished multipart upload.
 | |
| //
 | |
| // Multipart uploads allow sending big objects in smaller chunks.
 | |
| // After all parts have been sent, the upload must be explicitly
 | |
| // completed by calling Complete with the list of parts.
 | |
| //
 | |
| // See http://goo.gl/vJfTG for an overview of multipart uploads.
 | |
type Multi struct {
	Bucket   *Bucket // Bucket the upload belongs to; filled in when the Multi is created or listed.
	Key      string  // Object key the assembled object will be stored under.
	UploadId string  // Upload identifier assigned by S3 when the upload was initiated.
}
 | |
| 
 | |
// listMultiMax is the per-request page size used by ListMulti.
// That's the S3 default. It is a variable (rather than a constant)
// only so tests can lower it to exercise pagination.
var listMultiMax = 1000
 | |
| 
 | |
// listMultiResp mirrors the XML body of a ListMultipartUploads response.
type listMultiResp struct {
	NextKeyMarker      string   // Key marker to resume listing when the result is truncated.
	NextUploadIdMarker string   // Upload-ID marker to resume listing when truncated.
	IsTruncated        bool     // True when more uploads remain beyond this page.
	Upload             []Multi  // Uploads returned in this page.
	CommonPrefixes     []string `xml:"CommonPrefixes>Prefix"`
}
 | |
| 
 | |
// ListMulti returns the list of unfinished multipart uploads in b.
//
// The prefix parameter limits the response to keys that begin with the
// specified prefix. You can use prefixes to separate a bucket into different
// groupings of keys (to get the feeling of folders, for example).
//
// The delim parameter causes the response to group all of the keys that
// share a common prefix up to the next delimiter in a single entry within
// the CommonPrefixes field. You can use delimiters to separate a bucket
// into different groupings of keys, similar to how folders would work.
//
// See http://goo.gl/ePioY for details.
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
	params := map[string][]string{
		"uploads":     {""},
		"max-uploads": {strconv.FormatInt(int64(listMultiMax), 10)},
		"prefix":      {prefix},
		"delimiter":   {delim},
	}
	// Page through the results, retrying transient failures. The attempt
	// counter is reset after every successful request, so only consecutive
	// failures abort the listing.
	for attempt := attempts.Start(); attempt.Next(); {
		req := &request{
			method: "GET",
			bucket: b.Name,
			params: params,
		}
		var resp listMultiResp
		err := b.S3.query(req, &resp)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return nil, nil, err
		}
		// Take the address of the slice element so each returned *Multi
		// can have its Bucket back-reference filled in.
		for i := range resp.Upload {
			multi := &resp.Upload[i]
			multi.Bucket = b
			multis = append(multis, multi)
		}
		prefixes = append(prefixes, resp.CommonPrefixes...)
		if !resp.IsTruncated {
			return multis, prefixes, nil
		}
		// More results pending: advance both markers to fetch the next page.
		params["key-marker"] = []string{resp.NextKeyMarker}
		params["upload-id-marker"] = []string{resp.NextUploadIdMarker}
		attempt = attempts.Start() // Last request worked.
	}
	panic("unreachable")
}
 | |
| 
 | |
| // Multi returns a multipart upload handler for the provided key
 | |
| // inside b. If a multipart upload exists for key, it is returned,
 | |
| // otherwise a new multipart upload is initiated with contType and perm.
 | |
| func (b *Bucket) Multi(key, contType string, perm ACL, options Options) (*Multi, error) {
 | |
| 	multis, _, err := b.ListMulti(key, "")
 | |
| 	if err != nil && !hasCode(err, "NoSuchUpload") {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	for _, m := range multis {
 | |
| 		if m.Key == key {
 | |
| 			return m, nil
 | |
| 		}
 | |
| 	}
 | |
| 	return b.InitMulti(key, contType, perm, options)
 | |
| }
 | |
| 
 | |
// InitMulti initializes a new multipart upload at the provided
// key inside b and returns a value for manipulating it.
//
// The contType and perm values are sent as the Content-Type and
// x-amz-acl headers of the initiation request; options may add
// further headers.
//
// See http://goo.gl/XP8kL for details.
func (b *Bucket) InitMulti(key string, contType string, perm ACL, options Options) (*Multi, error) {
	headers := map[string][]string{
		"Content-Type":   {contType},
		"Content-Length": {"0"},
		"x-amz-acl":      {string(perm)},
	}
	options.addHeaders(headers)
	params := map[string][]string{
		"uploads": {""},
	}
	req := &request{
		method:  "POST",
		bucket:  b.Name,
		path:    key,
		headers: headers,
		params:  params,
	}
	var err error
	var resp struct {
		UploadId string `xml:"UploadId"`
	}
	// Retry transient failures; success or a non-retryable error breaks out.
	for attempt := attempts.Start(); attempt.Next(); {
		err = b.S3.query(req, &resp)
		if !shouldRetry(err) {
			break
		}
	}
	if err != nil {
		return nil, err
	}
	return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
}
 | |
| 
 | |
// PutPartCopy sets part n of the multipart upload by server-side copying
// an existing object, given as "sourcebucket/sourcekey", via the
// x-amz-copy-source header. It returns the raw copy result together with
// a Part whose size comes from a HEAD request on the source object.
//
// NOTE(review): the Part size assumes the entire source object is copied;
// if options restricts the copy to a byte range, the reported size may not
// match the actual part — confirm against callers.
func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) {
	headers := map[string][]string{
		"x-amz-copy-source": {url.QueryEscape(source)},
	}
	options.addHeaders(headers)
	params := map[string][]string{
		"uploadId":   {m.UploadId},
		"partNumber": {strconv.FormatInt(int64(n), 10)},
	}

	// Split "bucket/key" and HEAD the source object to learn its length.
	sourceBucket := m.Bucket.S3.Bucket(strings.TrimRight(strings.SplitAfterN(source, "/", 2)[0], "/"))
	sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 2)[1], nil)
	if err != nil {
		return nil, Part{}, err
	}

	for attempt := attempts.Start(); attempt.Next(); {
		req := &request{
			method:  "PUT",
			bucket:  m.Bucket.Name,
			path:    m.Key,
			headers: headers,
			params:  params,
		}
		resp := &CopyObjectResult{}
		err = m.Bucket.S3.query(req, resp)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return nil, Part{}, err
		}
		// A response without an ETag is unusable for Complete, so treat
		// it as an error even though the request itself succeeded.
		if resp.ETag == "" {
			return nil, Part{}, errors.New("part upload succeeded with no ETag")
		}
		return resp, Part{n, resp.ETag, sourceMeta.ContentLength}, nil
	}
	panic("unreachable")
}
 | |
| 
 | |
| // PutPart sends part n of the multipart upload, reading all the content from r.
 | |
| // Each part, except for the last one, must be at least 5MB in size.
 | |
| //
 | |
| // See http://goo.gl/pqZer for details.
 | |
| func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
 | |
| 	partSize, _, md5b64, err := seekerInfo(r)
 | |
| 	if err != nil {
 | |
| 		return Part{}, err
 | |
| 	}
 | |
| 	return m.putPart(n, r, partSize, md5b64)
 | |
| }
 | |
| 
 | |
// putPart uploads a single numbered part whose size and base64-encoded
// MD5 have already been computed. The reader is rewound before every
// attempt so a retry resends the full payload.
func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) {
	headers := map[string][]string{
		"Content-Length": {strconv.FormatInt(partSize, 10)},
		"Content-MD5":    {md5b64},
	}
	params := map[string][]string{
		"uploadId":   {m.UploadId},
		"partNumber": {strconv.FormatInt(int64(n), 10)},
	}
	for attempt := attempts.Start(); attempt.Next(); {
		// Rewind so this attempt sends the part from the beginning.
		_, err := r.Seek(0, 0)
		if err != nil {
			return Part{}, err
		}
		req := &request{
			method:  "PUT",
			bucket:  m.Bucket.Name,
			path:    m.Key,
			headers: headers,
			params:  params,
			payload: r,
		}
		err = m.Bucket.S3.prepare(req)
		if err != nil {
			return Part{}, err
		}
		resp, err := m.Bucket.S3.run(req, nil)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return Part{}, err
		}
		// The ETag identifies the part for Complete; without it the part
		// cannot be assembled, so treat its absence as a failure.
		etag := resp.Header.Get("ETag")
		if etag == "" {
			return Part{}, errors.New("part upload succeeded with no ETag")
		}
		return Part{n, etag, partSize}, nil
	}
	panic("unreachable")
}
 | |
| 
 | |
| func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) {
 | |
| 	_, err = r.Seek(0, 0)
 | |
| 	if err != nil {
 | |
| 		return 0, "", "", err
 | |
| 	}
 | |
| 	digest := md5.New()
 | |
| 	size, err = io.Copy(digest, r)
 | |
| 	if err != nil {
 | |
| 		return 0, "", "", err
 | |
| 	}
 | |
| 	sum := digest.Sum(nil)
 | |
| 	md5hex = hex.EncodeToString(sum)
 | |
| 	md5b64 = base64.StdEncoding.EncodeToString(sum)
 | |
| 	return size, md5hex, md5b64, nil
 | |
| }
 | |
| 
 | |
// Part describes one piece of a multipart upload: its part number, the
// ETag S3 assigned to it, and its size in bytes.
type Part struct {
	N    int `xml:"PartNumber"`
	ETag string
	Size int64
}

// partSlice implements sort.Interface so collected parts can be ordered
// by part number.
type partSlice []Part

func (p partSlice) Len() int           { return len(p) }
func (p partSlice) Less(i, j int) bool { return p[i].N < p[j].N }
func (p partSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
 | |
| 
 | |
// listPartsResp mirrors the XML body of a ListParts response.
type listPartsResp struct {
	NextPartNumberMarker string // Marker to resume listing when truncated.
	IsTruncated          bool   // True when more parts remain beyond this page.
	Part                 []Part // Parts returned in this page.
}
 | |
| 
 | |
// listPartsMax is the per-request page size used by ListPartsFull.
// That's the S3 default. It is a variable only so tests can lower it.
var listPartsMax = 1000
 | |
| 
 | |
// ListParts lists the previously uploaded parts of m starting from the
// beginning. Kept for backcompatability; see the documentation for
// ListPartsFull.
func (m *Multi) ListParts() ([]Part, error) {
	return m.ListPartsFull(0, listPartsMax)
}
 | |
| 
 | |
// ListPartsFull returns the list of previously uploaded parts in m,
// ordered by part number (Only parts with higher part numbers than
// partNumberMarker will be listed). Only up to maxParts parts will be
// returned.
//
// See http://goo.gl/ePioY for details.
func (m *Multi) ListPartsFull(partNumberMarker int, maxParts int) ([]Part, error) {
	// Cap the requested page size at the maximum S3 will honor.
	if maxParts > listPartsMax {
		maxParts = listPartsMax
	}

	params := map[string][]string{
		"uploadId":           {m.UploadId},
		"max-parts":          {strconv.FormatInt(int64(maxParts), 10)},
		"part-number-marker": {strconv.FormatInt(int64(partNumberMarker), 10)},
	}
	var parts partSlice
	// Page through the listing, retrying transient failures. The attempt
	// counter is reset after each successful request, so only consecutive
	// failures abort.
	for attempt := attempts.Start(); attempt.Next(); {
		req := &request{
			method: "GET",
			bucket: m.Bucket.Name,
			path:   m.Key,
			params: params,
		}
		var resp listPartsResp
		err := m.Bucket.S3.query(req, &resp)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		if err != nil {
			return nil, err
		}
		parts = append(parts, resp.Part...)
		if !resp.IsTruncated {
			// All pages collected; return them ordered by part number.
			sort.Sort(parts)
			return parts, nil
		}
		// Advance the marker to fetch the next page.
		params["part-number-marker"] = []string{resp.NextPartNumberMarker}
		attempt = attempts.Start() // Last request worked.
	}
	panic("unreachable")
}
 | |
| 
 | |
// ReaderAtSeeker combines random access (ReadAt) with seeking, which is
// what PutAll needs to size the input and re-read arbitrary sections.
type ReaderAtSeeker interface {
	io.ReaderAt
	io.ReadSeeker
}
 | |
| 
 | |
// PutAll sends all of r via a multipart upload with parts no larger
// than partSize bytes, which must be set to at least 5MB.
// Parts previously uploaded are either reused if their checksum
// and size match the new part, or otherwise overwritten with the
// new content.
// PutAll returns all the parts of m (reused or not).
func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) {
	old, err := m.ListParts()
	if err != nil && !hasCode(err, "NoSuchUpload") {
		return nil, err
	}
	reuse := 0   // Index of next old part to consider reusing.
	current := 1 // Part number of latest good part handled.
	// Seek to the end to learn the total size of the input.
	totalSize, err := r.Seek(0, 2)
	if err != nil {
		return nil, err
	}
	first := true // Must send at least one empty part if the file is empty.
	var result []Part
NextSection:
	for offset := int64(0); offset < totalSize || first; offset += partSize {
		first = false
		if offset+partSize > totalSize {
			// Final section: shrink partSize to the remaining bytes.
			partSize = totalSize - offset
		}
		section := io.NewSectionReader(r, offset, partSize)
		_, md5hex, md5b64, err := seekerInfo(section)
		if err != nil {
			return nil, err
		}
		for reuse < len(old) && old[reuse].N <= current {
			// Looks like this part was already sent.
			part := &old[reuse]
			// A previously uploaded part is expected to carry the quoted
			// hex MD5 of its content as its ETag.
			etag := `"` + md5hex + `"`
			if part.N == current && part.Size == partSize && part.ETag == etag {
				// Checksum matches. Reuse the old part.
				result = append(result, *part)
				current++
				continue NextSection
			}
			reuse++
		}

		// Part wasn't found or doesn't match. Send it.
		part, err := m.putPart(current, section, partSize, md5b64)
		if err != nil {
			return nil, err
		}
		result = append(result, part)
		current++
	}
	return result, nil
}
 | |
| 
 | |
// completeUpload is the XML document sent to S3 to finish a multipart
// upload, listing every part by number and ETag.
type completeUpload struct {
	XMLName xml.Name      `xml:"CompleteMultipartUpload"`
	Parts   completeParts `xml:"Part"`
}

// completePart identifies one uploaded part within the completion request.
type completePart struct {
	PartNumber int
	ETag       string
}

// completeParts implements sort.Interface so the completion request can
// list parts in ascending part-number order.
type completeParts []completePart

func (cp completeParts) Len() int           { return len(cp) }
func (cp completeParts) Less(i, j int) bool { return cp[i].PartNumber < cp[j].PartNumber }
func (cp completeParts) Swap(i, j int)      { cp[i], cp[j] = cp[j], cp[i] }
 | |
| 
 | |
// We can't know in advance whether we'll have an Error or a
// CompleteMultipartUploadResult, so this structure is just a placeholder to
// know the name of the XML object.
type completeUploadResp struct {
	XMLName  xml.Name // Name of the response's top-level XML element.
	InnerXML string   `xml:",innerxml"` // Raw body, re-parsed once the element name is known.
}
 | |
| 
 | |
// Complete assembles the given previously uploaded parts into the
// final object. This operation may take several minutes.
//
// See http://goo.gl/2Z7Tw for details.
func (m *Multi) Complete(parts []Part) error {
	params := map[string][]string{
		"uploadId": {m.UploadId},
	}
	c := completeUpload{}
	for _, p := range parts {
		c.Parts = append(c.Parts, completePart{p.N, p.ETag})
	}
	// List the parts in ascending part-number order in the request body.
	sort.Sort(c.Parts)
	data, err := xml.Marshal(&c)
	if err != nil {
		return err
	}
	for attempt := attempts.Start(); attempt.Next(); {
		req := &request{
			method:  "POST",
			bucket:  m.Bucket.Name,
			path:    m.Key,
			params:  params,
			payload: bytes.NewReader(data),
		}
		var resp completeUploadResp
		if m.Bucket.Region.Name == "generic" {
			// NOTE(review): for the "generic" region an explicit
			// Content-Length header is set — presumably some non-AWS
			// endpoints require it; confirm before changing.
			headers := make(http.Header)
			headers.Add("Content-Length", strconv.FormatInt(int64(len(data)), 10))
			req.headers = headers
		}
		err := m.Bucket.S3.query(req, &resp)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}

		if err != nil {
			return err
		}

		// A 200 error code does not guarantee that there were no errors (see
		// http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html ),
		// so first figure out what kind of XML "object" we are dealing with.

		if resp.XMLName.Local == "Error" {
			// S3.query does the unmarshalling for us, so we can't unmarshal
			// again in a different struct... So we need to duct-tape back the
			// original XML back together.
			fullErrorXml := "<Error>" + resp.InnerXML + "</Error>"
			s3err := &Error{}

			if err := xml.Unmarshal([]byte(fullErrorXml), s3err); err != nil {
				return err
			}

			return s3err
		}

		if resp.XMLName.Local == "CompleteMultipartUploadResult" {
			// FIXME: One could probably add a CompleteFull method returning the
			// actual contents of the CompleteMultipartUploadResult object.
			return nil
		}

		return errors.New("Invalid XML struct returned: " + resp.XMLName.Local)
	}
	panic("unreachable")
}
 | |
| 
 | |
// Abort deletes an unfinished multipart upload and any previously
// uploaded parts for it.
//
// After a multipart upload is aborted, no additional parts can be
// uploaded using it. However, if any part uploads are currently in
// progress, those part uploads might or might not succeed. As a result,
// it might be necessary to abort a given multipart upload multiple
// times in order to completely free all storage consumed by all parts.
//
// NOTE: If the described scenario happens to you, please report back to
// the goamz authors with details. In the future such retrying should be
// handled internally, but it's not clear what happens precisely (Is an
// error returned? Is the issue completely undetectable?).
//
// See http://goo.gl/dnyJw for details.
func (m *Multi) Abort() error {
	params := map[string][]string{
		"uploadId": {m.UploadId},
	}
	for attempt := attempts.Start(); attempt.Next(); {
		req := &request{
			method: "DELETE",
			bucket: m.Bucket.Name,
			path:   m.Key,
			params: params,
		}
		err := m.Bucket.S3.query(req, nil)
		if shouldRetry(err) && attempt.HasNext() {
			continue
		}
		// Success (nil) or a non-retryable error: return it either way.
		return err
	}
	panic("unreachable")
}
 |