技术总结

Kafka MirrorMaker2 从使用到迁移

本文中使用的 Kafka 版本为 v3.3.2

引言 #

Kafka MirrorMaker2 是 Kafka 官方提供的跨集群数据复制工具, 它是基于 Kafka Connect 框架构建的。MirrorMaker2 支持多种部署模式, 包括 Dedicated 模式和 Connect 集群模式,还有 standalone 模式。

其中, Dedicated 模式有一个启动脚本 kafka-mirror-maker.sh, 该脚本会启动一个独立的 MirrorMaker2 实例, 而不需要依赖 Kafka Connect 集群。Dedicated 模式适合小规模的复制任务, 但在大规模部署中, 它缺乏可扩展性和高可用性。

相比之下, Connect 集群模式则是先搭建出一个 Kafka Connect 集群, 再提交 MirrorMaker2MirrorSourceConnector 任务。这种模式下, 可以通过增加或减少 Connect 工作节点来动态调整复制任务的资源, 具备更好的弹性和容错能力。

当然配置上也会更复杂一些, 需要管理 Connect 集群的配置和任务。

那么, 如果我们已经在使用 Dedicated 模式部署了 MirrorMaker2, 但现在需要切换到 Connect 集群模式, 应该如何操作呢? 本文将介绍从 Dedicated 模式迁移到 Connect 集群模式时,怎么处理已经同步的 offset 进度, 以确保数据的一致性和连续性。

...

WebAssembly Spec 浅析和跨语言实践

本文主要聚焦于 WebAssembly 的核心规范(Core Spec)部分。WASI 和 Embedder Spec 等其他部分并非本文的重点,感兴趣的读者可自行查阅相关资料。

目的 #

随着 Web 技术的普及,越来越多的应用场景(如游戏、音视频处理、AI 等)需要在浏览器中运行。这些场景通常涉及 CPU 密集型任务,而现有 Web 引擎在处理此类任务时,性能仍不及原生语言。此外,C/C++ 等语言已积累了大量成熟的库,为了高效复用这些库以扩展 Web 的能力,急需一种新方式,使 C++/Rust 等语言也能在浏览器环境中运行。

为解决上述问题,W3C 提出了 WebAssembly 规范。该规范设计了一种全新的、与机器无关的汇编指令集、运行时(可理解为虚拟机)以及内存模型等。

WebAssembly 规范发布后,各大浏览器厂商迅速跟进支持,使其成为一种跨平台的二进制格式,能够在不同操作系统和硬件平台上运行。WASM 的应用范围也因此不再局限于 Web 场景,而是扩展到移动端、服务器端等领域。在云原生领域,Envoy、Kong 和 Apisix 等项目已支持 WASM 作为其扩展插件。WasmEdge 更进一步,直接提出将 WASM 应用于边缘计算场景,例如无服务器应用和函数即服务等。

核心概念 #

在最新规范中,WebAssembly 定义了以下核心概念:

概念 解释说明
Values 提供四种基础数值类型:32位和64位的整型及浮点型。32位整型可用于表示布尔值或内存地址。另有128位扩展整型用于高精度计算。
Instructions 基于栈式虚拟机执行的指令,分为简单指令和控制指令两类。
Traps 类似异常机制,当发生非法操作(如越界访问)时立即中止执行并报告宿主环境。功能类似于Go/Rust中的panic。
Functions 与其他编程语言一致,用于组织特定功能的代码,接收参数并返回结果。
Tables 数据结构上是一个数组,用于存储特定类型(funcrefexternref),可通过索引模拟函数指针。
Linear Memory 一段可动态增长的连续字节数组,程序可存储和加载其中任意位置的数据,越界访问会触发Trap。
Modules 包含类型、函数、表、内存和全局变量的定义,作为部署、加载和编译的基本单位。可声明导入导出项,并支持定义自动执行的启动函数。
Embedder 指将WebAssembly程序嵌入宿主环境的实现方式,如wasmtime或Web环境中的WebAssembly运行时。

举例分析 #

接下来,我们将结合一个简单的 WASM 程序来辅助理解 WebAssembly 规范的内容。这里使用 Rust 语言编写一个简单的加法程序,并将其编译为 WASM 模块:

