技术总结

通过一次抓包来掌握memcached

什么是 memcached #

memcached 是一个高性能的“分布式”内存对象缓存系统,用于动态 Web 应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。memcached 是自由软件,以 BSD 许可证发布。

相比于大家熟知的 Redis,memcached 更加简单,只支持 key-value 存储,而 Redis 支持更多的数据结构,如 list、set、hash 等。

Github 地址:https://github.com/memcached/memcached

为什么有 redis 还要使用 memcached #

从我个人的角度来说,要在采用一个缓存系统的时候,我会优先选择 Redis,因为 Redis 功能更加强大,支持更多的数据结构,而且 Redis 也支持持久化,在高可用和分布式部分的设计上也更加完善。

但是 memcached 也有自己的优势,比如更加简单,更加轻量级,更加容易上手,因此在某些系统中也会选用 memcached。因此,了解 memcached 的设计也是有必要的。

memcached 协议概览 #

memcached 支持基本的文本协议和元文本协议,其中元文本协议于 2019 年推出。memcached 还曾支持过二进制协议,但已经被废弃。

memcached 的协议是基于文本的,因此我们可以通过 telnet 或者 netcat 工具来模拟 memcached 的客户端,从而方便的进行测试。

这两者是 “交叉兼容” 的,也就是说我们可以通过 文本协议来设置键值, 通过 元文本协议来查询,反之亦然。

Standard Text Protocol #

详细的协议文档可以参考:https://github.com/memcached/memcached/blob/master/doc/protocol.txt

memcached 的标准文本协议是一个基于文本的协议,它使用 ASCII 字符串来进行通信。memcached 服务器监听在默认端口 11211 上,客户端通过 TCP 连接到服务器,然后发送命令和数据。因此我们可以很容易的通过 telnet 工具就可以完成 memcached 的基本操作。

...

近期的一些经验总结

这里不会过多的介绍软件的相关概念和架构,主要是针对实际问题的解决方案和思考。

问题汇总 #

  • CDC 相关

    • CDC kafka-connect mysql sink 侧消费积压问题
    • CDC kafka-connect mysql source 侧删除事件投递了两条事件,导致删除动作数据量被放大
    • CDC kafka-connect mongodb 数据同步任务异常(消息超过 1MB )
  • DMS 数据同步相关

    • 数据迁移完成后,怎么对比源数据和目标数据是否一致?
    • 如果不一致怎么处理?
  • Istio 相关

    • Istio 中多个 gateway 使用相同 host,analyze 是提示错误
    • Istio 中一个服务提供了多个端口的服务,怎么配置 Virtual Service ?
  • APISIX 相关

    • 使用 APISIX 作为网关,怎么进行有条件的响应重写?
    • APISIX 插件的执行顺序是怎么样的?
  • ShardingSphere Proxy

    • HINT策略 在 ShardingSphere Proxy 中的使用
  • Kafka 相关

    • 如何将迁移kafka集群中的数据?
  • Pyroscope 相关

    • 使用 Go Pull 模式采集数据时为什么只有 cpu + gourotines + cpu samples 三个指标?
  • Doris 相关

    ...

ShardingSphere-Proxy问题几则

ShardingSphere Proxy 是 Apache ShardingSphere 的一个子项目,是一个基于 MySQL 协议的数据库中间件,用于实现分库分表、读写分离等功能。在使用过程中,遇到了一些问题,记录如下。

这里主要针对的是 分库分表 的使用场景。

问题概述 #

数据库往往是一个系统最容易出现瓶颈的点,当遇到数据库瓶颈时,我们可以通过数据拆分来缓解问题。数据拆分的方式通常分为横向拆分和纵向拆分,横向拆分即分库分表;纵向拆分即把一个库表中的字段拆分到不同的库表中去。这两种手段并不互斥,而是在实际情况中相辅相成。本文即是横向拆分相关内容。

  • 常见的部署方式有哪些?
  • 数据分片规则怎么配置?
  • 数据分片数应该怎么确定?
  • 数据分片后唯一索引还有用吗?
  • 数据分片后数据迁移?
  • 数据分片后如何确定实际执行 SQL 语句?
  • 数据分片后后的查询优化?

