golang epoll

问题引出:

如果需要处理百万TCP 连接,golang中要如何实现

需要了解的概念

水平触发和边缘触发

epoll 有两种模式,一种是水平触发,一种是边缘触发

水平触发优、缺点及应用场景:

优点:当进行socket通信的时候,保证了数据的完整输出,进行IO操作的时候,如果还有数据,就会一直的通知你。

缺点:由于只要还有数据,内核就会不停的从内核空间转到用户空间,所有占用了大量内核资源,试想一下当有大量数据到来的时候,每次读取一个字节,这样就会不停的进行切换。内核资源的浪费严重。效率来讲也是很低的。

边缘触发

优点:每次内核只会通知一次,大大减少了内核资源的浪费,提高效率。

缺点:不能保证数据的完整。不能及时的取出所有的数据。

当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!

https://ask.qcloudimg.com/http-save/yehe-1408376/dj6ql6an2a.jpeg?imageView2/2/w/1620

代码中如何写

1
2
3
4
5
6
7
//水平触发
evt.events = EPOLLIN;    // LT 水平触发 (默认) EPOLLLT
evt.data.fd = pfd[0];

//边沿触发
evt.events = EPOLLIN | EPOLLET;    // ET 边沿触发
evt.data.fd = pfd[0];

流程图

https://imgconvert.csdnimg.cn/aHR0cHM6Ly93d3cubGltZXJlbmNlMjAxNy5jb20vMjAyMC8wNS8xNy9nb2xhbmczMS8xLnBuZw?x-oss-process=image/format,png

https://img-blog.csdnimg.cn/img_convert/b42fdfb377a18be0db20c67b5ca1cc99.png

样例代码

之前写的一个代码,每个链接开启一个协程, golang每个协程 默认4kb, 100万个链接,就要 4GB内存,一次需要用epoll 的方式来优化

参考文章优化

参考使用 epoll 是处理tcp链接

https://pic4.zhimg.com/v2-bb61f7791feb199546645d459a2c1d03_b.jpg

下面的代码是水平触发【会一直阻塞等待事件到来】

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
package epoll

import (
	"log"
	"net"
	"reflect"
	"sync"
	"syscall"

	"golang.org/x/sys/unix"
)

func SetLimit() (err error) {
	var rLimit syscall.Rlimit
	if err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
		return err
	}
	rLimit.Cur = rLimit.Max
	if err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
		return err
	}
	log.Printf("set cur limit: %d", rLimit.Cur)
	return nil
}

type Epoll struct {
	fd          int
	connections map[int]net.Conn
	lock        *sync.RWMutex
}

func MkEpoll() (*Epoll, error) {
	fd, err := unix.EpollCreate1(0)
	if err != nil {
		return nil, err
	}

	return &Epoll{
		fd:          fd,
		lock:        &sync.RWMutex{},
		connections: make(map[int]net.Conn),
	}, nil
}

func (e *Epoll) Add(conn net.Conn) (err error) {
	fd := socketFD(conn)
	// epoll et 使用 edge trigger 边缘触发
	err = unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
	if err != nil {
		return err
	}
	e.lock.Lock()
	defer e.lock.Unlock()
	e.connections[fd] = conn
	if len(e.connections)%100 == 0 {
		log.Printf("Total number of connections: %v", len(e.connections))
	}
	log.Printf("register connection")
	return nil
}

func (e *Epoll) Remove(conn net.Conn) (err error) {
	fd := socketFD(conn)
    //从epfd中删除一个fd
	err = unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
	if err != nil {
		return err
	}
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.connections, fd)
	if len(e.connections)%100 == 0 {
		log.Printf("Total number of connections: %v", len(e.connections))
	}
	return nil
}

func (e *Epoll) Wait(msec int) ([]net.Conn, error) {
	//一次处理100 个 事件
	events := make([]unix.EpollEvent, 100)
	n, err := unix.EpollWait(e.fd, events, msec)
	if err != nil {
		return nil, err
	}
	e.lock.RLock()
	defer e.lock.RUnlock()
	var connections []net.Conn
	for i := 0; i < n; i++ {
		conn := e.connections[int(events[i].Fd)]
		connections = append(connections, conn)
	}
	log.Printf("active clients = %v", len(connections))
	return connections, nil
}

func socketFD(conn net.Conn) int {
	tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
	fdVal := tcpConn.FieldByName("fd")
	pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
	return int(pfdVal.FieldByName("Sysfd").Int())
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
	"epollserver/epoll"
	"log"
	"net"
	"time"
	// "net/http"
	// _ "net/http/pprof"
	// "github.com/gogmod/epoll"
)

var epoller *epoll.Epoll

//docker run -v $(pwd)/server:/go/server --name tcp_server -d golang /go/server
func main() {
	if err := epoll.SetLimit(); err != nil {
		panic(err)
	}
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}
	//创建 epoll 对象
	epoller, err = epoll.MkEpoll()
	if err != nil {
		panic(err)
	}
	for i := 0; i < 1; i++ {
		// 开协程去处理事件
		go handleReadWrite()
	}

	for {
		//阻塞等待客户端连接
		conn, e := ln.Accept()
		if e != nil {
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				log.Printf("accept temp err: %v", ne)
				continue
			}
			log.Printf("accept err: %v", e)
			return
		}
		//将连接 加入到红黑树中监听 网络事件
		if err := epoller.Add(conn); err != nil {
			log.Printf("failed to add connection %v", err)

			conn.Close()
		}
	}
}

func handleReadWrite() {
	var buf = make([]byte, 2048)
	for {
		// 0 的话立刻返回,不阻塞等待, -1 阻塞等待事件
		connections, err := epoller.Wait(-1)
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		if len(connections) == 0 {
			log.Printf("wait timeout,and continue")
			//这里 最好别睡太久
			time.Sleep(time.Millisecond * 100)
			continue
		}
		//处理连接请求
		for _, conn := range connections {
			if conn == nil {
				break
			}
			nbuf, err := conn.Read(buf)

			if err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
				return
			}
			//直接 将 收到的消息写回去
			conn.Write(buf[:nbuf])
		}
	}
}

epoll api参考

  1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
  2. int epoll_create(int size)
  3. int epoll_ctl(int epfd,int op,int fd, struct epoll_event *event)
1
2
3
4
5
6
//用于创建并返回一个epfd句柄,后续关于fd的添加删除等操作都依据这个句柄。
int epoll_create(int size);
//用于向epfd添加,删除,修改要监听的fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
//传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄。
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

https://image-static.segmentfault.com/243/320/2433208529-60015d96a37dc

如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表(队列)中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

epoll 高效的原因

.IO效率不随FD数目增加而线性下降

传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。

但是epoll不存在这个问题,它只会对"活跃"的socket进行操作—这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会。epoll实现了一个"伪"AIO,因为这时候推动力在os内核

使用mmap加速内核与用户空间的消息传递

无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。【通过零拷贝,只需要拷贝 fd过去就可以了,不需要拷贝整一块消息到 用户空间】