...

Kafka + Mongodb 通信协议纪要

Kafka 和 MongoDB 是目前使用比较广泛的消息队列和数据库,在之前的很长时间里对这两个软件系统的理解都停留在概念和使用上,直到最近遇到一个“诡异”的问题,已有的经验和调试方法无法定位时,最终尝试了下抓包分析才最终定位到问题的根源。

问题描述: 使用 sarama 编写了一个 kafka 消费者组,这里不同寻常的地方在于:手动提交 + 批量消费。遇到的问题:某些分区消费进度无法成功提交,但是消息是消费成功的。出现这种情况的分区没有规律,触发 rebalance 后 “故障分区” 有概率会发生变化。

分析/定位:这里很明显的问题在于手动提交 offset 为什么不成功?从实现来说,提交 offset 的逻辑跟分区没有关系是一致,那这种不确定性故障时从哪儿来的?而且还和 rebalance 相关。 梳理下 kafka 客户端消费提交涉及到的操作:Fetch, OffsetCommit, 但是消费是正常的,那么只需要抓包分析 OffsetCommit 就可以知道 offset 提交存在什么问题。

结果: 通过抓包一切都明朗了:出现问题的分区同时有多个 OffsetCommit 请求,且其中有的请求提交的 offset 一致停留在一个 “旧的” 位置,不会更新,这样就缩小了范围:程序提交 offset 逻辑异常。

KAFKA 协议 #

Kafka 协议是基于 TCP/IP 协议的二进制协议。其结构组成如下:

struct RequestOrResponse {
    RequestResponseHeader requestResponseHeader; // uint32 messageLength;
    SpecificRequestOrResponseHeader body; // 格式取决于具体的请求和响应,比如:RequestV1Header
}

struct RequestV1Header {
  int16 apiKey;
  int16 apiVersion;
  int32 correlationId;
  string clientId;
}

协议结构 #

https://kafka.apache.org/protocol.html#protocol_messages

...

在 K8S 中部署存算分离 Doris 集群

本文记录了在 ubuntu 22.04 上配合 minikube 搭建的 k8s 集群,搭建 doris 存算分离集群的过程。

0. 环境信息 #

软件 版本
OS Ubuntu 24.04.1 LTS x86_64
kernel 6.8.0-52-generic
minikube v1.35.0
kubernetes v1.32.0
doris v3.0.3

这里默认已经准备好了基础的 kubernetes 集群,所以也不再阐述如何通过 minikube 或者其他方式搭建 kubernetes 集群。

1. 安装 FoundationDB #

参考文档:https://doris.apache.org/zh-CN/docs/3.0/install/deploy-on-kubernetes/separating-storage-compute/install-fdb

1.1 安装 FoundationDB CRD 资源 #

kubectl apply -f https://raw.githubusercontent.com/FoundationDB/fdb-kubernetes-operator/main/config/crd/bases/apps.foundationdb.org_foundationdbclusters.yaml
kubectl apply -f https://raw.githubusercontent.com/FoundationDB/fdb-kubernetes-operator/main/config/crd/bases/apps.foundationdb.org_foundationdbbackups.yaml
kubectl apply -f https://raw.githubusercontent.com/FoundationDB/fdb-kubernetes-operator/main/config/crd/bases/apps.foundationdb.org_foundationdbrestores.yaml

1.2 部署 FoundationDB Operator #

wget https://raw.githubusercontent.com/apache/doris-operator/master/config/operator/fdb-operator.yaml
kubectl apply -f fdb-operator.yaml

1.3 部署 FoundationDB 集群 #

wget https://raw.githubusercontent.com/foundationdb/fdb-kubernetes-operator/main/config/samples/cluster.yaml -O fdb-cluster.yaml
kubectl apply -f fdb-cluster.yaml

# 查看集群状态
kubectl get fdb

# 预期输出(启动需要时间,需要等待几分钟)
NAME           GENERATION   RECONCILED   AVAILABLE   FULLREPLICATION   VERSION   AGE
test-cluster   1            1            true        true              7.1.26    3m30s

2. 安装 Doris Operator #

2.1 安装 CRD 部署 Doris 相关资源定义 #