0. 常见的部署方式 #

官方提供了两种部署方式:

  • 单机部署:将 ShardingSphere Proxy 部署在单台服务器上,用于测试和开发环境。
  • 集群部署:将 ShardingSphere Proxy 部署在多台服务器上,用于生产环境。集群模式下使用 zookeeper 来存储元数据。

关于元数据,元数据是 ShardingSphere Proxy 的核心,用于存储分库分表规则、读写分离规则等信息。 官方建议使用集群模式部署 生产环境的 ShardingSphere Proxy

如果不按照官方的指引,选择部署了多个 Standalone 模式的 ShardingSphere Proxy,那么需要注意“每个这样的 proxy 节点会有自己的元信息,他们之间并不互通”。这样一些情况下会出现节点之间元数据不一致的问题,参看如下测试:

# 启动 3 个 standalone 模式的 ShardingSphere Proxy
                                          +-------+
                                          |  LB   |
                                          +-------+
                                              |
                                |-------------|--------------|
                                |             |              |
                            +-------+     +-------+       +-------+
                            | Node1 |     | Node2 |       | Node3 |
                            +-------+     +-------+       +-------+

初始表结构如下:

...

fsnotify原理探究

本文如果没有特殊说明,所有的内容都是指 linux 系统

起因是从 kratos 群里看到有人问:“测了下kratos的config watch,好像对软链不生效”,他提供的屏幕截图如下类似:

$ pwd
/tmp/testconfig
$ ls -l
drwxr-xr-x  3 root root 4096 Oct 10 19:48 .
drwxr-xr-x 10 root root 4096 Oct 10 19:48 ..
drwxr-xr-x  1 root root   11 Oct 10 19:48 ..ver1
drwxr-xr-x  1 root root   11 Oct 10 19:48 ..ver2
lrwxr-xr-x  1 root root   11 Oct 10 19:48 ..data -> ..ver1
drwxr-xr-x  1 root root   11 Oct 10 19:48 data
$
$ ll -a data
drwxr-xr-x  3 root root 4096 Oct 10 19:48 .
drwxr-xr-x 10 root root 4096 Oct 10 19:48 ..
lrwxrwxrwx  1 root root   11 Oct 10 19:48 registry.yaml -> /tmp/testconfig/..data/registry.yaml

然后触发更新的动作其实是把 ..data 的源改成了 ..ver2,但是发现并没有触发更新,于是就问了一下。

...

C和lua互操作实践

Lua, C, Redis

相信了解 redis 和 openresty 的小伙伴们都知道 lua 代码可以嵌入这两种程序中运行,极大的提高了软件的扩展性;尤其是 openresty 中,通过使用 lua 我们可以很快速(相比c)的定制web服务器,或者增强 nginx 的功能。那么 lua 是如何嵌入到这些程序中的呢?lua 和 c 是如何互操作的呢?

下文的相关环境和工具版本为:Lua 5.4.6; Mac OS 13.4.1 (Darwin Kernel Version 22.5.0) arm64 M2 pro; Apple clang version 14.0.3 (clang-1403.0.22.14.1)

redis 中的 lua #

下面展示了一段 redis 中操作 lua API 的代码:

这里出现了很多 lua_ 开头的函数,这些函数都是 lua 库中的函数,redis 通过这些函数来操作 lua 环境, 这里先不展开讲,后面会详细介绍。

更多的代码,如 luaRegisterRedisAPI 就不展示了,有兴趣的可以去看源码。

// redis-v7.2/src/eval.c#183

/* 初始化 lua 环境
 *
 * redis 首次启动时调用,此时 setup 为 1,
 * 这个函数也会在 redis 的其他生命周期中被调用,此时 setup 为 0,但是被简化为 scriptingReset 调用。
 */ 
