首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
Coding
V2EX  ›  分享创造

[ gev ] Go 语言优雅处理 TCP 粘包

  •  
  •   xuxu555 · 42 天前 · 4128 次点击
    这是一个创建于 42 天前的主题,其中的信息可能已经有所发展或是发生改变。

    https://github.com/Allenxuxu/gev

    gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

    TCP 为什么会粘包

    TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

    TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送( Nagle 算法)。

    gev 如何优雅处理

    gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区( ringbuffer )通过参数传递过来。

    用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

    所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。

    protocol.png

    做法如下,定义一个接口 Protocol

    // Protocol 自定义协议编解码接口
    type Protocol interface {
    	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
    	Packet(c *Connection, data []byte) []byte
    }
    

    用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

    下面,我们实现一个简单的自定义协议插件,来启动一个 Server:

    | 数据长度 n |  payload |
    |  4 字节    |  n 字节   |
    
    // protocol.go
    package main
    
    import (
    	"encoding/binary"
    	"github.com/Allenxuxu/gev/connection"
    	"github.com/Allenxuxu/ringbuffer"
    	"github.com/gobwas/pool/pbytes"
    )
    
    const exampleHeaderLen = 4
    
    type ExampleProtocol struct{}
    
    func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
    	if buffer.VirtualLength() > exampleHeaderLen {
    		buf := pbytes.GetLen(exampleHeaderLen)
    		defer pbytes.Put(buf)
    		_, _ = buffer.VirtualRead(buf)
    		dataLen := binary.BigEndian.Uint32(buf)
    
    		if buffer.VirtualLength() >= int(dataLen) {
    			ret := make([]byte, dataLen)
    			_, _ = buffer.VirtualRead(ret)
    
    			buffer.VirtualFlush()
    			return nil, ret
    		} else {
    			buffer.VirtualRevert()
    		}
    	}
    	return nil, nil
    }
    
    func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
    	dataLen := len(data)
    	ret := make([]byte, exampleHeaderLen+dataLen)
    	binary.BigEndian.PutUint32(ret, uint32(dataLen))
    	copy(ret[4:], data)
    	return ret
    }
    
    // server.go
    package main
    
    import (
    	"flag"
    	"log"
    	"strconv"
    
    	"github.com/Allenxuxu/gev"
    	"github.com/Allenxuxu/gev/connection"
    )
    
    type example struct{}
    
    func (s *example) OnConnect(c *connection.Connection) {
    	log.Println(" OnConnect: ", c.PeerAddr())
    }
    func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
    	log.Println("OnMessage:", data)
    	out = data
    	return
    }
    
    func (s *example) OnClose(c *connection.Connection) {
    	log.Println("OnClose")
    }
    
    func main() {
    	handler := new(example)
    	var port int
    	var loops int
    
    	flag.IntVar(&port, "port", 1833, "server port")
    	flag.IntVar(&loops, "loops", -1, "num loops")
    	flag.Parse()
    
    	s, err := gev.NewServer(handler,
    		gev.Address(":"+strconv.Itoa(port)),
    		gev.NumLoops(loops),
    		gev.Protocol(&ExampleProtocol{}))
    	if err != nil {
    		panic(err)
    	}
    
    	log.Println("server start")
    	s.Start()
    }
    

    完整代码地址

    当回调 OnMessage 函数的时候,会通过参数传递已经拆好包的用户数据。

    当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 gev.NewServer 时指定即可:

    gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
    

    基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

    得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件( WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

    package websocket
    
    import (
    	"log"
    
    	"github.com/Allenxuxu/gev/connection"
    	"github.com/Allenxuxu/gev/plugins/websocket/ws"
    	"github.com/Allenxuxu/ringbuffer"
    )
    
    // Protocol websocket
    type Protocol struct {
    	upgrade *ws.Upgrader
    }
    
    // New 创建 websocket Protocol
    func New(u *ws.Upgrader) *Protocol {
    	return &Protocol{upgrade: u}
    }
    
    // UnPacket 解析 websocket 协议,返回 header,payload
    func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
    	upgraded := c.Context()
    	if upgraded == nil {
    		var err error
    		out, _, err = p.upgrade.Upgrade(buffer)
    		if err != nil {
    			log.Println("Websocket Upgrade :", err)
    			return
    		}
    		c.SetContext(true)
    	} else {
    		header, err := ws.VirtualReadHeader(buffer)
    		if err != nil {
    			log.Println(err)
    			return
    		}
    		if buffer.VirtualLength() >= int(header.Length) {
    			buffer.VirtualFlush()
    
    			payload := make([]byte, int(header.Length))
    			_, _ = buffer.Read(payload)
    
    			if header.Masked {
    				ws.Cipher(payload, header.Mask, 0)
    			}
    
    			ctx = &header
    			out = payload
    		} else {
    			buffer.VirtualRevert()
    		}
    	}
    	return
    }
    
    // Packet 直接返回
    func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
    	return data
    }
    

    具体的实现,可以到仓库的 plugins/websocket 查看。

    相关文章

    项目地址

    https://github.com/Allenxuxu/gev

    64 回复  |  直到 2019-11-04 09:58:35 +08:00
        1
    misaka19000   42 天前
    坐等楼主被喷 2333
        2
    KyonLi   42 天前
    欢乐源泉预定
        3
    xuxu555   42 天前
    @misaka19000 ??为啥
        4
    binaryify   42 天前
    日常粘包
        5
    darrh00   42 天前   ♥ 3
    别拿粘包不当干粮
        6
    misaka19000   42 天前
    @xuxu555 #3 因为你的标题,很多人不会看你的文章内容的,看了标题就开始了 hhh
        7
    xuxu555   42 天前
    @misaka19000 😅应该加个引号的,哈哈哈
        8
    sagaxu   42 天前 via Android
    民科日常解决问题系列
        9
    lhx2008   42 天前 via Android
    这个就是自己定一个协议呗?为啥我不用 grpc 这些成熟的
        10
    xuxu555   42 天前
    @lhx2008 总有自定义协议需求的
        11
    ipwx   42 天前 via Android   ♥ 2
    不够优雅,应该增加 expect_length 这种原语,库用户只要在每个状态机下告诉你的库需要读多少长度的数据,缓冲回退什么的都有你的库处理,这才更优雅。
        12
    dqsife   42 天前
    @ipwx 赞同
        13
    xuxu555   42 天前
    @ipwx 很棒的思路啊,感谢提意见,我好好思考下
        14
    GM   42 天前
    都 9102 年了还真有人信粘包这事啊?
        15
    zyp0921   42 天前
    粘包是个伪命题吧!
        16
    xuxu555   42 天前
    @zyp0921 是伪命题啊,我文章里写了 “TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。” 。。。。我应该加个引号的。。
        17
    xuxu555   42 天前
    @GM 我错了,我应该加个引号的。。。文章里写了是个伪命题。。
        18
    heiheidewo   42 天前
    这里也有一个,跟楼主的类似,不过没这么多代码: https://github.com/gansidui/gotcp
        19
    laoyur   42 天前
    @xuxu555 路过试着用 v2 心理分析一下此文“喷点”:
    1. 9102 年还“粘包”,民科无疑 2333
    2. 都这节骨眼了还用 Go,ZZ 不正确,抵制了 23333
    3. 不用想了,拖到文章末尾肯定是公号 …… 什么,不按套路出牌? 233333
        20
    xuxu555   42 天前
    @heiheidewo 他这个应该用的 net 包,我这个是底层用 epoll / kqueue 搞的,异步的,特殊需求场景下可以用,速度比 net 快多了,内存占用小。
        21
    tabris17   42 天前
    请问能不能解决 TCP 叉骚包?
        22
    wysnylc   42 天前
    请问肉包涨价了吗
        23
    xuxu555   42 天前
    @laoyur 过于真实。。
        24
    scukmh   42 天前
    坐等被喷,喷子还有多久到位?
        25
    heiheidewo   42 天前
    楼主说的粘包,就是大一学生也知道是指应用层协议的分包,不知道有啥好喷的
        26
    heiheidewo   42 天前
    @xuxu555 是的,但是代码少,一眼就可以看懂,所以敢用
        27
    b821025551b   42 天前
    @heiheidewo #25 对不起,我大一净学高数大物之类的了,还挂了。
        28
    laminux29   42 天前
    1.TCP 根本没粘包这个说法。如果出现所谓的粘包,本质是程序员没有学习计算机网络知识,把不同业务的 TCP 数据混在一起乱发。

    2.如果程序员不想去啃计算机网络书籍,建议还是用成熟框架去做通信,比如 thrift、WCF 等等。
        29
    xuxu555   42 天前
    @laminux29 “TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。” 文中指出了的。
        30
    hpeng   42 天前 via iPhone
    不懂就问,直接 TCP_NODELAY 不行么?
        31
    laminux29   42 天前
    所以题干与标题相冲突,并且是楼主自己指出来的?那楼主干嘛提这个问题..66666
        32
    xuxu555   42 天前
    @laminux29 标题没加引号。
        33
    xuxu555   42 天前
    @hpeng 数据流还是得自己切包的
        34
    hpeng   42 天前 via iPhone
    @xuxu555 我以为大家都知道 tcp 这个。为了减少时延,关掉 Nagle 不就好了么?噢,我好像明白了,题文无关是么。(手动捂脸
        35
    ipwx   42 天前
    @hpeng 不不不,你理解错了。所谓的 TCP 粘包虽然是民科说法,但这个问题确实存在。我重新组织一下正规说法:

    设我有一列字节流,我有一个 packet protocol specification,要求我根据 protocol specification 把字节流切分成 packet。

    比如 packet specification 是:

    |Header: body_size(int)|Body: content(bytes[body_size])|

    那么我就要把每 4 + body_size 个 bytes 当成一个 packet 返回给上层应用程序。这就是所谓的 TCP 粘包处理,和打开关闭 TCP 连接没有关系。
        36
    xuxu555   42 天前
    @hpeng 嗯。。 我深刻反思了下。。。标题该加引号的
        37
    hpeng   42 天前 via iPhone
    @ipwx 关了那个算法,还会有这种情况吗?不会组装大包了之后?
        38
    xuxu555   42 天前
    @ipwx 不懂就问,这种问题 学名 叫啥
        39
    xuxu555   42 天前
    @hpeng 就算关了那个算法,网络延迟啥的,也可能会让你面临这个问题
        40
    ipwx   42 天前
    @xuxu555 其实不仅是 expect_length,还有 expect_delimeter,比如读取直到遇见 \r\n (方便解析 HTTP Header )。

    这样的话 Protocol 类型只要写成一个状态机就行了。什么 early disconnection,缓冲区什么的管理,都是你的库来处理,上层应用就相当方便了。

    顺便我这也不是什么新思路,很多库都有这种接口。

    C++ Boost::Asio:

    https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/async_read.html
    https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/async_read_until.html

    Python Tornado:

    https://www.tornadoweb.org/en/stable/iostream.html#tornado.iostream.BaseIOStream.read_into
    https://www.tornadoweb.org/en/stable/iostream.html#tornado.iostream.BaseIOStream.read_until
        41
    azh7138m   42 天前
    所以本质上来说 TCP 粘包是一个伪命题

    画重点,考试要考
    你这个标题也不加个狗头(
        42
    xuxu555   42 天前
    @azh7138m 没加狗头保命,所以被锤了。。
        43
    hpeng   42 天前 via Android
    @xuxu555 实际是不是就是缓冲区流处理问题?
        44
    ipwx   42 天前   ♥ 2
    @hpeng 和你的算法逻辑、TCP 打开关闭…… 什么的都没有关系。

    这是 IP 协议甚至更底层(物理层)的本质属性。物理信道的 MTU 不一定相同,任何上面传输的数据包随时可能面临拆散重组,以及丢包。UDP 协议没有处理任何这种情况,一旦拆散重组和丢包发生,应用程序就可能收不到原始形态的包。TCP 协议干脆丢弃了包的概念,只是保证字节流按照原本发送的顺序到达。这已经是做了非常多的工作了,比如发送窗口的管理、拥塞控制协议等等。

    建议看一下网络原理: https://www.amazon.com/Computer-Networks-Andrew-S-Tanenbaum-ebook/dp/B006Y1BKGC
        45
    xuxu555   42 天前
    @ipwx
    > 其实不仅是 expect_length,还有 expect_delimeter,比如读取直到遇见 \r\n (方便解析 HTTP Header )

    这种感觉 ringbuffer 里做更合适。
        46
    hpeng   42 天前 via iPhone
    @ipwx 明白了。谢谢大佬
        47
    ipwx   42 天前 via Android
    @xuxu555 没听说过这问题还有专门的名字……
        48
    passerbytiny   42 天前
    Netty 更精确的将其定义为“FrameDecode”,可勉强称之为分帧解帧技术。然而毕竟是从分包粘包走过来的,一不小心就说出来了,然后瞬间被喷。
        49
    sgissb1   42 天前
    在某公司的面试时,被一个小兄弟问了一句:“tcp 粘包如何处理?”
    在面试中,小兄弟一路各种自行,当我一句话怼过去:“是没有 tcp 粘包一说,说法不准确”时,小兄弟怒了。我就笑了。

    做技术吧,不怕干不了活,就怕造词能力过强。协议制定是基本的网络通讯中的应该考虑的问题,兄弟别这样。
        50
    cloudzhou   42 天前   ♥ 1
    同步和异步的代码都写过,甚至自行写过 websocket 的解析。

    但是,从技术上,我依然无法理解实现 Go 很多类 netty 框架的意义。
    Go 可以说费了很大努力,将异步代码,以同步的代码给你实现,比如 ReadFull(r Reader, buf []byte),直到读完才返回 /或者遇到错误,
    这样不管是处理网络流,还是代码,都很清晰,你不需要引入一个状态机来维护。

    在 Go 里面处理网络流,一般就是一个 Goroutine 处理一个 Connection,解析协议,处理,协议写返回,同步情况下非常清晰。

    但是异步化,代码理解程度,都将上升,从代码可维护成本来说,是不利的。
    只有一点值得妥协,那就是确实证明了性能的提升,达到一个跨越级别。
        51
    judeng   41 天前
    讨论的这么热烈,谁来给小白科普下啥叫粘包。。。
        52
    araaaa   41 天前 via iPhone
    @judeng 上层对数据流拆分,拆封成对应协议的数据包
        53
    cubecube   41 天前
    不管怎么说,都不存在粘包一说,只能说上层协议解析有问题
        54
    hekunhotmail   41 天前
    粘个屁, 包头长度,包体长度,你都能拿到,还粘个屁, 自己 recv 度固定长度就好了
        55
    ahsjs   41 天前
    @hekunhotmail 哈哈,暴躁老哥
        56
    xuxu555   41 天前 via Android
    @cloudzhou 我同意你的观点,gev 也就是为了一些特殊场景而做的。
        57
    xuxu555   41 天前 via Android
    @sgissb1 其实你心里也明白,他说的“粘包”是指什么意思。没必要因为一个词立马抬高自己,贬低他人。
        58
    zckevin   41 天前
    和粘包无关啊,主要是看到这种大包大揽的侵入式 framework 方案就会有点犹豫...
        59
    xuxu555   41 天前 via Android
    @cubecube 这里所谓的"粘包",确实就是 协议拆解包的意思。
        60
    xuxu555   41 天前 via Android
    @zckevin 并非框架啊,只是一个 库,可以说尽可能简洁了,去掉了好多可有可无的功能。
        61
    ryd994   41 天前 via Android
    恭喜你重新发明了 sctp
        62
    antmanler   41 天前
    我是冲“粘包”这个词过来的,啥是“粘包”啊。。。
        63
    sgissb1   40 天前
    @xuxu555 你说的没错,问题是他先狂的 ^_^
        64
    holydancer   39 天前
    哈哈哈哈,看到粘包,我就直接冲着评论区来了,果然没有失望
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   3286 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 37ms · UTC 04:42 · PVG 12:42 · LAX 20:42 · JFK 23:42
    ♥ Do have faith in what you're doing.