kubectl create -f https://raw.githubusercontent.com/apache/doris-operator/master/config/crd/bases/crds.yaml

2.2 部署 Doris Operator #

wget https://raw.githubusercontent.com/apache/doris-operator/master/config/operator/disaggregated-operator.yaml -O disaggregated-operator.yaml
kubectl apply -f disaggregated-operator.yaml

# 查看部署状态
kubectl get pod -n doris

# 预期输出
NAME                              READY   STATUS    RESTARTS   AGE
doris-operator-5fd65d8d69-rgqlk   1/1     Running   0          79s

3. 部署存算分离集群 #

3.1 下载示例配置 #

wget https://raw.githubusercontent.com/apache/doris-operator/master/doc/examples/disaggregated/cluster/ddc-sample.yaml -O ddc-sample.yaml

3.2 配置 ConfigMap #

对于 ddc-sample.yaml 配置进行调整配置。这三个都需要分别配置 ConfigMap 并修改集群中的配置挂载。

...

通过一次抓包来掌握memcached

什么是 memcached #

memcached 是一个高性能的“分布式”内存对象缓存系统,用于动态 Web 应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。memcached 是自由软件,以 BSD 许可证发布。

相比于大家熟知的 Redis,memcached 更加简单,只支持 key-value 存储,而 Redis 支持更多的数据结构,如 list、set、hash 等。

Github 地址:https://github.com/memcached/memcached

为什么有 redis 还要使用 memcached #

从我个人的角度来说,要在采用一个缓存系统的时候,我会优先选择 Redis,因为 Redis 功能更加强大,支持更多的数据结构,而且 Redis 也支持持久化,在高可用和分布式部分的设计上也更加完善。

但是 memcached 也有自己的优势,比如更加简单,更加轻量级,更加容易上手,因此在某些系统中也会选用 memcached。因此,了解 memcached 的设计也是有必要的。

memcached 协议概览 #

memcached 支持基本的文本协议和元文本协议,其中元文本协议于 2019 年推出。memcached 还曾支持过二进制协议,但已经被废弃。

memcached 的协议是基于文本的,因此我们可以通过 telnet 或者 netcat 工具来模拟 memcached 的客户端,从而方便的进行测试。

这两者是 “交叉兼容” 的,也就是说我们可以通过 文本协议来设置键值, 通过 元文本协议来查询,反之亦然。

Standard Text Protocol #

详细的协议文档可以参考:https://github.com/memcached/memcached/blob/master/doc/protocol.txt

memcached 的标准文本协议是一个基于文本的协议,它使用 ASCII 字符串来进行通信。memcached 服务器监听在默认端口 11211 上,客户端通过 TCP 连接到服务器,然后发送命令和数据。因此我们可以很容易的通过 telnet 工具就可以完成 memcached 的基本操作。

...

近期的一些经验总结

这里不会过多的介绍软件的相关概念和架构,主要是针对实际问题的解决方案和思考。

问题汇总 #

  • CDC 相关

    • CDC kafka-connect mysql sink 侧消费积压问题
    • CDC kafka-connect mysql source 侧删除事件投递了两条事件,导致删除动作数据量被放大
    • CDC kafka-connect mongodb 数据同步任务异常(消息超过 1MB )

    更新于: 2025-02-06

    • CDC Elasticsearch sink 怎么自定义索引名称?
    • 自定义 transform 实现自定义索引名称
  • DMS 数据同步相关

    • 数据迁移完成后,怎么对比源数据和目标数据是否一致?
    • 如果不一致怎么处理?
  • Istio 相关

    • Istio 中多个 gateway 使用相同 host,analyze 是提示错误
    • Istio 中一个服务提供了多个端口的服务,怎么配置 Virtual Service ?
  • APISIX 相关

    • 使用 APISIX 作为网关,怎么进行有条件的响应重写?
    • APISIX 插件的执行顺序是怎么样的?
  • ShardingSphere Proxy

    • HINT策略 在 ShardingSphere Proxy 中的使用
  • Kafka 相关

    • 如何将迁移kafka集群中的数据?
  • Pyroscope 相关

    • 使用 Go Pull 模式采集数据时为什么只有 cpu + gourotines + cpu samples 三个指标?
  • Doris 相关

    ...