void scriptingInit(int setup) {
    lua_State *lua = lua_open();

    if (setup) {
        // 首次启动时,初始化 lua 环境 和 ldb (Lua debugger) 的一些数据结构
        lctx.lua_client = NULL;
        server.script_disable_deny_script = 0;
        ldbInit();
    }

    /* 初始化 lua 脚本字典,用于存储 sha1 -> lua 脚本的映射
     * 用户使用 EVALSHA 命令时,从这个字典中查找对应的 lua 脚本。
     */
    lctx.lua_scripts = dictCreate(&shaScriptObjectDictType);
    lctx.lua_scripts_mem = 0;

    /* 注册 redis 的一些 api 到 lua 环境中 */
    luaRegisterRedisAPI(lua);

    /* 注册调试命令 */
    lua_getglobal(lua,"redis");

    /* redis.breakpoint */
    lua_pushstring(lua,"breakpoint");
    lua_pushcfunction(lua,luaRedisBreakpointCommand);
    lua_settable(lua, -3);
    /* redis.debug */
    lua_pushstring(lua,"debug");
    lua_pushcfunction(lua,luaRedisDebugCommand);
    lua_settable(lua,-3);
    /* redis.replicate_commands */
    lua_pushstring(lua, "replicate_commands");
    lua_pushcfunction(lua, luaRedisReplicateCommandsCommand);
    lua_settable(lua, -3);

    lua_setglobal(lua,"redis");

    /* 注册一个错误处理函数,用于在 lua 脚本执行出错时,打印出错信息。
     * 需要注意的是,当错误发生在 C 函数中时,我们需要打印出错的 lua 脚本的信息,
     * 这样才能帮助用户调试 lua 脚本。
     */
    {
        char *errh_func =       "local dbg = debug\n"
                                "debug = nil\n"
                                "function __redis__err__handler(err)\n"
                                "  local i = dbg.getinfo(2,'nSl')\n"
                                "  if i and i.what == 'C' then\n"
                                "    i = dbg.getinfo(3,'nSl')\n"
                                "  end\n"
                                "  if type(err) ~= 'table' then\n"
                                "    err = {err='ERR ' .. tostring(err)}"
                                "  end"
                                "  if i then\n"
                                "    err['source'] = i.source\n"
                                "    err['line'] = i.currentline\n"
                                "  end"
                                "  return err\n"
                                "end\n";
        luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def");
        lua_pcall(lua,0,0,0);
    }

    /* 创建一个 lua client (没有网络连接),用于在 lua 环境中执行 redis 命令。
     * 这个客户端没必要在 scriptingReset() 调用时重新创建。
     */
    if (lctx.lua_client == NULL) {
        lctx.lua_client = createClient(NULL);
        lctx.lua_client->flags |= CLIENT_SCRIPT;

        /* We do not want to allow blocking commands inside Lua */
        lctx.lua_client->flags |= CLIENT_DENY_BLOCKING;
    }

    /* Lock the global table from any changes */
    lua_pushvalue(lua, LUA_GLOBALSINDEX);
    luaSetErrorMetatable(lua);
    /* Recursively lock all tables that can be reached from the global table */
    luaSetTableProtectionRecursively(lua);
    lua_pop(lua, 1);

    lctx.lua = lua;
}

通过这部分代码,应该对于 lua 的嵌入式使用有了一个大概的印象。这里可以回答以下的问题:

...

Tcp 长连接服务优雅重启的秘密

假设我们有一个长连接服务,我们想要对它进行升级,但是不想让客户端受到影响应该怎么做?这个问题其实是一个很常见的问题,比如我们的游戏服务器,我们的 IM 服务器,推送服务器等等,诸如此类使用tcp长连接的服务,都会遇到这个问题。那么我们应该怎么做呢?

需求分析 #

我们可以先来看下这个场景下的需求:

  • 客户端必须要对这个操作没有感知,也就是说客户端不需要做任何的修改,在服务器升级的过程中不需要配合。
  • 服务器在升级的过程中,不能丢失任何的连接,也就是说,如果有新的连接进来,那么这个连接必须要被接受,如果有旧的连接,那么客户端不能够触发重连。

基本思路 #

