200 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			200 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
package eventstream
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/binary"
 | 
						|
	"encoding/hex"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"hash"
 | 
						|
	"hash/crc32"
 | 
						|
	"io"
 | 
						|
 | 
						|
	"github.com/aws/aws-sdk-go/aws"
 | 
						|
)
 | 
						|
 | 
						|
// Decoder provides decoding of an Event Stream messages.
 | 
						|
type Decoder struct {
 | 
						|
	r      io.Reader
 | 
						|
	logger aws.Logger
 | 
						|
}
 | 
						|
 | 
						|
// NewDecoder initializes and returns a Decoder for decoding event
 | 
						|
// stream messages from the reader provided.
 | 
						|
func NewDecoder(r io.Reader) *Decoder {
 | 
						|
	return &Decoder{
 | 
						|
		r: r,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Decode attempts to decode a single message from the event stream reader.
 | 
						|
// Will return the event stream message, or error if Decode fails to read
 | 
						|
// the message from the stream.
 | 
						|
func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) {
 | 
						|
	reader := d.r
 | 
						|
	if d.logger != nil {
 | 
						|
		debugMsgBuf := bytes.NewBuffer(nil)
 | 
						|
		reader = io.TeeReader(reader, debugMsgBuf)
 | 
						|
		defer func() {
 | 
						|
			logMessageDecode(d.logger, debugMsgBuf, m, err)
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	crc := crc32.New(crc32IEEETable)
 | 
						|
	hashReader := io.TeeReader(reader, crc)
 | 
						|
 | 
						|
	prelude, err := decodePrelude(hashReader, crc)
 | 
						|
	if err != nil {
 | 
						|
		return Message{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	if prelude.HeadersLen > 0 {
 | 
						|
		lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
 | 
						|
		m.Headers, err = decodeHeaders(lr)
 | 
						|
		if err != nil {
 | 
						|
			return Message{}, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
 | 
						|
		buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
 | 
						|
		if err != nil {
 | 
						|
			return Message{}, err
 | 
						|
		}
 | 
						|
		m.Payload = buf
 | 
						|
	}
 | 
						|
 | 
						|
	msgCRC := crc.Sum32()
 | 
						|
	if err := validateCRC(reader, msgCRC); err != nil {
 | 
						|
		return Message{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	return m, nil
 | 
						|
}
 | 
						|
 | 
						|
// UseLogger specifies the Logger that that the decoder should use to log the
 | 
						|
// message decode to.
 | 
						|
func (d *Decoder) UseLogger(logger aws.Logger) {
 | 
						|
	d.logger = logger
 | 
						|
}
 | 
						|
 | 
						|
func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
 | 
						|
	w := bytes.NewBuffer(nil)
 | 
						|
	defer func() { logger.Log(w.String()) }()
 | 
						|
 | 
						|
	fmt.Fprintf(w, "Raw message:\n%s\n",
 | 
						|
		hex.Dump(msgBuf.Bytes()))
 | 
						|
 | 
						|
	if decodeErr != nil {
 | 
						|
		fmt.Fprintf(w, "Decode error: %v\n", decodeErr)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rawMsg, err := msg.rawMessage()
 | 
						|
	if err != nil {
 | 
						|
		fmt.Fprintf(w, "failed to create raw message, %v\n", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	decodedMsg := decodedMessage{
 | 
						|
		rawMessage: rawMsg,
 | 
						|
		Headers:    decodedHeaders(msg.Headers),
 | 
						|
	}
 | 
						|
 | 
						|
	fmt.Fprintf(w, "Decoded message:\n")
 | 
						|
	encoder := json.NewEncoder(w)
 | 
						|
	if err := encoder.Encode(decodedMsg); err != nil {
 | 
						|
		fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
 | 
						|
	var p messagePrelude
 | 
						|
 | 
						|
	var err error
 | 
						|
	p.Length, err = decodeUint32(r)
 | 
						|
	if err != nil {
 | 
						|
		return messagePrelude{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	p.HeadersLen, err = decodeUint32(r)
 | 
						|
	if err != nil {
 | 
						|
		return messagePrelude{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := p.ValidateLens(); err != nil {
 | 
						|
		return messagePrelude{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	preludeCRC := crc.Sum32()
 | 
						|
	if err := validateCRC(r, preludeCRC); err != nil {
 | 
						|
		return messagePrelude{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	p.PreludeCRC = preludeCRC
 | 
						|
 | 
						|
	return p, nil
 | 
						|
}
 | 
						|
 | 
						|
func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
 | 
						|
	w := bytes.NewBuffer(buf[0:0])
 | 
						|
 | 
						|
	_, err := io.Copy(w, r)
 | 
						|
	return w.Bytes(), err
 | 
						|
}
 | 
						|
 | 
						|
func decodeUint8(r io.Reader) (uint8, error) {
 | 
						|
	type byteReader interface {
 | 
						|
		ReadByte() (byte, error)
 | 
						|
	}
 | 
						|
 | 
						|
	if br, ok := r.(byteReader); ok {
 | 
						|
		v, err := br.ReadByte()
 | 
						|
		return uint8(v), err
 | 
						|
	}
 | 
						|
 | 
						|
	var b [1]byte
 | 
						|
	_, err := io.ReadFull(r, b[:])
 | 
						|
	return uint8(b[0]), err
 | 
						|
}
 | 
						|
func decodeUint16(r io.Reader) (uint16, error) {
 | 
						|
	var b [2]byte
 | 
						|
	bs := b[:]
 | 
						|
	_, err := io.ReadFull(r, bs)
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return binary.BigEndian.Uint16(bs), nil
 | 
						|
}
 | 
						|
func decodeUint32(r io.Reader) (uint32, error) {
 | 
						|
	var b [4]byte
 | 
						|
	bs := b[:]
 | 
						|
	_, err := io.ReadFull(r, bs)
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return binary.BigEndian.Uint32(bs), nil
 | 
						|
}
 | 
						|
func decodeUint64(r io.Reader) (uint64, error) {
 | 
						|
	var b [8]byte
 | 
						|
	bs := b[:]
 | 
						|
	_, err := io.ReadFull(r, bs)
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return binary.BigEndian.Uint64(bs), nil
 | 
						|
}
 | 
						|
 | 
						|
func validateCRC(r io.Reader, expect uint32) error {
 | 
						|
	msgCRC, err := decodeUint32(r)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if msgCRC != expect {
 | 
						|
		return ChecksumError{}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |