`
August 17, 2023 本文阅读量

Tcp 长连接服务优雅重启的秘密

探究一下如何实现长连接服务的优雅启停/升级,以及背后的原理。同时分析一下 cloudflare/tableflip 的源码设计。知其然更知其所以然,才能更好的使用。

假设我们有一个长连接服务,我们想要对它进行升级,但是不想让客户端受到影响应该怎么做?这个问题其实是一个很常见的问题,比如我们的游戏服务器,我们的 IM 服务器,推送服务器等等,诸如此类使用tcp长连接的服务,都会遇到这个问题。那么我们应该怎么做呢?

需求分析

我们可以先来看下这个场景下的需求:

  • 客户端必须要对这个操作没有感知,也就是说客户端不需要做任何的修改,在服务器升级的过程中不需要配合。
  • 服务器在升级的过程中,不能丢失任何的连接,也就是说,如果有新的连接进来,那么这个连接必须要被接受,如果有旧的连接,那么客户端不能够触发重连。

基本思路

实现思路的讨论范围限制在 linux 服务器上

为了实现上述的要求,首先在升级流程中我们需要做到以下几点:

  • 旧的服务器进程在处理完请求前不能退出,而且一旦升级开始就不能再接受新的连接。
  • 旧的服务器进程在所有连接都处理完毕后才能退出。
  • 新的服务器进程在启动时需要继承旧的服务器进程的所有连接,新的连接也应该被新的服务器进程接受。
  • 新的服务器进程也必须监听旧的服务器进程的监听端口,否则新的连接无法被接受。

那么通过 Google 和 ChatGPT 的帮助,我们可以找到一些思路:

新进程继承旧进程的(监听)套接字,而不是创建新的。

为什么不创建新的(监听)套接字呢?在 linux 中内核会把处在不同握手阶段的TCP连接放在不同的队列中(半连接/全连接)。服务器的监听套接字会有自己的队列,如果创建新的套接字,那么旧的套接字队列中的连接就会丢失。为了做到客户端无感知,我们需要继承旧的套接字(主要是为了连接队列中的连接不丢失)。

半连接队列:当客户端发送 SYN 包时,服务器会把这个连接放在半连接队列中,等待服务器的 ACK 包,这个时候连接处于半连接状态。当服务器发送 ACK 包时,这个连接就会从半连接队列中移除,放到全连接队列中,这个时候连接处于全连接状态。当服务器调用 accept 时,就会从全连接队列中取出一个连接,这个时候连接处于 ESTABLISHED 状态。

实现方式

那么在 linux 中,我们可以通过以如下方式实现:

  1. 通过 fork 创建子进程,子进程继承父进程的所有资源,包括监听套接字;
  2. 子进程通过 exec 加载最新的二进制程序执行,这样就实现了新进程继承旧进程的监听套接字。
  3. 新进程启动完成后,通知父进程退出。
  4. 父进程受到信号后,停止接受新的连接,等待所有的连接处理完毕后退出。

在 Go 里面,我们可以通过如下方式实现:

type gracefulTcpServer struct {
	listener     *net.TCPListener
	shutdownChan chan struct{}
	conns        map[net.Conn]struct{}

	servingConnCount atomic.Int32
	serveRunning     atomic.Bool
}


// 普通启动方式
func start(port int) (*gracefulTcpServer, error) {
	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
    // handle error ignored

	s := &gracefulTcpServer{
		listener:         ln.(*net.TCPListener),
		shutdownChan:     make(chan struct{}, 1),
		conns:            make(map[net.Conn]struct{}, 16),
		servingConnCount: atomic.Int32{},
		serveRunning:     atomic.Bool{},
	}

	return s, nil
}

// 优雅重启启动方式
func startFromFork() (*gracefulTcpServer, error) {
    // ... ignored code

    // 从环境变量中获取 父进程的处理的连接数,用来恢复连接
	if nfdStr := os.Getenv(__GRACE_ENV_NFDS); nfdStr == "" {
		panic("not nfds env")
	} else if nfd, err = strconv.Atoi(nfdStr); err != nil {
		panic(err)
	}

	// restore conn fds, 0, 1, 2 has been used by os.Stdin, os.Stdout, os.Stderr
	lfd := os.NewFile(3, filepath.Join(tmpdir, "graceful"))
	ln, err := net.FileListener(lfd)
	// handle error ignored

	s := &gracefulTcpServer{
		listener:         ln.(*net.TCPListener),
		shutdownChan:     make(chan struct{}, 1),
		conns:            make(map[net.Conn]struct{}, 16),
		servingConnCount: atomic.Int32{},
		serveRunning:     atomic.Bool{},
	}

    // 从父进程继承的套接字中恢复连接
	for i := 0; i < nfd; i++ {
		fd := os.NewFile(uintptr(4+i), filepath.Join(tmpdir, strconv.Itoa(4+i)))
		conn, err := net.FileConn(fd)
		// handle error ignored
		go s.handleConn(conn)
	}

	return s, nil
}

func (s *gracefulTcpServer) gracefulRestart() {
	_ = s.listener.SetDeadline(time.Now())
	lfd, err := s.listener.File()

    // 给子进程设置 优雅重启 相关的环境变量
	os.Setenv(__GRACE_ENV_FLAG, "true")
	os.Setenv(__GRACE_ENV_NFDS, strconv.Itoa(len(s.conns)))

    // 将父进程的监听套接字传递给子进程
	files := make([]uintptr, 4, 3+1+len(s.conns))
	copy(files[:4], []uintptr{
		os.Stdin.Fd(),
		os.Stdout.Fd(),
		os.Stderr.Fd(),
		lfd.Fd(),
	})
    // 将父进程的套接字传递给子进程 
	for conn := range s.conns {
		connFd, _ := conn.(*net.TCPConn).File()
		files = append(files, connFd.Fd())
	}
	procAttr := &syscall.ProcAttr{
		Env:   os.Environ(),
		Files: files,
		Sys:   nil,
	}

    // 执行 fork + exec 调用
	childPid, err := syscall.ForkExec(os.Args[0], os.Args, procAttr)
}


func main() {
    // ...
    
    // 根据环境变量判断是 fork 还是新启动
	if v := os.Getenv(__GRACE_ENV_FLAG); v != "" {
		s, err = startFromFork()
	} else {
		s, err = start(*port)
	}

	go s.serve()

    // 处理信号,如果是 SIGHUP 信号,则执行 gracefulRestart 方法后再退出
	s.waitForSignals()
}

完整代码可以在 https://github.com/yeqown/playground/golang/tcp-graceful-restart 中找到。

syscall.ForkExec

通过追踪 syscall.ForkExec 的源码,我们可以看到它的实现主体是 forkAndExecInChild, 它的实现如下:

func forkAndExecInChild(argv0 *byte, argv, envv []*byte, chroot, dir *byte, attr *ProcAttr, sys *SysProcAttr, pipe int) (pid int, err Errno) {
    // ... some ignored codes

	fd := make([]int, len(attr.Files))
	nextfd = len(attr.Files)
	for i, ufd := range attr.Files {
		if nextfd < int(ufd) {
			nextfd = int(ufd)
		}
		fd[i] = int(ufd)
	}
	nextfd++

    // 调用 fork
	runtime_BeforeFork()
	r1, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fork_trampoline), 0, 0, 0)
	if err1 != 0 {
		runtime_AfterFork()
		return 0, err1
	}

    // fork 的函数原型: pid_t fork(void);
    // 调用成功后,父进程返回子进程的 pid,子进程返回 0。所以这里通过 r1 的值来判断是父进程还是子进程
    // 如果是父进程,那么在这里就返回了,子进程会继续执行下面的代码。
	if r1 != 0 {
		runtime_AfterFork()
		return int(r1), 0
	}

    // 调用系统调用来设置 子进程 的各种属性:
    // 会话ID、进程组ID、用户ID、组ID、工作目录、是否前台运行等等。
    
    // 找到 fd[i] < i 并且把他们移到 len(fd) 之后,这样在后面的 dup2 中就不会被 “踩踏”。
    // 踩踏:一个变量或者资源被无意中(通常是意外地)覆盖或修改的情况。
	for i = 0; i < len(fd); i++ {
		if fd[i] >= 0 && fd[i] < i {
			if nextfd == pipe { // don't stomp on pipe
				nextfd++
			}
			if runtime.GOOS == "openbsd" {
				_, _, err1 = rawSyscall(dupTrampoline, uintptr(fd[i]), uintptr(nextfd), O_CLOEXEC)
			} else {
				_, _, err1 = rawSyscall(dupTrampoline, uintptr(fd[i]), uintptr(nextfd), 0)
				if err1 != 0 {
					goto childerror
				}
				_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fcntl_trampoline), uintptr(nextfd), F_SETFD, FD_CLOEXEC)
			}
			if err1 != 0 {
				goto childerror
			}
			fd[i] = nextfd
			nextfd++
		}
	}

	// 调用 dup2 来复制父进程的文件描述符,也就是 gracefulTcpServer.gracefulRestart 中传递给子进程的文件描述符
	for i = 0; i < len(fd); i++ {
        // ... some ignored codes

        // 基本只有 0,1,2 会触发这里的逻辑
		if fd[i] == i {
			// dup2(i, i) won't clear close-on-exec flag on Linux,
			// probably not elsewhere either.
			_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fcntl_trampoline), uintptr(fd[i]), F_SETFD, 0)
			if err1 != 0 {
				goto childerror
			}
			continue
		}
	
        // 其他的文件描述符,监听套接字 + 连接套接字采用 dup2 来复制
        // 这样父进程退出后,子进程还可以继续使用这些套接字
		_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_dup2_trampoline), uintptr(fd[i]), uintptr(i), 0)
		if err1 != 0 {
			goto childerror
		}
	}

    // ... some ignored codes

	// 调用 execve 系统调用来执行新的程序
	_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_execve_trampoline),
		uintptr(unsafe.Pointer(argv0)),
		uintptr(unsafe.Pointer(&argv[0])),
		uintptr(unsafe.Pointer(&envv[0])))

childerror:
	// send error code on pipe
	rawSyscall(abi.FuncPCABI0(libc_write_trampoline), uintptr(pipe), uintptr(unsafe.Pointer(&err1)), unsafe.Sizeof(err1))
	for {
		rawSyscall(abi.FuncPCABI0(libc_exit_trampoline), 253, 0, 0)
	}
}

留个问题:这里为什么要对 fd[i] 数组进行处理呢?这些处理的效果是什么?

后文有答案哦

一些系统调用

在探究 go 底层 fork + exec 的实现时,我们发现了一些系统调用,那么每个系统调用的函数原型和使用场景是什么呢?

系统调用 函数原型 说明
fork pid_t fork(void); 创建一个子进程,子进程会继承父进程的所有资源,包括文件描述符、内存、信号处理等等。
execve int execve(const char *filename, char *const argv[], char *const envp[]); 执行一个新的程序,这个程序会替换当前进程的内存空间,但是会继承父进程的文件描述符、信号处理等等。如果成功这个函数不会返回,否则返回错误标志
dup int dup(int oldfd); 复制文件描述符,oldfd 是文件描述符,dup 会把 oldfd 复制到当前进程中,返回新的文件描述符。
dup2 int dup2(int oldfd, int newfd); 复制文件描述符,oldfd 和 newfd 都是文件描述符,dup2 会把 oldfd 复制到 newfd,如果 newfd 已经打开,那么会先关闭 newfd。
fcntl int fcntl(int fd, int cmd, ... /* arg */ ); 对文件描述符进行各种操作,比如设置文件描述符的属性、获取文件描述符的属性等等。
close int close(int fd); 关闭文件描述符。

tableflip 源码分析

cloudflare/tableflip 是一个 Go 的库,用来实现 tcp 长连接服务的优雅重启。它的源码可以在 https://github.com/cloudflare/tableflip 中找到。

如下是 tableflip 作为一个库的使用方式:

func main() {
    upg, _ := tableflip.New(tableflip.Options{})
    defer upg.Stop()

    go func() {
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGHUP)
        for range sig {
            upg.Upgrade()
        }
    }()

    // Listen must be called before Ready
    ln, _ := upg.Listen("tcp", "localhost:8080")
    defer ln.Close()

    // 运行服务
    go http.Serve(ln, nil)

    if err := upg.Ready(); err != nil {
        panic(err)
    }

    <-upg.Exit()
}

结合之前提到的原理,我们可以看到 tableflip 的 API 跟我们之前实现的 demo 流程是类似的,因此这里我们只分析部分源码。

  1. Listen

Listen 是用于创建监听套接字的,tableflip 通过一个 Fds 的结构集中管理了子进程从父进程继承来的文件描述符,同时也是父进程向子进程复制文件描述符的数据结构:

type Upgrader struct {
	// ...
	// Upgrader.Listen 调用使用的嵌入字段 Fds 的方法
	*Fds
	// ...
}

// Listen 返回从父进程继承来的文件描述符,如果没有继承来的文件描述符,那么就创建一个新的文件描述符。
func (f *Fds) Listen(network, addr string) (net.Listener, error) {
	return f.ListenWithCallback(network, addr, f.newListener)
}

func (f *Fds) ListenWithCallback(network, addr string, callback func(network, addr string) (net.Listener, error)) (net.Listener, error) {
	// 从父进程继承来的文件描述符尝试获取监听套接字
	ln, err := f.listenerLocked(network, addr)
	if err != nil {
		return nil, err
	}
	// 如果找到那么就返回
	if ln != nil {
		return ln, nil
	}

	// 调用回调函数 cllback(f.newListener) 来创建新的监听套接字
	// 这里的 f.newListener = func(network, addr string) (net.Listener, error) { 
	//  f.lc 默认是 空的 net.ListenConfig, 等价于 net.Listen(network, addr)
	//	return f.lc.Listen(context.Background(), network, addr)
	// }
	ln, err = callback(network, addr)
	if err != nil {
		return nil, fmt.Errorf("can't create new listener: %s", err)
	}

	if _, ok := ln.(Listener); !ok {
		ln.Close()
		return nil, fmt.Errorf("%T doesn't implement tableflip.Listener", ln)
	}

	// 加入到文件描述符集合中
	err = f.addListenerLocked(network, addr, ln.(Listener))
	if err != nil {
		ln.Close()
		return nil, err
	}

	return ln, nil
}
  1. Upgrade

Upgrade 是触发优雅重启的入口,主要是发送一个升级信号,而 upgrade 信号的处理逻辑在 upgrader.run 中处理的:

func (u *Upgrader) Upgrade() error {
	// 需要注意的是 upgradeC 是一个 chan chan error 类型的,也就是说它是一个 chan 的 chan
	response := make(chan error, 1)

	// 这里事先检查下当前进程是否处于 退出/升级 状态
	select {
	case <-u.stopC:
		return errors.New("terminating")
	case <-u.exitC:
		return errors.New("already upgraded")
	case u.upgradeC <- response:
	}

	// 阻塞在这里,等待升级结果
	return <-response
}

func (u *Upgrader) run() {
	// ... some ignored codes

	for {
		select {
		case <-parentExited:
			// ...
		case <-processReady:
			// ...
		case <-u.stopC:
			u.Fds.closeAndRemoveUsed()
			return
		case request := <-u.upgradeC:
			// 一些异常情况检测

			// 执行升级
			file, err := u.doUpgrade()
			request <- err

			if err == nil {
				// 告诉子进程,父进程已经退出
				u.exitFd <- neverCloseThisFile{file}
				u.Fds.closeUsed()
				return
			}
		}
	}
}

func (u *Upgrader) doUpgrade() (*os.File, error) {
	// 启动子进程
	// u.env 主要是对 startChild 需要的一些调用的抽象集合,为了适应不同的平台
	// u.Fds.copy() 是复制一份父进程的文件描述符集合,这样子进程就可以直接使用父进程的文件描述符
	child, err := startChild(u.env, u.Fds.copy())
	if err != nil {
		return nil, fmt.Errorf("can't start child: %s", err)
	}

	// 更多的信号处理,退出/升级 等等
}

func startChild(env *env, passedFiles map[fileName]*file) (*child, error) {
	// 开启一个管道,用来通知父进程子进程已经准备好了
	// 所以 readyR 是父进程用来读取的,readyW 是子进程用来写入的,因此 readyW 也需要传递给子进程
	readyR, readyW, err := os.Pipe()
	if err != nil {
		return nil, fmt.Errorf("pipe failed: %s", err)
	}

	// 传递文件描述符的名字
	namesR, namesW, err := os.Pipe()
	if err != nil {
		readyR.Close()
		readyW.Close()
		return nil, fmt.Errorf("pipe failed: %s", err)
	}

	// fds 和 fdNames 保证相同的顺序记录 passedFiles 中的文件描述符
	fds := []*os.File{os.Stdin, os.Stdout, os.Stderr, readyW, namesR}
	var fdNames [][]string
	for name, file := range passedFiles {
		nameSlice := make([]string, len(name))
		copy(nameSlice, name[:])
		fdNames = append(fdNames, nameSlice)
		fds = append(fds, file.File)
	}

	// 传递环境变量,sentinelEnvVar=yes 也就是告诉新的子进程,这是一个升级的子进程
	// 在 tableflip.New > newUpgrader > newParent 的开始处可以看到
	// 这个变量决定了 Upgrader 中的 parent 字段是否有值,如果有值,那么就是一个优雅升级的子进程
	sentinel := fmt.Sprintf("%s=yes", sentinelEnvVar)
	var environ []string
	for _, val := range env.environ() {
		if val != sentinel {
			environ = append(environ, val)
		}
	}
	environ = append(environ, sentinel)

	// 准备好新的子进程的 command, args 文件描述符,环境变量后就可以启动子进程了
	// env.newProc 是一个抽象方法,用来创建子进程
	// 默认使用 newOSProcess 其中核心还是调用 syscall.StartProcess 来创建子进程	
	// syscall.StartProcess 与 syscall.ForkExec 是一样的
	proc, err := env.newProc(os.Args[0], os.Args[1:], fds, environ)
	if err != nil {
		// fork/exec 执行失败,那么释放之前创建的资源
		// ...
		return
	}

	// 后续,子进程启动成功后就不会执行到这里了,因此这里的代码都是在父进程中执行的
	// 也就是说,父进程会想子进程传递一些数据(fdNames)然后等待了来自子进程的信号
	exited := make(chan struct{})
	result := make(chan error, 1)
	ready := make(chan *os.File, 1)

	c := &child{
		// ...
	}
	go c.writeNames(fdNames)
	go c.waitExit(result, exited)
	go c.waitReady(ready)
	return c, nil
}

通过这部分代码,我们可以看到 tableflip 的实现方式跟我们之前的 demo 是类似的,只不过它把一些细节封装了起来,同时也提供了一些额外的功能,比如:

  • 发送 fdNames 给子进程
  • 父进程等待子进程的 ready 信号
  • 子进程等待父进程的 exited 信号
  • 等等
  1. Ready

Ready 使用来通知父进程子进程已经准备好了,这个方法是在子进程中调用的:

需要注意的是,需要在进程真正的可以接受请求后才调用这个方法,否则可能会出现父进程退出后,新连接无法处理的情况。

func (u *Upgrader) Ready() error {
	u.readyOnce.Do(func() {
		u.Fds.closeInherited()
		close(u.readyC)
	})
	// some ignored codes

	// 如果没有父进程,那么就直接返回(parent == nil 意味着优雅升级进程来的)
	if u.parent == nil {
		return nil
	}

	// 通知父进程子进程已经准备好了, 父进程可以退出了
	// 这里其实就是创建子进程时开辟的 ready 管道的 write 端写入特定的信号数据 (notifyReady)
	return u.parent.sendReady()
}

tableflip 的源码还有很多细节,这里就不一一分析了,有兴趣的可以自己看看,但我们可以发现底层实现的思路是一样的。这里留下一些问题:

  1. tableflip 在启动子进程时拷贝了 Fds, 但是纵观代码好像没有考虑已经建立的连接,那能否实现连接的继承呢?
  2. 如果说进程中有一个嵌入式数据库,只允许单进程访问,这时候在 tableflip 中会出现什么情况?应该怎么解决呢?

syscall 的一些细节

通过前文的讲述,我们了解到了实现优雅重启过程中的一些细节,但更深入的了解还需要我们去看一下 fork 和 exec 的一些细节。下面就对 fork 和 exec 的简单地分析一下。

fork(2)

fork(2) Linux manual page

#include <unistd.h>
pid_t fork(void);

通常我们使用以下的代码就可以快速的创建一个子进程:

#include<sys/types.h>
#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>

int main()
{
	pid_t pid = fork(); // 创建子进程
	if (pid < 0) {
		perror("fork error");
		exit(1);
	}

	if (pid != 0) 
	{
		// 父进程
		printf("this is parent process, PID is %d.\n", getpid());
		exit(0);
	}

	// 子进程
	printf("this is child process %d, PID is %d.\n", i, getpid());
}

我们知道在 linux 内核中,进程的核心结构体是 task_struct 如下:

// linux-6.4/include/linux/sched.h#739
struct task_struct {
	//...

	/* Filesystem information: */
	struct fs_struct		*fs;
	/* 记录了进程打开的文件 */
	struct files_struct		*files;

	//...
}

struct files_struct {
	//...
	// 保存了进程打开的文件描述符, 是一个数组,数组的下标就是文件描述符 fd
	struct file __rcu * fd_array[NR_OPEN_DEFAULT];
}

在 fork 的过程中,通过调用 copy_process 来创建一个新的进程,这个函数的实现如下:

// linux-6.4/kernel/fork.c#2246
__latent_entropy struct task_struct *copy_process(
					struct pid *pid,
					int trace,
					int node,
					struct kernel_clone_args *args)
{
	struct task_struct *p;

	// 拷贝进程打开的文件描述符
	retval = copy_files(clone_flags, p, args->no_files);
}

现在我们知道 fork 确实会复制父进程的文件描述符了,那么在子进程里该怎么使用呢?应该还记得我们是怎么恢复监听套接字的吧:

listenFile := os.NewFile(3, "/tmp/graceful")
ln, err := net.FileListener(listenFile)

其实这里有一个问题,我们知道对应的是 fd 也就是 files_structfd_array 的数组下标,fork 会复制父进程的 files_struct,但是为什么是3呢?其实通过断点调试我们可以知道,监听套接字的 fd 并不是 3 而是 10(我本地),那么为什么在新进程里面是 3 呢?这就需要回到 forkAndExecInChild 中对于 fd 数组的处理了,会重新排列 fd 的顺序,并且通过 dup2 系统调用使得 fd[i] = i。

也就是说假如我传入的 fd 数组为:[0, 1, 2, 4, 5],在经过处理后进程中的 fd_array 就会变成 [0, 1, 2, 3(dup 4), 4(dup 5)]

execve(2)

execve(2) Linux manual page

#include <unistd.h>
int execve(const char *pathname, char *const _Nullable argv[],
           char *const _Nullable envp[]);

execve 会执行一个新的程序,这个程序会替换当前进程的内存空间,但是会继承父进程的文件描述符、信号处理等等。如果成功这个函数不会返回,否则返回错误标志。

代码就不贴了~ 跟本文的联系在于可以实现优雅升级:触发升级前,先替换老的二进制文件后,待子进程重新执行时,就实现升级了。

小结

tcp 长连接服务的优雅重启,其实就是通过 fork + exec 的方式来实现的。在启动子进程时,可以复制父进程的相关套接字,通过一些手段在子进程中恢复这些套接字,这样子进程就可以继续使用父进程的套接字,从而实现了客户端无感知的优雅重启。

参考文献

  1. Graceful upgrades in Go