实现思路的讨论范围限制在 linux 服务器上

为了实现上述的要求,首先在升级流程中我们需要做到以下几点:

  • 旧的服务器进程在处理完请求前不能退出,而且一旦升级开始就不能再接受新的连接。
  • 旧的服务器进程在所有连接都处理完毕后才能退出。
  • 新的服务器进程在启动时需要继承旧的服务器进程的所有连接,新的连接也应该被新的服务器进程接受。
  • 新的服务器进程也必须监听旧的服务器进程的监听端口,否则新的连接无法被接受。

那么通过 Google 和 ChatGPT 的帮助,我们可以找到一些思路:

新进程继承旧进程的(监听)套接字,而不是创建新的。

为什么不创建新的(监听)套接字呢?在 linux 中内核会把处在不同握手阶段的TCP连接放在不同的队列中(半连接/全连接)。服务器的监听套接字会有自己的队列,如果创建新的套接字,那么旧的套接字队列中的连接就会丢失。为了做到客户端无感知,我们需要继承旧的套接字(主要是为了连接队列中的连接不丢失)。

半连接队列:当客户端发送 SYN 包时,服务器会把这个连接放在半连接队列中,等待服务器的 ACK 包,这个时候连接处于半连接状态。当服务器发送 ACK 包时,这个连接就会从半连接队列中移除,放到全连接队列中,这个时候连接处于全连接状态。当服务器调用 accept 时,就会从全连接队列中取出一个连接,这个时候连接处于 ESTABLISHED 状态。

实现方式 #

那么在 linux 中,我们可以通过以如下方式实现:

  1. 通过 fork 创建子进程,子进程继承父进程的所有资源,包括监听套接字;
  2. 子进程通过 exec 加载最新的二进制程序执行,这样就实现了新进程继承旧进程的监听套接字。
  3. 新进程启动完成后,通知父进程退出。
  4. 父进程受到信号后,停止接受新的连接,等待所有的连接处理完毕后退出。

在 Go 里面,我们可以通过如下方式实现:

type gracefulTcpServer struct {
	listener     *net.TCPListener
	shutdownChan chan struct{}
	conns        map[net.Conn]struct{}

	servingConnCount atomic.Int32
	serveRunning     atomic.Bool
}


// 普通启动方式
func start(port int) (*gracefulTcpServer, error) {
	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
    // handle error ignored

	s := &gracefulTcpServer{
		listener:         ln.(*net.TCPListener),
		shutdownChan:     make(chan struct{}, 1),
		conns:            make(map[net.Conn]struct{}, 16),
		servingConnCount: atomic.Int32{},
		serveRunning:     atomic.Bool{},
	}

	return s, nil
}

// 优雅重启启动方式
func startFromFork() (*gracefulTcpServer, error) {
    // ... ignored code

    // 从环境变量中获取 父进程的处理的连接数,用来恢复连接
	if nfdStr := os.Getenv(__GRACE_ENV_NFDS); nfdStr == "" {
		panic("not nfds env")
	} else if nfd, err = strconv.Atoi(nfdStr); err != nil {
		panic(err)
	}

	// restore conn fds, 0, 1, 2 has been used by os.Stdin, os.Stdout, os.Stderr
	lfd := os.NewFile(3, filepath.Join(tmpdir, "graceful"))
	ln, err := net.FileListener(lfd)
	// handle error ignored

	s := &gracefulTcpServer{
		listener:         ln.(*net.TCPListener),
		shutdownChan:     make(chan struct{}, 1),
		conns:            make(map[net.Conn]struct{}, 16),
		servingConnCount: atomic.Int32{},
		serveRunning:     atomic.Bool{},
	}

    // 从父进程继承的套接字中恢复连接
	for i := 0; i < nfd; i++ {
		fd := os.NewFile(uintptr(4+i), filepath.Join(tmpdir, strconv.Itoa(4+i)))
		conn, err := net.FileConn(fd)
		// handle error ignored
		go s.handleConn(conn)
	}

	return s, nil
}

