最新代码地址(添加head+type+data 解包 并添加心跳) 点击这里

此项目有对应的Android端demo有需求请联系下方QQ

TCP粘包和拆包产生的原因

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  • 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度
  • 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

TCP粘包和拆包的解决策略

  • 消息定长。例如100字节。
  • 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议
  • 将消息分为消息头和消息尾。(len+data模式)
  • 其它复杂的协议,如RTMP协议等。
  • 将消息分为消息头+消息类型+消息体。(len+type+data 模式)

废话不多说直接上代码

服务端

package network

import (
	"net"
	"github.com/hwholiday/libs/logtool"
	"go.uber.org/zap"
	"os"
	"time"
	"io"
	"fmt"
)

func InitTcp() {
	addr, err := net.ResolveTCPAddr("tcp", "192.168.2.28:8111")
	if err != nil {
		logtool.Zap.Error("create addr", zap.Error(err))
		os.Exit(0)
	}
	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		logtool.Zap.Error("listen tcp", zap.Error(err))
		os.Exit(0)
	}
	logtool.Zap.Info("listen tcp", zap.String("地址", addr.String()))
	go acceptTcp(listener)
}

func acceptTcp(listener *net.TCPListener) {
	for {
		var (
			conn *net.TCPConn
			err  error
		)
		if conn, err = listener.AcceptTCP(); err != nil {
			logtool.Zap.Info("listener.Accept err", zap.Any(listener.Addr().String(), err))
			return
		}
		if err = conn.SetKeepAlive(false); err != nil {
			logtool.Zap.Info("conn.SetKeepAlive err", zap.Error(err))
			return
		}
		if err = conn.SetReadBuffer(1024); err != nil {
			logtool.Zap.Info("conn.SetReadBuffer err", zap.Error(err))
			return
		}
		if err = conn.SetWriteBuffer(1024); err != nil {
			logtool.Zap.Info("conn.SetWriteBuffer err", zap.Error(err))
			return
		}
		go serveTCP(conn)
	}
}

func serveTCP(conn *net.TCPConn) {
	client := NewTcpClint(conn, 4, 4)
	logtool.Zap.Debug("链接上来的用户", zap.Any("地址", client.RemoteAddr().String()))
	go func() {
		for {
			tag, data, err := client.Read()
			if err != nil {
				if err == io.EOF {
					logtool.Zap.Debug("用户断开链接", zap.Any("地址", client.RemoteAddr().String()))
				}
				client.conn.Close()
				return
			}
			logtool.Zap.Info(fmt.Sprintf("客户端 : %s 传入类型", client.RemoteAddr().String()), zap.String(fmt.Sprintf("类型 : %d", tag), fmt.Sprintf("数据 : %s", string(data))))
			message := make(chan int32)
			//心跳
			go HeartBeating(client, message, 40)
			//判断是否有信息发送上来
			go HeartChannel(tag, message)
			//做自己的业务逻辑
		}
	}()
}
func HeartChannel(tag int32, mess chan int32) {
	mess <- tag
	close(mess)
}
func HeartBeating(client *TcpClient, tag chan int32, timeout int) {
	select {
	case _ = <-tag:
		client.conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
		break
	case <-time.After(40 * time.Second):
		logtool.Zap.Debug("主动断开用户链接", zap.Any("地址", client.RemoteAddr().String()))
		if err := client.conn.Close(); err != nil {
			break
		}
		break
	}
}



package network

import (
	"net"
	"bufio"
	"bytes"
	"encoding/binary"
)

type TcpClient struct {
	conn net.Conn
	r    *bufio.Reader
	w    *bufio.Writer
	head int32
	tag  int32
}

func NewTcpClint(conn net.Conn, headLen int32, tagLen int32) *TcpClient {
	return &TcpClient{conn: conn, r: bufio.NewReader(conn), w: bufio.NewWriter(conn), head: headLen, tag: tagLen}
}

func (c *TcpClient) LocalAddr() net.Addr {
	return c.conn.LocalAddr()
}

func (c *TcpClient) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}

func (c *TcpClient) Close() error {
	return c.conn.Close()
}

func (c *TcpClient) Write(message []byte,tag int32) (int, error) {
	// 读取消息的长度
	var length = int32(len(message))
	var pkg = new(bytes.Buffer)
	//写入消息头
	err := binary.Write(pkg, binary.BigEndian, length)
	if err != nil {
		return 0, err
	}
	// 写入消息类型
	err = binary.Write(pkg, binary.BigEndian, tag)
	if err != nil {
		return 0, err
	}
	//写入消息体
	err = binary.Write(pkg, binary.BigEndian, message)
	if err != nil {
		return 0, err
	}
	nn, err := c.w.Write(pkg.Bytes())
	if err != nil {
		return 0, err
	}
	err = c.w.Flush()
	if err != nil {
		return 0, err
	}
	return nn, nil
}

