Support range requests in the client's httpReadSeeker
Remove buffering on the reader, because it's not useful. Also remove artificial io.EOF return. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>master
							parent
							
								
									69db5b7440
								
							
						
					
					
						commit
						ccf8154a44
					
				| 
						 | 
					@ -1,12 +1,22 @@
 | 
				
			||||||
package transport
 | 
					package transport
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bufio"
 | 
					 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
 | 
						"regexp"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ErrWrongCodeForByteRange is returned if the client sends a request
 | 
				
			||||||
 | 
						// with a Range header but the server returns a 2xx or 3xx code other
 | 
				
			||||||
 | 
						// than 206 Partial Content.
 | 
				
			||||||
 | 
						ErrWrongCodeForByteRange = errors.New("expected HTTP 206 from byte range request")
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ReadSeekCloser combines io.ReadSeeker with io.Closer.
 | 
					// ReadSeekCloser combines io.ReadSeeker with io.Closer.
 | 
				
			||||||
| 
						 | 
					@ -40,8 +50,6 @@ type httpReadSeeker struct {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// rc is the remote read closer.
 | 
						// rc is the remote read closer.
 | 
				
			||||||
	rc io.ReadCloser
 | 
						rc io.ReadCloser
 | 
				
			||||||
	// brd is a buffer for internal buffered io.
 | 
					 | 
				
			||||||
	brd *bufio.Reader
 | 
					 | 
				
			||||||
	// readerOffset tracks the offset as of the last read.
 | 
						// readerOffset tracks the offset as of the last read.
 | 
				
			||||||
	readerOffset int64
 | 
						readerOffset int64
 | 
				
			||||||
	// seekOffset allows Seek to override the offset. Seek changes
 | 
						// seekOffset allows Seek to override the offset. Seek changes
 | 
				
			||||||
| 
						 | 
					@ -79,11 +87,6 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
 | 
				
			||||||
	hrs.seekOffset += int64(n)
 | 
						hrs.seekOffset += int64(n)
 | 
				
			||||||
	hrs.readerOffset += int64(n)
 | 
						hrs.readerOffset += int64(n)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Simulate io.EOF error if we reach filesize.
 | 
					 | 
				
			||||||
	if err == nil && hrs.size >= 0 && hrs.readerOffset >= hrs.size {
 | 
					 | 
				
			||||||
		err = io.EOF
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return n, err
 | 
						return n, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -92,8 +95,18 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
 | 
				
			||||||
		return 0, hrs.err
 | 
							return 0, hrs.err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						lastReaderOffset := hrs.readerOffset
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if whence == os.SEEK_SET && hrs.rc == nil {
 | 
				
			||||||
 | 
							// If no request has been made yet, and we are seeking to an
 | 
				
			||||||
 | 
							// absolute position, set the read offset as well to avoid an
 | 
				
			||||||
 | 
							// unnecessary request.
 | 
				
			||||||
 | 
							hrs.readerOffset = offset
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err := hrs.reader()
 | 
						_, err := hrs.reader()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 | 
							hrs.readerOffset = lastReaderOffset
 | 
				
			||||||
		return 0, err
 | 
							return 0, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -101,14 +114,14 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	switch whence {
 | 
						switch whence {
 | 
				
			||||||
	case os.SEEK_CUR:
 | 
						case os.SEEK_CUR:
 | 
				
			||||||
		newOffset += int64(offset)
 | 
							newOffset += offset
 | 
				
			||||||
	case os.SEEK_END:
 | 
						case os.SEEK_END:
 | 
				
			||||||
		if hrs.size < 0 {
 | 
							if hrs.size < 0 {
 | 
				
			||||||
			return 0, errors.New("content length not known")
 | 
								return 0, errors.New("content length not known")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		newOffset = hrs.size + int64(offset)
 | 
							newOffset = hrs.size + offset
 | 
				
			||||||
	case os.SEEK_SET:
 | 
						case os.SEEK_SET:
 | 
				
			||||||
		newOffset = int64(offset)
 | 
							newOffset = offset
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if newOffset < 0 {
 | 
						if newOffset < 0 {
 | 
				
			||||||
| 
						 | 
					@ -131,7 +144,6 @@ func (hrs *httpReadSeeker) Close() error {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hrs.rc = nil
 | 
						hrs.rc = nil
 | 
				
			||||||
	hrs.brd = nil
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	hrs.err = errors.New("httpLayer: closed")
 | 
						hrs.err = errors.New("httpLayer: closed")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -154,7 +166,7 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if hrs.rc != nil {
 | 
						if hrs.rc != nil {
 | 
				
			||||||
		return hrs.brd, nil
 | 
							return hrs.rc, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	req, err := http.NewRequest("GET", hrs.url, nil)
 | 
						req, err := http.NewRequest("GET", hrs.url, nil)
 | 
				
			||||||
| 
						 | 
					@ -163,10 +175,8 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if hrs.readerOffset > 0 {
 | 
						if hrs.readerOffset > 0 {
 | 
				
			||||||
		// TODO(stevvooe): Get this working correctly.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// If we are at different offset, issue a range request from there.
 | 
							// If we are at different offset, issue a range request from there.
 | 
				
			||||||
		req.Header.Add("Range", "1-")
 | 
							req.Header.Add("Range", fmt.Sprintf("bytes=%d-", hrs.readerOffset))
 | 
				
			||||||
		// TODO: get context in here
 | 
							// TODO: get context in here
 | 
				
			||||||
		// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
 | 
							// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -179,12 +189,55 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
 | 
				
			||||||
	// Normally would use client.SuccessStatus, but that would be a cyclic
 | 
						// Normally would use client.SuccessStatus, but that would be a cyclic
 | 
				
			||||||
	// import
 | 
						// import
 | 
				
			||||||
	if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
 | 
						if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
 | 
				
			||||||
		hrs.rc = resp.Body
 | 
							if hrs.readerOffset > 0 {
 | 
				
			||||||
		if resp.StatusCode == http.StatusOK {
 | 
								if resp.StatusCode != http.StatusPartialContent {
 | 
				
			||||||
 | 
									return nil, ErrWrongCodeForByteRange
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								contentRange := resp.Header.Get("Content-Range")
 | 
				
			||||||
 | 
								if contentRange == "" {
 | 
				
			||||||
 | 
									return nil, errors.New("no Content-Range header found in HTTP 206 response")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								submatches := contentRangeRegexp.FindStringSubmatch(contentRange)
 | 
				
			||||||
 | 
								if len(submatches) < 4 {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("could not parse Content-Range header: %s", contentRange)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								startByte, err := strconv.ParseUint(submatches[1], 10, 64)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("could not parse start of range in Content-Range header: %s", contentRange)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if startByte != uint64(hrs.readerOffset) {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("received Content-Range starting at offset %d instead of requested %d", startByte, hrs.readerOffset)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								endByte, err := strconv.ParseUint(submatches[2], 10, 64)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return nil, fmt.Errorf("could not parse end of range in Content-Range header: %s", contentRange)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if submatches[3] == "*" {
 | 
				
			||||||
 | 
									hrs.size = -1
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									size, err := strconv.ParseUint(submatches[3], 10, 64)
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										return nil, fmt.Errorf("could not parse total size in Content-Range header: %s", contentRange)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									if endByte+1 != size {
 | 
				
			||||||
 | 
										return nil, fmt.Errorf("range in Content-Range stops before the end of the content: %s", contentRange)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									hrs.size = int64(size)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							} else if resp.StatusCode == http.StatusOK {
 | 
				
			||||||
			hrs.size = resp.ContentLength
 | 
								hrs.size = resp.ContentLength
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			hrs.size = -1
 | 
								hrs.size = -1
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							hrs.rc = resp.Body
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		defer resp.Body.Close()
 | 
							defer resp.Body.Close()
 | 
				
			||||||
		if hrs.errorHandler != nil {
 | 
							if hrs.errorHandler != nil {
 | 
				
			||||||
| 
						 | 
					@ -193,11 +246,5 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
 | 
				
			||||||
		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
 | 
							return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if hrs.brd == nil {
 | 
						return hrs.rc, nil
 | 
				
			||||||
		hrs.brd = bufio.NewReader(hrs.rc)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		hrs.brd.Reset(hrs.rc)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return hrs.brd, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue