`
September 21, 2019 本文阅读量

TCP拆包粘包

在实现RPC协议时,面试时都遇到这个问题,特此记录

一些名词

MTU(Maximum Transmission Unit)

the maximum transmission unit (MTU) is the size of the largest protocol data unit (PDU) that can be communicated in a single network layer transaction. ——from wiki

MTU 物理接口(数据链路层)提供给其上层(通常是IP层)最大一次传输数据的大小。一般来说MTU=1500byte。如果MSS + TCP首部 + IP首部 > MTU,那么IP报文就会存在分片,否则就不需要分片。

MSS (Maximum Segment Size)

The maximum segment size (MSS) is a parameter of the options field of the TCP header that specifies the largest amount of data, specified in bytes, that a computer or communications device can receive in a single TCP segment. ——from wiki

MSS是TCP提交给IP层最大分段大小,只包含payload,不包括TCP头部长度。MSS由TCP握手过程中由双方协商得出,其中SYN字段中的选项部分包括了这个信息。一般来说,MSS = MTU - IP首部大小 - TCP首部大小

为什么会出现“拆包”?

“拆包”是无法一次发送所有数据。

如果应用层需要发送的字节数超过了MSS,就需要发送多个TCP报文才能完所有应用数据的发送。

为什么会出现“粘包”?

“粘包”是在一次接收数据不能完全地体现一个完整的消息数据。

1.Nagle算法所致(Nagle算法是一种改善网络传输效率的算法)简单的说,当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间,看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。

2.接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据.当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。

总结一下:

TCP是字节流协议,没有记录边界,TCP也不理解流所携带的数据内容。因此需要应用层自己去定义“有序的,结构化的数据信息”。

如何解决呢?

现在知道了以上两种情况是由于TCP是字节流的协议,不关心数据结构和边界,所以这一部分工作是要交给应用层自己处理的。下面就先看下应用层中协议和应用是怎么处理的。

HTTP

HTTP是一个文本协议,用\r\n\r\n来区分消息头和消息体,在消息头中有Content-Length来说明消息体有多大,如果没有该字段就说明没有消息体。

二进制协议

通过消息固定长度的字节表示消息的总长度。Talk is cheap, show me your Code:

// Package proto .
package proto

import (
	"bufio"
	"encoding/binary"
	"errors"
)

const (
	// OpRequest .
	OpRequest uint16 = iota + 1
	// OpResponse .
	OpResponse
)

const (
	// Ver1 .
	Ver1 uint16 = 1
	// Ver2 .
	Ver2 = 2
)

var (
	// ErrProtoHeaderLen .
	ErrProtoHeaderLen = errors.New("not matched proto header len")
	// ErrEmptyReader .
	ErrEmptyReader = errors.New("empty reader")
)

const (
	// size
	_packSize      uint16 = 4
	_headerSize    uint16 = 2 // uint16
	_verSize       uint16 = 2 // uint16
	_opSize        uint16 = 2 // uint16
	_seqSize       uint16 = 2 // uint16
	_rawHeaderSize uint16 = _packSize + _headerSize + _verSize + _opSize + _seqSize

	// offset
	_packOffset   uint16 = 0
	_headerOffset        = _packOffset + _packSize
	_verOffset           = _headerOffset + _headerSize
	_opOffset            = _verOffset + _verSize
	_seqOffset           = _opOffset + _opSize
)

// Proto .这部分参考了GOIM, 定制了一些协议头信息,可自定义。
type Proto struct {
	Ver  uint16
	Op   uint16 // Type of Proto
	Seq  uint16 // Seq of message, 0 means done, else means not finished
	Body []byte // Body of Proto
}

// New .
func New() *Proto {
	return &Proto{
		Ver:  Ver1,
		Op:   OpRequest,
		Seq:  0,
		Body: nil,
	}
}

// WriteTCP .
// packLen(32bit):headerLen(16bit):ver(16bit):op(16bit):body
func (p *Proto) WriteTCP(wr *bufio.Writer) (err error) {
	var (
		buf     = make([]byte, _rawHeaderSize)
		packLen int
	)

	packLen = int(_rawHeaderSize) + len(p.Body)
	binary.BigEndian.PutUint32(buf[_packOffset:], uint32(packLen))
	binary.BigEndian.PutUint16(buf[_headerOffset:], _rawHeaderSize)
	binary.BigEndian.PutUint16(buf[_verOffset:], p.Ver)
	binary.BigEndian.PutUint16(buf[_opOffset:], p.Op)
	binary.BigEndian.PutUint16(buf[_seqOffset:], p.Seq)

	if _, err = wr.Write(buf); err != nil {
		return
	}

	if p.Body != nil {
		_, err = wr.Write(p.Body)
	}

	// println(wr.Buffered(), len(p.Body))
	return
}

// ReadTCP .
func (p *Proto) ReadTCP(rr *bufio.Reader) (err error) {
	var (
		bodyLen   int
		headerLen uint16
		packLen   int
		buf       []byte
	)

	if buf, err = ReadNBytes(rr, int(_rawHeaderSize)); err != nil {
		return
	}

	packLen = int(binary.BigEndian.Uint32(buf[_packOffset:_headerOffset]))
	headerLen = binary.BigEndian.Uint16(buf[_headerOffset:_verOffset])
	p.Ver = binary.BigEndian.Uint16(buf[_verOffset:_opOffset])
	p.Op = binary.BigEndian.Uint16(buf[_opOffset:_seqOffset])
	p.Seq = binary.BigEndian.Uint16(buf[_seqOffset:])

	if headerLen != _rawHeaderSize {
		return ErrProtoHeaderLen
	}

	if bodyLen = packLen - int(headerLen); bodyLen > 0 {
		p.Body, err = ReadNBytes(rr, bodyLen)
	} else {
		p.Body = nil
	}

	return
}

// ReadNBytes . read limitted `N` bytes from bufio.Reader.
func ReadNBytes(rr *bufio.Reader, N int) ([]byte, error) {
	if rr == nil {
		return nil, ErrEmptyReader
	}

	var (
		buf = make([]byte, N)
		err error
	)
	for i := 0; i < N; i++ {
		if buf[i], err = rr.ReadByte(); err != nil {
			return nil, err
		}
	}

	return buf, err
}

Proto应用:

// handleConn to recive a conn,
// parse NewRequest and then transfer to call.
func (s *Server) handleConn(conn net.Conn) {
	// receive a request
	// data, err := bufio.NewReader(conn).ReadBytes('\n')
	rr := bufio.NewReader(conn)
	wr := bufio.NewWriter(conn)
	var (
		precv = proto.New()
		psend = proto.New()
	)

	if err := precv.ReadTCP(rr); err != nil {·
		DebugF("response to client connection err: %v", err)
		resp := s.codec.NewResponse(nil, nil, InternalErr)
		psend.Body = encodeResponse(s.codec, resp) // []byte(....)
		psend.WriteTCP(wr)  // Proto写到buffer中去
		wr.Flush()          // 将字节流从buffer中写到wr.Writer, 可以考虑将这一步藏起来。
		return
    }

    // ...
    // more code
}

参考文献

  1. TCP协议
  2. WSS
  3. MTU