func (s *gracefulTcpServer) gracefulRestart() {
	_ = s.listener.SetDeadline(time.Now())
	lfd, err := s.listener.File()

    // 给子进程设置 优雅重启 相关的环境变量
	os.Setenv(__GRACE_ENV_FLAG, "true")
	os.Setenv(__GRACE_ENV_NFDS, strconv.Itoa(len(s.conns)))

    // 将父进程的监听套接字传递给子进程
	files := make([]uintptr, 4, 3+1+len(s.conns))
	copy(files[:4], []uintptr{
		os.Stdin.Fd(),
		os.Stdout.Fd(),
		os.Stderr.Fd(),
		lfd.Fd(),
	})
    // 将父进程的套接字传递给子进程 
	for conn := range s.conns {
		connFd, _ := conn.(*net.TCPConn).File()
		files = append(files, connFd.Fd())
	}
	procAttr := &syscall.ProcAttr{
		Env:   os.Environ(),
		Files: files,
		Sys:   nil,
	}

    // 执行 fork + exec 调用
	childPid, err := syscall.ForkExec(os.Args[0], os.Args, procAttr)
}


func main() {
    // ...
    
    // 根据环境变量判断是 fork 还是新启动
	if v := os.Getenv(__GRACE_ENV_FLAG); v != "" {
		s, err = startFromFork()
	} else {
		s, err = start(*port)
	}

	go s.serve()

    // 处理信号,如果是 SIGHUP 信号,则执行 gracefulRestart 方法后再退出
	s.waitForSignals()
}

完整代码可以在 https://github.com/yeqown/playground/golang/tcp-graceful-restart 中找到。

...

Cloudflare Tunnel 使用笔记

Cloudflare 自不用多说,Tunnel 是 Cloudflare 提供的一项功能,可以将本地的服务通过 Cloudflare 的网络暴露到公网,这样就可以实现内网穿透,同时还可以通过 Cloudflare 的网络加速服务,提高访问速度。

初识 Cloudflare Tunnel #

最开始接触 Cloudflare Tunnel 是在 Twitter 上看到一个项目cloudflare-tunnel-ingress-controller ,这个项目是一个 Kubernetes 的 Ingress Controller,可以将 Kubernetes 中的服务通过 Cloudflare Tunnel 暴露到公网,这样就可以实现内网穿透,也就是说局域网搭建的服务可以通过 Cloudflare 的网络暴露到公网。

熟悉内网穿透的小伙伴,应该对这中东西很熟悉,也没什么好说的。

前提 #

  • 一个 Cloudflare 账号
  • 一个域名

cloudfalred #

Mac 上可以通过 brew install cloudflared 安装,安装完成后,可以通过 cloudflared -v 查看版本。

$ cloudflared -v
cloudflared version 2023.7.3 (built 2023-07-25T20:51:49Z)

参考 cloudflared 官方文档 安装。

$ cloudflared login

使用 #

我的使用场景除了最开始提到的 Kubernetes Ingress Controller 之外,还有一个就是将局域网内的开发机通过 Cloudflare Tunnel 暴露到公网,方便远程开发。

...

protoc-gen-fieldmask插件

背景 #

gRPC 作为服务端的常用框架,它通过 protocol-buffers 语言来定义服务,同时也约定了请求和响应的格式,这样在服务端和客户端之间就可以通过 protoc 生成的代码直接运行而不用考虑编码传输问题了。

但是,可能会遇到这样的场景:

  • RPC 响应中 无用的字段过多 , 浪费带宽和无效计算,如下图所示:

    这里的无用字段是指,在响应中,没有用到的字段,这些字段可以忽略掉,不会影响客户端的使用。

    或许 拆分接口 是一个好的办法,但是可能会因为这样那样的原因(信息粒度降低导致接口太多了,有些地方就是需要聚合信息;细粒度的API设计同时会导致代码重复增加),可能无法推动拆分改造。同时如果没有拆分标准,亦或团队内成员不能严格遵守标准,那么拆分也只是重复问题而已。

  • RPC 增量更新时,如何判断零值字段是否需要更新?

    对于 unset 和 zero value 不好区分的语言中(比如:go),在提供服务的一方遇到 增量更新 的场景时就会遇到这样的情况:

    对于这种情况当然可以也有一些方法来解决,比如:使用指针来定义数据基本类型,那么在使用的时候如果判定为 nil 就说明没有设置,如果不为 nil 且为零值,那么就说明也是需要更新的。不过这样解决的缺点就是,nil refference panic 的概率又增加了,在使用时也稍微麻烦了一点。

    ·

