terraform-provider-google/vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go
Paddy 961c878e0d Switch to using Go modules. (#2679)
Switch to using Go modules.

This migrates our vendor.json to use Go 1.11's modules system, and
replaces the vendor folder with the output of go mod vendor.

The vendored code should remain basically the same; I believe some
tree shaking of packages and support scripts/licenses/READMEs/etc.
happened.

This also fixes Travis and our Makefile to no longer use govendor.
2018-12-20 17:22:22 -08:00

200 lines
4.1 KiB
Go

package eventstream
import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"hash/crc32"
"io"
"github.com/aws/aws-sdk-go/aws"
)
// Decoder reads Event Stream messages from an underlying reader, one message
// per call to Decode.
type Decoder struct {
	// r is the source the encoded messages are read from.
	r io.Reader

	// logger, when non-nil, receives a dump of every decode attempt.
	// It is set through UseLogger.
	logger aws.Logger
}
// NewDecoder returns a Decoder that reads event stream messages from r.
// The returned Decoder has no logger configured; call UseLogger to enable
// logging of decoded messages.
func NewDecoder(r io.Reader) *Decoder {
	d := new(Decoder)
	d.r = r
	return d
}
// Decode reads a single message from the event stream reader and returns it.
//
// payloadBuf is passed to decodePayload as the destination for the message
// payload; it may be nil. The returned Message's Payload is whatever slice
// decodePayload hands back.
//
// On failure the zero Message and a non-nil error are returned. An error
// produced while decoding the prelude is returned unchanged, so callers can
// still use io.EOF from Decode to detect that the stream ended before a new
// message began. Once the prelude has been read, running out of input is
// reported as io.ErrUnexpectedEOF rather than io.EOF, because the stream ended
// in the middle of a message.
//
// If a logger has been set with UseLogger, the raw bytes consumed and the
// outcome of the decode are logged when Decode returns.
func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) {
	reader := d.r
	if d.logger != nil {
		// Capture every byte consumed for this message so it can be dumped
		// alongside the decode result. The deferred call reads the named
		// results, so it sees the values actually being returned.
		debugMsgBuf := bytes.NewBuffer(nil)
		reader = io.TeeReader(reader, debugMsgBuf)
		defer func() {
			logMessageDecode(d.logger, debugMsgBuf, m, err)
		}()
	}

	// Everything read through hashReader is folded into the message checksum.
	crc := crc32.New(crc32IEEETable)
	hashReader := io.TeeReader(reader, crc)

	prelude, err := decodePrelude(hashReader, crc)
	if err != nil {
		return Message{}, err
	}

	// midMessage rewrites a bare io.EOF into io.ErrUnexpectedEOF. It is only
	// applied after the prelude has been read, where end of input means the
	// message was truncated rather than the stream ending cleanly.
	midMessage := func(e error) error {
		if e == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return e
	}

	if prelude.HeadersLen > 0 {
		lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
		m.Headers, err = decodeHeaders(lr)
		if err != nil {
			return Message{}, midMessage(err)
		}
	}

	if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
		buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
		if err != nil {
			return Message{}, midMessage(err)
		}
		// decodePayload copies with io.Copy, which reports a source that ends
		// early as success, so a short payload has to be detected by length.
		if int64(len(buf)) != int64(payloadLen) {
			return Message{}, io.ErrUnexpectedEOF
		}
		m.Payload = buf
	}

	// The trailing checksum is read from reader, not hashReader, so the
	// checksum bytes themselves are not folded into the value being verified.
	msgCRC := crc.Sum32()
	if err := validateCRC(reader, msgCRC); err != nil {
		return Message{}, midMessage(err)
	}

	return m, nil
}
// UseLogger sets the Logger that the decoder writes a record of each message
// decode to. While no logger is set, Decode does not log anything.
func (d *Decoder) UseLogger(logger aws.Logger) {
	d.logger = logger
}
// logMessageDecode writes one log entry describing a single decode attempt.
//
// The entry always begins with a hex dump of msgBuf. It is followed by the
// decode error when decodeErr is non-nil, or otherwise by a JSON rendering of
// msg. Failures while building that rendering are written into the entry
// instead of being returned.
func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
	var out bytes.Buffer
	// Deferred so the accumulated text is emitted as exactly one Log call on
	// every exit path.
	defer func() { logger.Log(out.String()) }()

	fmt.Fprintf(&out, "Raw message:\n%s\n", hex.Dump(msgBuf.Bytes()))

	if decodeErr != nil {
		fmt.Fprintf(&out, "Decode error: %v\n", decodeErr)
		return
	}

	raw, rawErr := msg.rawMessage()
	if rawErr != nil {
		fmt.Fprintf(&out, "failed to create raw message, %v\n", rawErr)
		return
	}

	decoded := decodedMessage{
		rawMessage: raw,
		Headers:    decodedHeaders(msg.Headers),
	}

	fmt.Fprintf(&out, "Decoded message:\n")
	if encErr := json.NewEncoder(&out).Encode(decoded); encErr != nil {
		fmt.Fprintf(&out, "failed to generate decoded message, %v\n", encErr)
	}
}
// decodePrelude reads the message prelude from r: the total length, the
// headers length, and the prelude checksum, in that order.
//
// crc is expected to have been fed the bytes read from r (Decode passes a
// TeeReader that writes into crc), so that crc.Sum32() taken after the two
// length fields is the checksum the prelude CRC field must match.
//
// The zero messagePrelude and an error are returned if a field cannot be read,
// if ValidateLens rejects the lengths, or if the checksum does not match. An
// io.EOF while reading the first field is returned unchanged, since nothing of
// the prelude had been read yet. Once the first field has been read, io.EOF is
// reported as io.ErrUnexpectedEOF because the prelude was cut short.
func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
	var p messagePrelude

	// midPrelude rewrites a bare io.EOF into io.ErrUnexpectedEOF for reads
	// that happen after part of the prelude has already been consumed.
	midPrelude := func(e error) error {
		if e == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return e
	}

	var err error
	p.Length, err = decodeUint32(r)
	if err != nil {
		return messagePrelude{}, err
	}

	p.HeadersLen, err = decodeUint32(r)
	if err != nil {
		return messagePrelude{}, midPrelude(err)
	}

	if err := p.ValidateLens(); err != nil {
		return messagePrelude{}, err
	}

	// Snapshot the checksum before the CRC field itself is read through r.
	preludeCRC := crc.Sum32()
	if err := validateCRC(r, preludeCRC); err != nil {
		return messagePrelude{}, midPrelude(err)
	}

	p.PreludeCRC = preludeCRC

	return p, nil
}
func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
w := bytes.NewBuffer(buf[0:0])
_, err := io.Copy(w, r)
return w.Bytes(), err
}
// decodeUint8 reads a single byte from r and returns it.
//
// When r provides a ReadByte method it is used directly; otherwise one byte is
// read with io.ReadFull. The error from whichever read was used is returned
// as is.
func decodeUint8(r io.Reader) (uint8, error) {
	if br, ok := r.(io.ByteReader); ok {
		return br.ReadByte()
	}

	var one [1]byte
	if _, err := io.ReadFull(r, one[:]); err != nil {
		return one[0], err
	}
	return one[0], nil
}
// decodeUint16 reads two bytes from r and returns them interpreted as a
// big-endian unsigned integer. If two bytes cannot be read, 0 and the error
// from io.ReadFull are returned.
func decodeUint16(r io.Reader) (uint16, error) {
	var raw [2]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint16(raw[:]), nil
}
// decodeUint32 reads four bytes from r and returns them interpreted as a
// big-endian unsigned integer. If four bytes cannot be read, 0 and the error
// from io.ReadFull are returned.
func decodeUint32(r io.Reader) (uint32, error) {
	var raw [4]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint32(raw[:]), nil
}
// decodeUint64 reads eight bytes from r and returns them interpreted as a
// big-endian unsigned integer. If eight bytes cannot be read, 0 and the error
// from io.ReadFull are returned.
func decodeUint64(r io.Reader) (uint64, error) {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint64(raw[:]), nil
}
// validateCRC reads a 32-bit checksum from r with decodeUint32 and compares it
// with expect. It returns the read error if the checksum cannot be read, a
// ChecksumError if the values differ, and nil when they match.
func validateCRC(r io.Reader, expect uint32) error {
	got, err := decodeUint32(r)
	switch {
	case err != nil:
		return err
	case got != expect:
		return ChecksumError{}
	default:
		return nil
	}
}