ShardingSphere-Proxy问题几则

ShardingSphere Proxy 是 Apache ShardingSphere 的一个子项目,是一个基于 MySQL 协议的数据库中间件,用于实现分库分表、读写分离等功能。在使用过程中,遇到了一些问题,记录如下。

这里主要针对的是 分库分表 的使用场景。

问题概述 #

数据库往往是一个系统最容易出现瓶颈的点,当遇到数据库瓶颈时,我们可以通过数据拆分来缓解问题。数据拆分的方式通常分为横向拆分和纵向拆分,横向拆分即分库分表;纵向拆分即把一个库表中的字段拆分到不同的库表中去。这两种手段并不互斥,而是在实际情况中相辅相成。本文即是横向拆分相关内容。

  • 常见的部署方式有哪些?
  • 数据分片规则怎么配置?
  • 数据分片数应该怎么确定?
  • 数据分片后唯一索引还有用吗?
  • 数据分片后数据迁移?
  • 数据分片后如何确定实际执行 SQL 语句?
  • 数据分片后的查询优化?

0. 常见的部署方式 #

官方提供了两种部署方式:

  • 单机部署:将 ShardingSphere Proxy 部署在单台服务器上,用于测试和开发环境。
  • 集群部署:将 ShardingSphere Proxy 部署在多台服务器上,用于生产环境。集群模式下使用 zookeeper 来存储元数据。

关于元数据,元数据是 ShardingSphere Proxy 的核心,用于存储分库分表规则、读写分离规则等信息。 官方建议使用集群模式部署 生产环境的 ShardingSphere Proxy

如果不按照官方的指引,选择部署了多个 Standalone 模式的 ShardingSphere Proxy,那么需要注意“每个这样的 proxy 节点会有自己的元信息,他们之间并不互通”。在这些情况下会出现节点之间元数据不一致的问题,参看如下测试:

# 启动 3 个 standalone 模式的 ShardingSphere Proxy
                                          +-------+
                                          |  LB   |
                                          +-------+
                                              |
                                |-------------|--------------|
                                |             |              |
                            +-------+     +-------+       +-------+
                            | Node1 |     | Node2 |       | Node3 |
                            +-------+     +-------+       +-------+

初始表结构如下:

...

fsnotify原理探究

本文如果没有特殊说明,所有的内容都是指 linux 系统

起因是从 kratos 群里看到有人问:“测了下kratos的config watch,好像对软链不生效”,他提供的屏幕截图如下类似:

$ pwd
/tmp/testconfig
$ ls -l
drwxr-xr-x  3 root root 4096 Oct 10 19:48 .
drwxr-xr-x 10 root root 4096 Oct 10 19:48 ..
drwxr-xr-x  1 root root   11 Oct 10 19:48 ..ver1
drwxr-xr-x  1 root root   11 Oct 10 19:48 ..ver2
lrwxr-xr-x  1 root root   11 Oct 10 19:48 ..data -> ..ver1
drwxr-xr-x  1 root root   11 Oct 10 19:48 data
$
$ ll -a data
drwxr-xr-x  3 root root 4096 Oct 10 19:48 .
drwxr-xr-x 10 root root 4096 Oct 10 19:48 ..
lrwxrwxrwx  1 root root   11 Oct 10 19:48 registry.yaml -> /tmp/testconfig/..data/registry.yaml

然后触发更新的动作其实是把 ..data 的源改成了 ..ver2,但是发现并没有触发更新,于是就问了一下。

...

C和lua互操作实践

Lua, C, Redis

相信了解 redis 和 openresty 的小伙伴们都知道 lua 代码可以嵌入这两种程序中运行,极大的提高了软件的扩展性;尤其是 openresty 中,通过使用 lua 我们可以很快速(相比c)的定制web服务器,或者增强 nginx 的功能。那么 lua 是如何嵌入到这些程序中的呢?lua 和 c 是如何互操作的呢?

下文的相关环境和工具版本为:Lua 5.4.6; Mac OS 13.4.1 (Darwin Kernel Version 22.5.0) arm64 M2 pro; Apple clang version 14.0.3 (clang-1403.0.22.14.1)