解决方案 #

其实我们在思考上述两种场景的时候,把 客户端服务端 的角色提取出来,就会发现这两个场景都是从 服务端 的视角遇到的问题,两个场景都是类似的:

  1. 客户端需要哪些字段,服务端不知道
  2. 客户端更新了哪些字段,服务端也不知道

但是,其实客户端是知道的,因此让客户端把这部分信息传递给服务端就行了。因此我们可以用 FieldMask 字段,用来传递客户端需要的字段,服务端就只返回需要的字段;客户端的告诉服务端需要哪些字段,服务端就更新哪些字段。

但是 FieldMask 只是一个定义,在具体的使用场景中还需要开发者自己编写一些辅助方法,来实现功能。那么是不是可以提供一个插件,让开发者可以只编写 proto 文件,便可以自动生成一些辅助方法呢?答案是肯定的,预览效果如下:

message UserInfoRequest {
  string user_id = 1;
  google.protobuf.FieldMask field_mask = 2 [
    (fieldmask.option.Option).in = {gen: true},
    (fieldmask.option.Option).out = {gen: true, message:"UserInfoResponse"}
  ];
}

message Address {
  string country = 1;
  string province = 2;
}

message UserInfoResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;·
  Address address = 4;
}

message NonEmpty {}

service UserInfo {
  rpc GetUserInfo(UserInfoRequest) returns (UserInfoResponse) {}
  rpc UpdateUserInfo(UserInfoRequest) returns (NonEmpty) {}
}

生成的代码如下:

...

Sentry+OpenTelemetry前后端全链路打通总结

自从微服务大行其道,容器化和k8s编排一统天下之后,“可观测性” 便被提出来。这个概念是指,对于应用或者容器的运行状况的掌控程度,其中分为了三个模块:MetricsTracingLogging。Metrics 指应用采集的指标;Tracing 指应用的追踪;Logging 指应用的日志。

日志自不用多说,这是最原始的调试和数据采集能力。Metrics 比较火的方案就是 Prometheus + Grafana,思路就是通过应用内埋入SDK,选择 Pull 或者 Push 的方式将数据收集到 prometheus 中,然后通过 Grafana 实现可视化,当然这不是本文的重点就此略过。

Tracing 也并不是可观测性提出后才诞生的概念,在微服务化的进程中就已经有Google的Dapper落地实践,并慢慢形成 OpenTracing 规范,这一规范又被多家第三方框架所支持,如 Jaeger、Zipkin 等。OpenTelemetry 就是结合了 OpenTracing + OpenCensus 规范,约定并提供完成的可观测性套件,只是目前(2021-12-15)稳定下来的只有 Tracing 这一部分而已。对 OpenTelemetry 发展历史感兴趣的可以自行了解。

效果预览 #

链路总览,包含了前端页面的生命周期 + 整个了链路采集到的Span聚合。

前端页面指标采集概览,包含了该页面生命周期内的动作和日志等。

服务端链路细节,包含了服务端链路采集的标签和日志(事件)等信息。

propagation兼容jaeger效果,保证jaeger侧链路完整,使用一致的 traceId检索。因为服务侧 sentry 是渐进更新的,因此没有接入的应用并不会展示在sentry侧, 等到完全更新后就会完整。

背景 #

目前运行中的链路追踪组件是采用 opentracing + jaeger 实现,这套方案唯二的不足就是:

前端采用 sentry 来采集前端页面数据(APP + WEB 都支持很好),因此才有了这么一个 前后端链路打通的需求。

...

访问量 访客数