Tcp 长连接服务优雅重启的秘密
探究一下如何实现长连接服务的优雅启停/升级,以及背后的原理。同时分析一下 cloudflare/tableflip 的源码设计。知其然更知其所以然,才能更好的使用。
假设我们有一个长连接服务,我们想要对它进行升级,但是不想让客户端受到影响应该怎么做?这个问题其实是一个很常见的问题,比如我们的游戏服务器,我们的 IM 服务器,推送服务器等等,诸如此类使用tcp长连接的服务,都会遇到这个问题。那么我们应该怎么做呢?
需求分析
我们可以先来看下这个场景下的需求:
- 客户端必须要对这个操作没有感知,也就是说客户端不需要做任何的修改,在服务器升级的过程中不需要配合。
- 服务器在升级的过程中,不能丢失任何的连接,也就是说,如果有新的连接进来,那么这个连接必须要被接受,如果有旧的连接,那么客户端不能够触发重连。
基本思路
实现思路的讨论范围限制在 linux 服务器上
为了实现上述的要求,首先在升级流程中我们需要做到以下几点:
- 旧的服务器进程在处理完请求前不能退出,而且一旦升级开始就不能再接受新的连接。
- 旧的服务器进程在所有连接都处理完毕后才能退出。
- 新的服务器进程在启动时需要继承旧的服务器进程的所有连接,新的连接也应该被新的服务器进程接受。
- 新的服务器进程也必须监听旧的服务器进程的监听端口,否则新的连接无法被接受。
那么通过 Google 和 ChatGPT 的帮助,我们可以找到一些思路:
新进程继承旧进程的(监听)套接字,而不是创建新的。
为什么不创建新的(监听)套接字呢?在 linux 中内核会把处在不同握手阶段的TCP连接放在不同的队列中(半连接/全连接)。服务器的监听套接字会有自己的队列,如果创建新的套接字,那么旧的套接字队列中的连接就会丢失。为了做到客户端无感知,我们需要继承旧的套接字(主要是为了连接队列中的连接不丢失)。
半连接队列:当客户端发送 SYN 包时,服务器会把这个连接放在半连接队列中,等待服务器的 ACK 包,这个时候连接处于半连接状态。当服务器发送 ACK 包时,这个连接就会从半连接队列中移除,放到全连接队列中,这个时候连接处于全连接状态。当服务器调用 accept 时,就会从全连接队列中取出一个连接,这个时候连接处于 ESTABLISHED 状态。
实现方式
那么在 linux 中,我们可以通过以如下方式实现:
- 通过
fork
创建子进程,子进程继承父进程的所有资源,包括监听套接字; - 子进程通过
exec
加载最新的二进制程序执行,这样就实现了新进程继承旧进程的监听套接字。 - 新进程启动完成后,通知父进程退出。
- 父进程受到信号后,停止接受新的连接,等待所有的连接处理完毕后退出。
在 Go 里面,我们可以通过如下方式实现:
type gracefulTcpServer struct {
listener *net.TCPListener
shutdownChan chan struct{}
conns map[net.Conn]struct{}
servingConnCount atomic.Int32
serveRunning atomic.Bool
}
// 普通启动方式
func start(port int) (*gracefulTcpServer, error) {
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
// handle error ignored
s := &gracefulTcpServer{
listener: ln.(*net.TCPListener),
shutdownChan: make(chan struct{}, 1),
conns: make(map[net.Conn]struct{}, 16),
servingConnCount: atomic.Int32{},
serveRunning: atomic.Bool{},
}
return s, nil
}
// 优雅重启启动方式
func startFromFork() (*gracefulTcpServer, error) {
// ... ignored code
// 从环境变量中获取 父进程的处理的连接数,用来恢复连接
if nfdStr := os.Getenv(__GRACE_ENV_NFDS); nfdStr == "" {
panic("not nfds env")
} else if nfd, err = strconv.Atoi(nfdStr); err != nil {
panic(err)
}
// restore conn fds, 0, 1, 2 has been used by os.Stdin, os.Stdout, os.Stderr
lfd := os.NewFile(3, filepath.Join(tmpdir, "graceful"))
ln, err := net.FileListener(lfd)
// handle error ignored
s := &gracefulTcpServer{
listener: ln.(*net.TCPListener),
shutdownChan: make(chan struct{}, 1),
conns: make(map[net.Conn]struct{}, 16),
servingConnCount: atomic.Int32{},
serveRunning: atomic.Bool{},
}
// 从父进程继承的套接字中恢复连接
for i := 0; i < nfd; i++ {
fd := os.NewFile(uintptr(4+i), filepath.Join(tmpdir, strconv.Itoa(4+i)))
conn, err := net.FileConn(fd)
// handle error ignored
go s.handleConn(conn)
}
return s, nil
}
func (s *gracefulTcpServer) gracefulRestart() {
_ = s.listener.SetDeadline(time.Now())
lfd, err := s.listener.File()
// 给子进程设置 优雅重启 相关的环境变量
os.Setenv(__GRACE_ENV_FLAG, "true")
os.Setenv(__GRACE_ENV_NFDS, strconv.Itoa(len(s.conns)))
// 将父进程的监听套接字传递给子进程
files := make([]uintptr, 4, 3+1+len(s.conns))
copy(files[:4], []uintptr{
os.Stdin.Fd(),
os.Stdout.Fd(),
os.Stderr.Fd(),
lfd.Fd(),
})
// 将父进程的套接字传递给子进程
for conn := range s.conns {
connFd, _ := conn.(*net.TCPConn).File()
files = append(files, connFd.Fd())
}
procAttr := &syscall.ProcAttr{
Env: os.Environ(),
Files: files,
Sys: nil,
}
// 执行 fork + exec 调用
childPid, err := syscall.ForkExec(os.Args[0], os.Args, procAttr)
}
func main() {
// ...
// 根据环境变量判断是 fork 还是新启动
if v := os.Getenv(__GRACE_ENV_FLAG); v != "" {
s, err = startFromFork()
} else {
s, err = start(*port)
}
go s.serve()
// 处理信号,如果是 SIGHUP 信号,则执行 gracefulRestart 方法后再退出
s.waitForSignals()
}
完整代码可以在 https://github.com/yeqown/playground/golang/tcp-graceful-restart 中找到。
syscall.ForkExec
通过追踪 syscall.ForkExec
的源码,我们可以看到它的实现主体是 forkAndExecInChild
, 它的实现如下:
func forkAndExecInChild(argv0 *byte, argv, envv []*byte, chroot, dir *byte, attr *ProcAttr, sys *SysProcAttr, pipe int) (pid int, err Errno) {
// ... some ignored codes
fd := make([]int, len(attr.Files))
nextfd = len(attr.Files)
for i, ufd := range attr.Files {
if nextfd < int(ufd) {
nextfd = int(ufd)
}
fd[i] = int(ufd)
}
nextfd++
// 调用 fork
runtime_BeforeFork()
r1, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fork_trampoline), 0, 0, 0)
if err1 != 0 {
runtime_AfterFork()
return 0, err1
}
// fork 的函数原型: pid_t fork(void);
// 调用成功后,父进程返回子进程的 pid,子进程返回 0。所以这里通过 r1 的值来判断是父进程还是子进程
// 如果是父进程,那么在这里就返回了,子进程会继续执行下面的代码。
if r1 != 0 {
runtime_AfterFork()
return int(r1), 0
}
// 调用系统调用来设置 子进程 的各种属性:
// 会话ID、进程组ID、用户ID、组ID、工作目录、是否前台运行等等。
// 找到 fd[i] < i 并且把他们移到 len(fd) 之后,这样在后面的 dup2 中就不会被 “踩踏”。
// 踩踏:一个变量或者资源被无意中(通常是意外地)覆盖或修改的情况。
for i = 0; i < len(fd); i++ {
if fd[i] >= 0 && fd[i] < i {
if nextfd == pipe { // don't stomp on pipe
nextfd++
}
if runtime.GOOS == "openbsd" {
_, _, err1 = rawSyscall(dupTrampoline, uintptr(fd[i]), uintptr(nextfd), O_CLOEXEC)
} else {
_, _, err1 = rawSyscall(dupTrampoline, uintptr(fd[i]), uintptr(nextfd), 0)
if err1 != 0 {
goto childerror
}
_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fcntl_trampoline), uintptr(nextfd), F_SETFD, FD_CLOEXEC)
}
if err1 != 0 {
goto childerror
}
fd[i] = nextfd
nextfd++
}
}
// 调用 dup2 来复制父进程的文件描述符,也就是 gracefulTcpServer.gracefulRestart 中传递给子进程的文件描述符
for i = 0; i < len(fd); i++ {
// ... some ignored codes
// 基本只有 0,1,2 会触发这里的逻辑
if fd[i] == i {
// dup2(i, i) won't clear close-on-exec flag on Linux,
// probably not elsewhere either.
_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_fcntl_trampoline), uintptr(fd[i]), F_SETFD, 0)
if err1 != 0 {
goto childerror
}
continue
}
// 其他的文件描述符,监听套接字 + 连接套接字采用 dup2 来复制
// 这样父进程退出后,子进程还可以继续使用这些套接字
_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_dup2_trampoline), uintptr(fd[i]), uintptr(i), 0)
if err1 != 0 {
goto childerror
}
}
// ... some ignored codes
// 调用 execve 系统调用来执行新的程序
_, _, err1 = rawSyscall(abi.FuncPCABI0(libc_execve_trampoline),
uintptr(unsafe.Pointer(argv0)),
uintptr(unsafe.Pointer(&argv[0])),
uintptr(unsafe.Pointer(&envv[0])))
childerror:
// send error code on pipe
rawSyscall(abi.FuncPCABI0(libc_write_trampoline), uintptr(pipe), uintptr(unsafe.Pointer(&err1)), unsafe.Sizeof(err1))
for {
rawSyscall(abi.FuncPCABI0(libc_exit_trampoline), 253, 0, 0)
}
}
留个问题:这里为什么要对 fd[i] 数组进行处理呢?这些处理的效果是什么?
后文有答案哦
一些系统调用
在探究 go 底层 fork + exec 的实现时,我们发现了一些系统调用,那么每个系统调用的函数原型和使用场景是什么呢?
系统调用 | 函数原型 | 说明 |
---|---|---|
fork | pid_t fork(void); |
创建一个子进程,子进程会继承父进程的所有资源,包括文件描述符、内存、信号处理等等。 |
execve | int execve(const char *filename, char *const argv[], char *const envp[]); |
执行一个新的程序,这个程序会替换当前进程的内存空间,但是会继承父进程的文件描述符、信号处理等等。如果成功这个函数不会返回,否则返回错误标志 |
dup | int dup(int oldfd); |
复制文件描述符,oldfd 是文件描述符,dup 会把 oldfd 复制到当前进程中,返回新的文件描述符。 |
dup2 | int dup2(int oldfd, int newfd); |
复制文件描述符,oldfd 和 newfd 都是文件描述符,dup2 会把 oldfd 复制到 newfd,如果 newfd 已经打开,那么会先关闭 newfd。 |
fcntl | int fcntl(int fd, int cmd, ... /* arg */ ); |
对文件描述符进行各种操作,比如设置文件描述符的属性、获取文件描述符的属性等等。 |
close | int close(int fd); |
关闭文件描述符。 |
tableflip 源码分析
cloudflare/tableflip 是一个 Go 的库,用来实现 tcp 长连接服务的优雅重启。它的源码可以在 https://github.com/cloudflare/tableflip 中找到。
如下是 tableflip 作为一个库的使用方式:
func main() {
upg, _ := tableflip.New(tableflip.Options{})
defer upg.Stop()
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGHUP)
for range sig {
upg.Upgrade()
}
}()
// Listen must be called before Ready
ln, _ := upg.Listen("tcp", "localhost:8080")
defer ln.Close()
// 运行服务
go http.Serve(ln, nil)
if err := upg.Ready(); err != nil {
panic(err)
}
<-upg.Exit()
}
结合之前提到的原理,我们可以看到 tableflip 的 API 跟我们之前实现的 demo 流程是类似的,因此这里我们只分析部分源码。
- Listen
Listen
是用于创建监听套接字的,tableflip 通过一个 Fds
的结构集中管理了子进程从父进程继承来的文件描述符,同时也是父进程向子进程复制文件描述符的数据结构:
type Upgrader struct {
// ...
// Upgrader.Listen 调用使用的嵌入字段 Fds 的方法
*Fds
// ...
}
// Listen 返回从父进程继承来的文件描述符,如果没有继承来的文件描述符,那么就创建一个新的文件描述符。
func (f *Fds) Listen(network, addr string) (net.Listener, error) {
return f.ListenWithCallback(network, addr, f.newListener)
}
func (f *Fds) ListenWithCallback(network, addr string, callback func(network, addr string) (net.Listener, error)) (net.Listener, error) {
// 从父进程继承来的文件描述符尝试获取监听套接字
ln, err := f.listenerLocked(network, addr)
if err != nil {
return nil, err
}
// 如果找到那么就返回
if ln != nil {
return ln, nil
}
// 调用回调函数 cllback(f.newListener) 来创建新的监听套接字
// 这里的 f.newListener = func(network, addr string) (net.Listener, error) {
// f.lc 默认是 空的 net.ListenConfig, 等价于 net.Listen(network, addr)
// return f.lc.Listen(context.Background(), network, addr)
// }
ln, err = callback(network, addr)
if err != nil {
return nil, fmt.Errorf("can't create new listener: %s", err)
}
if _, ok := ln.(Listener); !ok {
ln.Close()
return nil, fmt.Errorf("%T doesn't implement tableflip.Listener", ln)
}
// 加入到文件描述符集合中
err = f.addListenerLocked(network, addr, ln.(Listener))
if err != nil {
ln.Close()
return nil, err
}
return ln, nil
}
- Upgrade
Upgrade
是触发优雅重启的入口,主要是发送一个升级信号,而 upgrade
信号的处理逻辑在 upgrader.run
中处理的:
func (u *Upgrader) Upgrade() error {
// 需要注意的是 upgradeC 是一个 chan chan error 类型的,也就是说它是一个 chan 的 chan
response := make(chan error, 1)
// 这里事先检查下当前进程是否处于 退出/升级 状态
select {
case <-u.stopC:
return errors.New("terminating")
case <-u.exitC:
return errors.New("already upgraded")
case u.upgradeC <- response:
}
// 阻塞在这里,等待升级结果
return <-response
}
func (u *Upgrader) run() {
// ... some ignored codes
for {
select {
case <-parentExited:
// ...
case <-processReady:
// ...
case <-u.stopC:
u.Fds.closeAndRemoveUsed()
return
case request := <-u.upgradeC:
// 一些异常情况检测
// 执行升级
file, err := u.doUpgrade()
request <- err
if err == nil {
// 告诉子进程,父进程已经退出
u.exitFd <- neverCloseThisFile{file}
u.Fds.closeUsed()
return
}
}
}
}
func (u *Upgrader) doUpgrade() (*os.File, error) {
// 启动子进程
// u.env 主要是对 startChild 需要的一些调用的抽象集合,为了适应不同的平台
// u.Fds.copy() 是复制一份父进程的文件描述符集合,这样子进程就可以直接使用父进程的文件描述符
child, err := startChild(u.env, u.Fds.copy())
if err != nil {
return nil, fmt.Errorf("can't start child: %s", err)
}
// 更多的信号处理,退出/升级 等等
}
func startChild(env *env, passedFiles map[fileName]*file) (*child, error) {
// 开启一个管道,用来通知父进程子进程已经准备好了
// 所以 readyR 是父进程用来读取的,readyW 是子进程用来写入的,因此 readyW 也需要传递给子进程
readyR, readyW, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("pipe failed: %s", err)
}
// 传递文件描述符的名字
namesR, namesW, err := os.Pipe()
if err != nil {
readyR.Close()
readyW.Close()
return nil, fmt.Errorf("pipe failed: %s", err)
}
// fds 和 fdNames 保证相同的顺序记录 passedFiles 中的文件描述符
fds := []*os.File{os.Stdin, os.Stdout, os.Stderr, readyW, namesR}
var fdNames [][]string
for name, file := range passedFiles {
nameSlice := make([]string, len(name))
copy(nameSlice, name[:])
fdNames = append(fdNames, nameSlice)
fds = append(fds, file.File)
}
// 传递环境变量,sentinelEnvVar=yes 也就是告诉新的子进程,这是一个升级的子进程
// 在 tableflip.New > newUpgrader > newParent 的开始处可以看到
// 这个变量决定了 Upgrader 中的 parent 字段是否有值,如果有值,那么就是一个优雅升级的子进程
sentinel := fmt.Sprintf("%s=yes", sentinelEnvVar)
var environ []string
for _, val := range env.environ() {
if val != sentinel {
environ = append(environ, val)
}
}
environ = append(environ, sentinel)
// 准备好新的子进程的 command, args 文件描述符,环境变量后就可以启动子进程了
// env.newProc 是一个抽象方法,用来创建子进程
// 默认使用 newOSProcess 其中核心还是调用 syscall.StartProcess 来创建子进程
// syscall.StartProcess 与 syscall.ForkExec 是一样的
proc, err := env.newProc(os.Args[0], os.Args[1:], fds, environ)
if err != nil {
// fork/exec 执行失败,那么释放之前创建的资源
// ...
return
}
// 后续,子进程启动成功后就不会执行到这里了,因此这里的代码都是在父进程中执行的
// 也就是说,父进程会想子进程传递一些数据(fdNames)然后等待了来自子进程的信号
exited := make(chan struct{})
result := make(chan error, 1)
ready := make(chan *os.File, 1)
c := &child{
// ...
}
go c.writeNames(fdNames)
go c.waitExit(result, exited)
go c.waitReady(ready)
return c, nil
}
通过这部分代码,我们可以看到 tableflip 的实现方式跟我们之前的 demo 是类似的,只不过它把一些细节封装了起来,同时也提供了一些额外的功能,比如:
- 发送 fdNames 给子进程
- 父进程等待子进程的 ready 信号
- 子进程等待父进程的 exited 信号
- 等等
- Ready
Ready
使用来通知父进程子进程已经准备好了,这个方法是在子进程中调用的:
需要注意的是,需要在进程真正的可以接受请求后才调用这个方法,否则可能会出现父进程退出后,新连接无法处理的情况。
func (u *Upgrader) Ready() error {
u.readyOnce.Do(func() {
u.Fds.closeInherited()
close(u.readyC)
})
// some ignored codes
// 如果没有父进程,那么就直接返回(parent == nil 意味着优雅升级进程来的)
if u.parent == nil {
return nil
}
// 通知父进程子进程已经准备好了, 父进程可以退出了
// 这里其实就是创建子进程时开辟的 ready 管道的 write 端写入特定的信号数据 (notifyReady)
return u.parent.sendReady()
}
tableflip 的源码还有很多细节,这里就不一一分析了,有兴趣的可以自己看看,但我们可以发现底层实现的思路是一样的。这里留下一些问题:
- tableflip 在启动子进程时拷贝了 Fds, 但是纵观代码好像没有考虑已经建立的连接,那能否实现连接的继承呢?
- 如果说进程中有一个嵌入式数据库,只允许单进程访问,这时候在 tableflip 中会出现什么情况?应该怎么解决呢?
syscall 的一些细节
通过前文的讲述,我们了解到了实现优雅重启过程中的一些细节,但更深入的了解还需要我们去看一下 fork 和 exec 的一些细节。下面就对 fork 和 exec 的简单地分析一下。
fork(2)
#include <unistd.h>
pid_t fork(void);
通常我们使用以下的代码就可以快速的创建一个子进程:
#include<sys/types.h>
#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
int main()
{
pid_t pid = fork(); // 创建子进程
if (pid < 0) {
perror("fork error");
exit(1);
}
if (pid != 0)
{
// 父进程
printf("this is parent process, PID is %d.\n", getpid());
exit(0);
}
// 子进程
printf("this is child process %d, PID is %d.\n", i, getpid());
}
我们知道在 linux 内核中,进程的核心结构体是 task_struct
如下:
// linux-6.4/include/linux/sched.h#739
struct task_struct {
//...
/* Filesystem information: */
struct fs_struct *fs;
/* 记录了进程打开的文件 */
struct files_struct *files;
//...
}
struct files_struct {
//...
// 保存了进程打开的文件描述符, 是一个数组,数组的下标就是文件描述符 fd
struct file __rcu * fd_array[NR_OPEN_DEFAULT];
}
在 fork 的过程中,通过调用 copy_process
来创建一个新的进程,这个函数的实现如下:
// linux-6.4/kernel/fork.c#2246
__latent_entropy struct task_struct *copy_process(
struct pid *pid,
int trace,
int node,
struct kernel_clone_args *args)
{
struct task_struct *p;
// 拷贝进程打开的文件描述符
retval = copy_files(clone_flags, p, args->no_files);
}
现在我们知道 fork
确实会复制父进程的文件描述符了,那么在子进程里该怎么使用呢?应该还记得我们是怎么恢复监听套接字的吧:
listenFile := os.NewFile(3, "/tmp/graceful")
ln, err := net.FileListener(listenFile)
其实这里有一个问题,我们知道对应的是 fd 也就是 files_struct
中 fd_array
的数组下标,fork 会复制父进程的 files_struct
,但是为什么是3呢?其实通过断点调试我们可以知道,监听套接字的 fd 并不是 3 而是 10(我本地),那么为什么在新进程里面是 3 呢?这就需要回到 forkAndExecInChild
中对于 fd
数组的处理了,会重新排列 fd 的顺序,并且通过 dup2
系统调用使得 fd[i] = i。
也就是说假如我传入的 fd 数组为:[0, 1, 2, 4, 5],在经过处理后进程中的 fd_array 就会变成 [0, 1, 2, 3(dup 4), 4(dup 5)]
execve(2)
#include <unistd.h>
int execve(const char *pathname, char *const _Nullable argv[],
char *const _Nullable envp[]);
execve
会执行一个新的程序,这个程序会替换当前进程的内存空间,但是会继承父进程的文件描述符、信号处理等等。如果成功这个函数不会返回,否则返回错误标志。
代码就不贴了~ 跟本文的联系在于可以实现优雅升级:触发升级前,先替换老的二进制文件后,待子进程重新执行时,就实现升级了。
小结
tcp 长连接服务的优雅重启,其实就是通过 fork + exec 的方式来实现的。在启动子进程时,可以复制父进程的相关套接字,通过一些手段在子进程中恢复这些套接字,这样子进程就可以继续使用父进程的套接字,从而实现了客户端无感知的优雅重启。