redis 中的 lua #

下面展示了一段 redis 中操作 lua API 的代码:

这里出现了很多 lua_ 开头的函数,这些函数都是 lua 库中的函数,redis 通过这些函数来操作 lua 环境, 这里先不展开讲,后面会详细介绍。

更多的代码,如 luaRegisterRedisAPI 就不展示了,有兴趣的可以去看源码。

// redis-v7.2/src/eval.c#183

/* 初始化 lua 环境
 *
 * redis 首次启动时调用,此时 setup 为 1,
 * 这个函数也会在 redis 的其他生命周期中被调用,此时 setup 为 0,但是被简化为 scriptingReset 调用。
 */ 
void scriptingInit(int setup) {
    lua_State *lua = lua_open();

    if (setup) {
        // 首次启动时,初始化 lua 环境 和 ldb (Lua debugger) 的一些数据结构
        lctx.lua_client = NULL;
        server.script_disable_deny_script = 0;
        ldbInit();
    }

    /* 初始化 lua 脚本字典,用于存储 sha1 -> lua 脚本的映射
     * 用户使用 EVALSHA 命令时,从这个字典中查找对应的 lua 脚本。
     */
    lctx.lua_scripts = dictCreate(&shaScriptObjectDictType);
    lctx.lua_scripts_mem = 0;

    /* 注册 redis 的一些 api 到 lua 环境中 */
    luaRegisterRedisAPI(lua);

    /* 注册调试命令 */
    lua_getglobal(lua,"redis");

    /* redis.breakpoint */
    lua_pushstring(lua,"breakpoint");
    lua_pushcfunction(lua,luaRedisBreakpointCommand);
    lua_settable(lua, -3);
    /* redis.debug */
    lua_pushstring(lua,"debug");
    lua_pushcfunction(lua,luaRedisDebugCommand);
    lua_settable(lua,-3);
    /* redis.replicate_commands */
    lua_pushstring(lua, "replicate_commands");
    lua_pushcfunction(lua, luaRedisReplicateCommandsCommand);
    lua_settable(lua, -3);

    lua_setglobal(lua,"redis");

    /* 注册一个错误处理函数,用于在 lua 脚本执行出错时,打印出错信息。
     * 需要注意的是,当错误发生在 C 函数中时,我们需要打印出错的 lua 脚本的信息,
     * 这样才能帮助用户调试 lua 脚本。
     */
    {
        char *errh_func =       "local dbg = debug\n"
                                "debug = nil\n"
                                "function __redis__err__handler(err)\n"
                                "  local i = dbg.getinfo(2,'nSl')\n"
                                "  if i and i.what == 'C' then\n"
                                "    i = dbg.getinfo(3,'nSl')\n"
                                "  end\n"
                                "  if type(err) ~= 'table' then\n"
                                "    err = {err='ERR ' .. tostring(err)}"
                                "  end"
                                "  if i then\n"
                                "    err['source'] = i.source\n"
                                "    err['line'] = i.currentline\n"
                                "  end"
                                "  return err\n"
                                "end\n";
        luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def");
        lua_pcall(lua,0,0,0);
    }

    /* 创建一个 lua client (没有网络连接),用于在 lua 环境中执行 redis 命令。
     * 这个客户端没必要在 scriptingReset() 调用时重新创建。
     */
    if (lctx.lua_client == NULL) {
        lctx.lua_client = createClient(NULL);
        lctx.lua_client->flags |= CLIENT_SCRIPT;

        /* We do not want to allow blocking commands inside Lua */
        lctx.lua_client->flags |= CLIENT_DENY_BLOCKING;
    }

    /* Lock the global table from any changes */
    lua_pushvalue(lua, LUA_GLOBALSINDEX);
    luaSetErrorMetatable(lua);
    /* Recursively lock all tables that can be reached from the global table */
    luaSetTableProtectionRecursively(lua);
    lua_pop(lua, 1);

    lctx.lua = lua;
}

通过这部分代码,应该对于 lua 的嵌入式使用有了一个大概的印象。这里可以回答以下的问题:

...

访问量 访客数