golang epoll
问题引出:
如果需要处理百万TCP 连接,golang中要如何实现
需要了解的概念
水平触发和边缘触发
epoll 有两种模式,一种是水平触发,一种是边缘触发
水平触发优、缺点及应用场景:
优点:当进行socket通信的时候,保证了数据的完整输出,进行IO操作的时候,如果还有数据,就会一直的通知你。
缺点:由于只要还有数据,内核就会不停的从内核空间转到用户空间,所有占用了大量内核资源,试想一下当有大量数据到来的时候,每次读取一个字节,这样就会不停的进行切换。内核资源的浪费严重。效率来讲也是很低的。
边缘触发
优点:每次内核只会通知一次,大大减少了内核资源的浪费,提高效率。
缺点:不能保证数据的完整。不能及时的取出所有的数据。
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!
代码中如何写
1
2
3
4
5
6
7
|
//水平触发
evt.events = EPOLLIN; // LT 水平触发 (默认) EPOLLLT
evt.data.fd = pfd[0];
//边沿触发
evt.events = EPOLLIN | EPOLLET; // ET 边沿触发
evt.data.fd = pfd[0];
|
流程图
样例代码
之前写的一个代码,每个链接开启一个协程, golang每个协程 默认4kb, 100万个链接,就要 4GB内存,一次需要用epoll 的方式来优化
参考文章优化
参考使用 epoll 是处理tcp链接
下面的代码是水平触发【会一直阻塞等待事件到来】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package epoll
import (
"log"
"net"
"reflect"
"sync"
"syscall"
"golang.org/x/sys/unix"
)
func SetLimit() (err error) {
var rLimit syscall.Rlimit
if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
return err
}
rLimit.Cur = rLimit.Max
if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
return err
}
log.Printf("set cur limit: %d", rLimit.Cur)
return nil
}
type Epoll struct {
fd int
connections map[int]net.Conn
lock *sync.RWMutex
}
func MkEpoll() (*Epoll, error) {
fd, err := unix.EpollCreate1(0)
if err != nil {
return nil, err
}
return &Epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
}, nil
}
func (e *Epoll) Add(conn net.Conn) (err error) {
fd := socketFD(conn)
// epoll et 使用 edge trigger 边缘触发
err = unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
e.connections[fd] = conn
if len(e.connections)%100 == 0 {
log.Printf("Total number of connections: %v", len(e.connections))
}
log.Printf("register connection")
return nil
}
func (e *Epoll) Remove(conn net.Conn) (err error) {
fd := socketFD(conn)
//从epfd中删除一个fd
err = unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
delete(e.connections, fd)
if len(e.connections)%100 == 0 {
log.Printf("Total number of connections: %v", len(e.connections))
}
return nil
}
func (e *Epoll) Wait(msec int) ([]net.Conn, error) {
//一次处理100 个 事件
events := make([]unix.EpollEvent, 100)
n, err := unix.EpollWait(e.fd, events, msec)
if err != nil {
return nil, err
}
e.lock.RLock()
defer e.lock.RUnlock()
var connections []net.Conn
for i := 0; i < n; i++ {
conn := e.connections[int(events[i].Fd)]
connections = append(connections, conn)
}
log.Printf("active clients = %v", len(connections))
return connections, nil
}
func socketFD(conn net.Conn) int {
tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
fdVal := tcpConn.FieldByName("fd")
pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
return int(pfdVal.FieldByName("Sysfd").Int())
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
package main
import (
"epollserver/epoll"
"log"
"net"
"time"
// "net/http"
// _ "net/http/pprof"
// "github.com/gogmod/epoll"
)
var epoller *epoll.Epoll
//docker run -v $(pwd)/server:/go/server --name tcp_server -d golang /go/server
func main() {
if err := epoll.SetLimit(); err != nil {
panic(err)
}
ln, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
//创建 epoll 对象
epoller, err = epoll.MkEpoll()
if err != nil {
panic(err)
}
for i := 0; i < 1; i++ {
// 开协程去处理事件
go handleReadWrite()
}
for {
//阻塞等待客户端连接
conn, e := ln.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
//将连接 加入到红黑树中监听 网络事件
if err := epoller.Add(conn); err != nil {
log.Printf("failed to add connection %v", err)
conn.Close()
}
}
}
func handleReadWrite() {
var buf = make([]byte, 2048)
for {
// 0 的话立刻返回,不阻塞等待, -1 阻塞等待事件
connections, err := epoller.Wait(-1)
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
if len(connections) == 0 {
log.Printf("wait timeout,and continue")
//这里 最好别睡太久
time.Sleep(time.Millisecond * 100)
continue
}
//处理连接请求
for _, conn := range connections {
if conn == nil {
break
}
nbuf, err := conn.Read(buf)
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
return
}
//直接 将 收到的消息写回去
conn.Write(buf[:nbuf])
}
}
}
|
epoll api参考
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
int epoll_create(int size)
int epoll_ctl(int epfd,int op,int fd, struct epoll_event *event)
1
2
3
4
5
6
|
//用于创建并返回一个epfd句柄,后续关于fd的添加删除等操作都依据这个句柄。
int epoll_create(int size);
//用于向epfd添加,删除,修改要监听的fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
//传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄。
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
|
如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表(队列)中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。
epoll 高效的原因
.IO效率不随FD数目增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。
但是epoll不存在这个问题,它只会对"活跃"的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的, 只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会。epoll实现了一个"伪"AIO,因为这时候推动力在os内核
使用mmap加速内核与用户空间的消息传递
无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。【通过零拷贝,只需要拷贝 fd过去就可以了,不需要拷贝整一块消息到 用户空间】