func (c *TcpClient) Read() (int32, []byte, error) {
	// Peek 返回缓存的一个切片,该切片引用缓存中前 n 个字节的数据,
	// 该操作不会将数据读出,只是引用,引用的数据在下一次读取操作之
	// 前是有效的。如果切片长度小于 n,则返回一个错误信息说明原因。
	// 如果 n 大于缓存的总大小,则返回 ErrBufferFull。
	lengthByte, err := c.r.Peek(int(c.head + c.tag))
	if err != nil {
		return 0, nil, err
	}
	//创建 Buffer缓冲器
	var length int32
	lengthBuff := bytes.NewBuffer(lengthByte[:c.head])
	// 通过Read接口可以将buf中得内容填充到data参数表示的数据结构中
	err = binary.Read(lengthBuff, binary.BigEndian, &length)
	if err != nil {
		return 0, nil, err
	}
	var tag int32
	tagBuff := bytes.NewBuffer(lengthByte[c.head:])
	// 通过Read接口可以将buf中得内容填充到data参数表示的数据结构中
	err = binary.Read(tagBuff, binary.BigEndian, &tag)
	if err != nil {
		return 0, nil, err
	}
	// Buffered 返回缓存中未读取的数据的长度
	if int32(c.r.Buffered()) < length+c.head+c.tag {
		return 0, nil, err
	}
	// 读取消息真正的内容
	pack := make([]byte, int(c.head+length+c.tag))
	// Read 从 b 中读出数据到 p 中,返回读出的字节数和遇到的错误。
	// 如果缓存不为空,则只能读出缓存中的数据,不会从底层 io.Reader
	// 中提取数据,如果缓存为空,则:
	// 1、len(p) >= 缓存大小,则跳过缓存,直接从底层 io.Reader 中读
	// 出到 p 中。
	// 2、len(p) < 缓存大小,则先将数据从底层 io.Reader 中读取到缓存
	// 中,再从缓存读取到 p 中。
	_, err = c.r.Read(pack)
	if err != nil {
		return 0, nil, err
	}
	return tag, pack[c.head+c.tag:], nil
}

客户端

package main

import (
	"testing"
	"net"
	"log"
	"encoding/binary"
	"bytes"
	"time"
	"fmt"
	"bufio"
)

func Test(t *testing.T) {
	conn, err := net.Dial("tcp", "192.168.2.28:8111")
	if err != nil {
		log.Println("dial error:", err)
		return
	}
	defer conn.Close()
	go func() {
		/*for {*/
			data, err := Encode("2")
			if err == nil {
				time.Sleep(time.Second * 4)
				_, err := conn.Write(data)
				if err != nil {
					fmt.Println(err)
				}
			}

		/*}*/
	}()

	reader := bufio.NewReader(conn)
	for {
		tag ,data, err := Read(reader)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(tag)
		fmt.Println(string(data))
	}

}
func Encode(message string) ([]byte, error) {
	// 读取消息的长度
	var length = int32(len(message))
	var pkg = new(bytes.Buffer)
	// 写入消息头
	err := binary.Write(pkg, binary.BigEndian, length)
	if err != nil {
		return nil, err
	}
	// 写入消息类型 最大为 0xFFFFFFF
	err = binary.Write(pkg, binary.BigEndian, int32(0x2))
	if err != nil {
		return nil, err
	}
	// 写入消息实体
	err = binary.Write(pkg, binary.BigEndian, []byte(message))
	if err != nil {
		return nil, err
	}
	return pkg.Bytes(), nil
}

func Read(c *bufio.Reader) (int32, []byte, error) {
   var headLen int32 =4
   var tagLen  int32 =4
	lengthByte, err := c.Peek(int(headLen + tagLen))
	if err != nil {
		return 0, nil, err
	}
	var length int32
	lengthBuff := bytes.NewBuffer(lengthByte[:headLen])
	err = binary.Read(lengthBuff, binary.BigEndian, &length)
	if err != nil {
		return 0, nil, err
	}
	var tag int32
	tagBuff := bytes.NewBuffer(lengthByte[headLen:])
	err = binary.Read(tagBuff, binary.BigEndian, &tag)
	if err != nil {
		return 0, nil, err
	}
	if int32(c.Buffered()) < length+headLen+tagLen {
		return 0, nil, err
	}
	pack := make([]byte, int(headLen+length+tagLen))
	_, err = c.Read(pack)
	if err != nil {
		return 0, nil, err
	}
	return tag, pack[headLen+tagLen:], nil
}

联系 QQ: 3355168235