近期的一些经验总结

近期的一些经验总结

这里不会过多的介绍软件的相关概念和架构,主要是针对实际问题的解决方案和思考。

问题汇总 #

  • CDC 相关

    • CDC kafka-connect mysql sink 侧消费积压问题
    • CDC kafka-connect mysql source 侧删除事件投递了两条事件,导致删除动作数据量被放大
    • CDC kafka-connect mongodb 数据同步任务异常(消息超过 1MB )
  • DMS 数据同步相关

    • 数据迁移完成后,怎么对比源数据和目标数据是否一致?
    • 如果不一致怎么处理?
  • Istio 相关

    • Istio 中多个 gateway 使用相同 host,analyze 是提示错误
    • Istio 中一个服务提供了多个端口的服务,怎么配置 Virtual Service ?
  • APISIX 相关

    • 使用 APISIX 作为网关,怎么进行有条件的响应重写?
    • APISIX 插件的执行顺序是怎么样的?
  • ShardingSphere Proxy

    • HINT策略 在 ShardingSphere Proxy 中的使用
  • Kafka 相关

    • 如何将迁移kafka集群中的数据?
  • Pyroscope 相关

    • 使用 Go Pull 模式采集数据时为什么只有 cpu + gourotines + cpu samples 三个指标?
  • Doris 相关

    • 动态分区表插入数据时失败,提示 “no partition for this tuple”

1. CDC 相关 #

CDC 是 Change Data Capture 的缩写,即变更数据捕获。CDC 是一种软件模式,用于捕获和跟踪数据库中的变更。CDC 通常用于复制数据、数据集成和数据仓库加载等场景。

这里 CDC 的技术实现为使用 kafka-connect 连接 mysql 和 mongodb 将数据同步到异构系统中 elasticsearch。source 和 sink connector 使用的 debezium 插件。kafka-connect 是基于 kafka 构建的沟通数据系统的可靠工具,它最常用于将数据异构存储,以满足离线查询和分析、数据仓库、数据湖等需求。

这里使用到 mysql source connector 的配置如下:

{
    "name": "xxx",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "include.schema.changes": "true",
    "topic.prefix": "mysql-xxx",

    // 配置了 Reroute transform,将 真实表 归集到一个 逻辑表对应的 topic 中
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "xxx",
    "transforms.Reroute.key.enforce.uniqueness": "false",
    "transforms.Reroute.topic.replacement": "<topic_prefix>",

    "schema.history.internal.kafka.topic": "mysql-connector.schemahistory.xxx",
    "schema.history.internal.kafka.bootstrap.servers": "<kafka_server>",

    "database.include.list": "<database_name or regex>",
    "database.port": "<mysql_port>",
    "database.hostname": "<mysql_host>",
    "database.password": "<mysql_pass>",
    "database.user": "<mysql_user>",
    "table.exclude.list": "<table_name or regex>",
}

mysql sink connector 的配置如下:

{
    "name": "<connector name>",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "behavior.on.null.values": "DELETE",
    "tasks.max": "1", // 任务数对应 consumer group 的消费者实例数
    "key.ignore": "false",
    "write.method": "UPSERT",
    "connection.url": "<es_addr>",
    "topics.regex": "<topic_regex>",

    "transforms": "unwrap,key",

    // unwrap tranform 配置,处理删除事件:将删除事件的 value 设置为 null
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.unwrap.drop.tombstones": "false",

    // key tranform 配置,设置 ES 的 docID 为 mysql 数据表中的id (主键)
    "transforms.key.field": "id",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
}

1.1 CDC kafka-connect mysql sink 侧消费积压问题 #

积压的原因,针对 mysql source / sink connector 只使用了一个分区。众所周知 topic 分区数会直接影响到消费的并发度,如果只有一个分区,那么只有一个消费者可以消费,消费者的消费速度就会受到限制。解决方案是增加分区数,增加消费者的并发度。

这里存在的疑惑是:

  • mysql 相关的 connector 是否只能使用一个分区?还能否保证顺序消费?
  • 有人反馈的的只能使用一个分区的 topic 是指的这部分吗?
  • 增加 topic 数之后,source connector 和 sink connector 都能够增加并行的任务吗?

文档查阅和实验:

在最开始使用时,当时确定为只有一个分区是由于文档中提及了:

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-schema-change-topic

Never partition the database schema history topic. For the database schema history topic to function correctly, it must maintain a consistent, global order of the event records that the connector emits to it.

To ensure that the topic is not split among partitions, set the partition count for the topic by using one of the following methods:

If you create the database schema history topic manually, specify a partition count of 1.

If you use the Apache Kafka broker to create the database schema history topic automatically, the topic is created, set the value of the Kafka num.partitions > configuration option to 1.

但是实际上看来,这里的意思是指的 schema history topic 而不是其他的 topic。唯一找到限制是 mysql source connector task 只能为 1 如下所示:

tasks.max

Default value: 1

The maximum number of tasks to create for this connector. Because the MySQL connector always uses a single task, changing the default value has no effect.

因此分区数为 1 只是“历史”问题,实际上可以增加分区数,以增加消费侧并发度。

另外文档中也说明了 source connector 投递消息的 message key 可以自定义 key, 其默认值为数据库表的主键,这样可以保证同一条数据的变更事件被投递到同一个分区中,从而保证了顺序性。

如下是真实环境中一条数据的变更事件的 message key 示例:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "$connectorName.$dbName.$tblName.Key"
  },
  "payload": {
    "id": 1075966706511257600 // 唯一的变量,即主键列的值
  }
}

处理手段:

  1. 调整 mysql sink connector 的 topic 分区数,增加并发度。
./kafka-topics.sh --bootstrap-server <kafka_server> --alter --topic <topic_name> --partitions <new_partition_num>
  1. 调整 mysql sink connector 的任务数与分区数一致(这里20为示例),以保证每个任务都能够消费到数据。
- “tasks.max”: "1"
+ “tasks.max”: "20"

注意:增加分区数,大概率会导致某些消息从之前的分区被投递到新的分区,这样会导致消息的顺序性被打乱,因此要谨慎处理,要么能够容忍这种异常,要么事后恢复最终一致,也可以先停止生产等消费完毕后再行调整。

1.2 CDC kafka-connect mysql source 侧删除事件投递了两条事件,导致删除动作数据量被放大 #

这个问题的背景是,mysql 部分表数据量极大,个别单表超过 100 million 条数据,且日增量极大,因此会定期删除。定期删除一开始选择的是使用 mysql Event Scheduler 定时任务 + Procedure 删除,但这种方式对于运维人员的要求较高,且不够灵活(分库分表场景下),其次在业务高峰期删除操作会影响到业务的正常运行。因此另外选择了自行实现一个定时任务,定时删除数据,以避开业务高峰。

但是部署上线时发现一个问题:删除的数据量大概是 20 Million 条,但是 kafka 中的消息数量是 40 Million 条,且每条消息都是删除操作。

演示如下图中所示,针对同一条数据的删除操作,source connector 投递了两条消息,导致数据量被放大。

delete_event_double

文档查阅:

When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, after the Debezium MySQL connector emits a delete event, the connector emits a special tombstone event that has the same key but a null value.

这意味着当一条数据被删除时,source connector 会先投递一条删除事件,然后再投递一条 tombstone 事件。这样做的目的是为了保证 kafka 的 log compaction 机制能够正常工作,即如果一条数据被删除后,当 log compaction 开启时,kafka 可以删除所有之前的消息,只保留最新的消息(即被删除的状态),当其他系统消费时,只需要关注最新的状态即可。

Kafka Log Compaction 参考:Kafka Log Compaction

但在我们的实际场景中,Log Compaction 机制并不适用也没有开启,因此不需要 tombstone 事件,只需要删除事件即可。不适用的原因在于,当数据被删除时离它的创建时间已经过去了很久,同时 kafka 中也只保留 7d 的数据,因此 tombstone 事件对我们来说没有意义。

处理手段:

  1. 调整 source connecor 配置,增加 unwrap tranform 以删除不必要的删除事件。
- "transforms": "Reroute",
+ "transforms": "Reroute,unwrap",
+ "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+ "transforms.unwrap.delete.handling.mode": "none",
+ "transforms.unwrap.delete.tomstones.handling.mode": "tomstone",

调整后的效果如下图所示:

delete_event_single

1.3 CDC kafka-connect mongodb 数据同步任务异常(消息超过 1MB ) #

这个问题是偶然发现 mongodb source connector 异常,查看日志发现如下异常:

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:128)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:404)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1096354 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

如果是熟悉 java + kafka 开发的同学,应该一眼就能看出来是消息超过了 kafka 的 max.request.size 配置,导致消息发送失败。

文档查阅:

这里走了“弯路”去翻了文档,是因为对 java + kafka 开发不熟悉,只是大致猜测是消息尺寸超过了 kafka 的限制,因此查阅了 kafka 的文档。

处理手段:

  1. 调整 kafka broker 的 message.max.bytes 配置,增加消息的最大尺寸。

    ./kafka-configs.sh --zookeeper <zookeeper> --entity-type brokers --entity-name <broker_id> --alter --add-config message.max.bytes=10485760
    
  2. 调整 kafka connect 的配置,增加 producer.max.request.size 配置。

    + producer.max.request.size=10485760 // 10MB
    

2. DMS 数据同步相关 #

DMS 是 Data Migration Service 的缩写,即数据迁移服务。DMS 是一种数据迁移服务,用于将数据从一个地方迁移到另一个地方。DMS 通常用于数据迁移、数据备份、数据恢复等场景。

2.1 数据迁移完成后,怎么对比源数据和目标数据是否一致? #

在实际场景中,可能会有数据迁移的需求,比如说将数据从一个数据库迁移到另一个数据库,或者将数据从一个表迁移到另一个表。在迁移完成后,我们需要对比源数据和目标数据是否一致,以保证数据的一致性。

这里记录下在检索中出来可以使用的一些工具:

2.2 如果不一致怎么处理? #

tidb-tools/sync_diff_inspector 可以输出修复SQL。Percona Toolkit 中的 pt-table-sync 也可以用于同步两个表的数据。

3. Istio 相关 #

3.1 Istio 中多个 gateway 使用相同 host,analyze 是提示错误 #

在 Istio 中,gateway 是一个虚拟服务,用于将流量路由到对应的服务。gateway 有一个 host 字段,用于指定域名。在实际场景中,可能会有多个 gateway 使用相同的 host,这样就会导致 analyze 时提示错误。

3.2 Istio 中一个服务提供了多个端口的服务,怎么配置 Virtual Service ? #

我们有一个短信服务,它同时提供了 HTTP 和 gRPC 服务,分别使用了 8000 和 50051 端口。在没有使用 istio 去路由流量的时候,在 k8s 中配置这个服务其实很简单,只需要在 Service 中配置多个端口即可。如下:

apiVersion: v1
kind: Service
metadata:
  name: sms-service
spec:
  selector:
    app: sms-service
  ports:
    - name: http
      port: 8000
      targetPort: 8000
    - name: grpc
      port: 50051
      targetPort: 50051

但是在使用 istio 时,需要配置 Virtual Service 来路由流量,那么如何配置 Virtual Service 来支持多个端口的服务呢?

文档查阅:

HTTPRoute

Describes match conditions and actions for routing HTTP/1.1, HTTP2, and gRPC traffic. See VirtualService for usage examples.

http[].match.port (Optional) uint32

Specifies the ports on the host that is being addressed. Many services only expose a single port or label ports with the protocols they support, in these cases it is not required to explicitly select the port.

从这一节文档,我们得知 HTTPRoute 可以用于路由 HTTP/1.1、HTTP2 和 gRPC 流量,因此我们可以使用 HTTPRoute 来配置 Virtual Service。match 匹配条件中的 port 字段可以用于指定端口,专门用于单个服务提供多端口的场景。需要注意的是 port 代表的是访问地址中的端口,而不是服务暴露的端口。

处理手段:

  • 在 Virtual Service 配置中,新增多个 httpRoute 来路由到不同的端口。
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: sms-service
spec:
  hosts:
    - api.my
  http:
    - name: http
      match:
        - uri:
            prefix: /
          port: 80
      route:
        - destination:
            host: sms-service
            port:
              number: 8000
    - name: grpc
      match:
        - port: 50051
      route:
        - destination:
            host: sms-service
            port:
              number: 50051

4. APISIX 相关 #

4.1 使用 APISIX 作为网关,怎么进行有条件的响应重写? #

响应重写属于网关的基本功能,APISIX 作为一个开源的网关,也支持响应重写。但是在实际场景中,可能会有一些特殊的需求,比如说只有满足一定条件时才进行响应重写。举个实际的例子:

当服务器要进行停机维护的时候,我们希望所有的请求,不论是客户端还是第三方请求的请求,都响应 503 状态码,并且返回一个提示信息 “服务器正在维护中,预计维护时间为 2024-12-17 00:00:00 - 2024-12-17 06:00:00”,同时还有另外一个要求当服务器恢复时,可以允许特定的请求通过,比如说只有来自公司内部的请求才可以通过。

这里默认客户端是支持展示这种响应的,因此不考虑客户端展示问题。

文档查阅:

这里关注该插件的以下几个属性:

  • status_code: 响应状态码
  • body: 响应体,可以是字符串或者 base64 编码的字符串。!!!注意 body 和 filters 不能同时使用。
  • body_base64: 用于指示 body 是否 base64 编码
  • vars: 用于匹配条件,支持多个匹配条件。
    • 变量列表参考:
      • Nginx 变量:https://nginx.org/en/docs/varindex.html
      • APISIX 变量:https://apisix.apache.org/docs/apisix/apisix-variable/
    • 操作符参考:https://github.com/api7/lua-resty-expr#operator-list
  • filters: 对 body 内容进行正则匹配并替换。

vars 配置举例:

# 匹配所有查询参数带有 pkg=com.company.io 的请求
vars:
  - - arg_pkg
    - ~=
    - "com.company.io"

filter 配置举例:

# 这个配置会将响应体中的所有 Example 替换为 Example-Replaced
filters:
  - regex: Example
    scope: global
    replace: Example-Replaced
    options: "jo" # 正则匹配选项,参见 https://github.com/openresty/lua-nginx-module#ngxrematch 

处理手段:

在 ApisixRoute 配置中,配置 response-rewrite 插件,设置 body 为维护提示信息,设置 vars 为特定请求的匹配条件。

代码片段
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
  name: account-api
spec:
  http:
    - name: account-api
      backends:
        - serviceName: account-api
          servicePort: 8000
      match:
        hosts:
          - api.example.com
        paths:
          - /account/*
      plugins:
        - name: cors
          enable: true
        - name: response-rewrite
          enable: true
          body: >-
            {
              "code": 503,
              "message": "服务器正在维护中,预计维护时间为 2024-12-17 00:00:00 - 2024-12-17 06:00:00"
            }            
          body_base64: false # 是否 body base64,适用于二进制数据
          vars:
            - - arg_pkg
              - ~=
              - "com.company.io"

4.2 APISIX 插件的执行顺序是怎么样的? #

APISIX 会优先执行全局的插件,然后再执行路由级别的插件。每个插件内部定义了一个优先级 priority,优先级越高的插件越先执行。

APISIX Plugin priority

如果想要调整插件的顺序,可以配置 _meta 字段, 参见 APISIX#Plugin Custom priority

5. ShardingSphere Proxy #

5.1 HINT策略 在 ShardingSphere Proxy 中的使用 #

在前文 ShardingSphere Proxy 问题几则 中提到了 Shardingsphere 支持 HINT 策略,即通过 SQL Hint 来指定路由规则。

这里的场景是针对已经使用了标准分片算法中 Inline 策略的场景,但是个别场景没有办法使用分片键进行路由,因此需要使用 HINT 策略。比如说有一个 t_order 表,进行了数据分片,分库分表的策略是:根据 mch_id 取模分为 2 个库,根据 user_id 取模分为 2 个表。

Shardingsphere Proxy Sharding Strategy

现在要针对这个表清理数据。最简单的想法就是直接执行 DELETE FROM t_order WHERE created_at < 'yyyy-MM-dd' 来清理数据,但是很明显直接删除会导致性能问题。可以考虑使用批量删除的方式,例如:

DELETE FROM t_order WHERE created_at < 'yyyy-MM-dd' limit 1000;

但是这样的方式不被 Shardingsphere Proxy 支持,因为这样的 SQL 语句没有分片键会导致全路由,Shardingsphere Proxy 并不支持。

Shardingsphere Proxy delete error

文档查阅:

这里需要提前开启 sqlCommentParseEnable 选项,以支持 sql comment 解析。

处理手段:

想要让这样的删除语句可以落到特定的分片上去,那么我们可以使用 HINT 策略,即在 SQL 语句中添加 Hint 来指定路由规则,如下:

/* ShardingSphere hint: dataSourceName=sharding_db_0 */ delete from t_order_0 where created_at < 'yyyy-MM-dd' limit 1000;

Shardingsphere Proxy delete success

6. Kafka 相关 #

6.1 如何将迁移kafka集群中的数据? #

在实际场景中,可能会有迁移 kafka 集群的需求,比如说迁移到新的集群,或者迁移到新的版本等。这里的迁移是指迁移数据,而不是迁移集群。一般说来,kafka 数据迁移分为两种:

  • 集群内迁移:比如新增一个 broker, 需要将现有的数据重新分布,已实现负载均衡。bin/kafka-reassign-partitions.sh 脚本可以用于迁移数据, 参见 Kafka#automigrate
  • 集群间迁移:比如将 cluster-ea 中的 topic-demo 整体迁移到 cluster-eu 中。

查阅文档/思路总结:

跨集群迁移的思路是:

  1. 使用 mirror-maker 先将数据从 cluster-ea 中复制到 cluster-eu 中。
  2. 迁移所有的消费者到 cluster-eu 中
  3. 迁移所有的生产者到 cluster-eu 中

处理手段:

以下命令仅供参考,具体的参数需要根据实际情况调整。

  1. 使用 mirror-maker 复制数据

    ./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist "topic-demo"
    
  2. 迁移消费者

    修改消费者配置文件中的 bootstrap.servers 为新的集群地址

  3. 迁移生产者

    修改生产者配置文件中的 bootstrap.servers 为新的集群地址

  4. 停止 mirror-maker

7. Pyroscope 相关 #

Pyroscope 是一个开源的性能监控工具,支持多种语言 SDK,支持多种采集模式。Pyroscope 有两种采集模式:

  • Pull 模式:Pyroscope 会定时从应用程序中拉取数据。程序需要主动暴露相应的端口,参见:Pyroscope#Scrape
  • Push 模式:应用程序通过内置 SDK 主动推送数据到 Pyroscope,参见:Pyroscope#Push

如下:

Pyroscope Pull/Push Mode

7.1 使用 Go Pull 模式采集数据时为什么只有 cpu + gourotines + cpu samples 三个指标? #

按照官方文档配置好 Alloy 服务,并在应用上暴露 /debug/pprof/* 等端口之后(使用标准库的 pprof),发现在 Pyroscope 中只有 cpu + gourotines + cpu samples 三个指标,而没有其他的指标。如下图:

Pyroscope Go Pull Mode Issue

此时使用的 alloy scrape 配置如下:

代码片段
pyroscope.scrape "scrape_job_name" {
        // 这部分仅作演示
        targets    = [{"__address__" = "localhost:4040", "service_name" = "example_service"}]
        forward_to = [pyroscope.write.write_job_name.receiver]

        // 关注这部分配置
        profiling_config {
                profile.process_cpu {
                        enabled = true
                }

                profile.godeltaprof_memory {
                        enabled = true
                }

                profile.memory { // disable memory, use godeltaprof_memory instead
                        enabled = false
                }

                profile.godeltaprof_mutex {
                        enabled = true
                }

                profile.mutex { // disable mutex, use godeltaprof_mutex instead
                        enabled = false
                }

                profile.godeltaprof_block {
                        enabled = true
                }

                profile.block { // disable block, use godeltaprof_block instead
                        enabled = false
                }

                profile.goroutine {
                        enabled = true
                }
        }
}

文档查阅:

通过文档就很明了了,因为 alloy scrape 任务配置的 memory、mutex、block 等指标都都指向了 godeltaprof_memory、godeltaprof_mutex、godeltaprof_block 等指标,其访问路径期望是 /debug/pprof/delta_heap 而不是应用实际暴露的 /debug/pprof/allocs,所以才有没有 memory 相关的指标。

delta_heap 等指标是在应用程序中通过 SDK 新增的指标,对于 Go 标准库中的 pprof 并不包含。

关于 godeltaprof 这个包,是 runtime/pprof 的一个 fork 版本:减少了内存分配,相应的GC压力更小;同时支持惰性采样,采样更加高效(样本尺寸更小)。

处理手段1:

在应用程序中使用 pyroscope SDK,新增 godeltaprof 指标。

处理手段2:

继续在应用程序中使用标准库的 pprof,调整 scrape 配置,将 memory、mutex、block 等指标指向标准库的 pprof。

代码片段
                profile.godeltaprof_memory {
-                        enabled = true
+                        enabled = false
                }

                profile.memory { // disable memory, use godeltaprof_memory instead
-                        enabled = false
+                        enabled = true
                }

                profile.godeltaprof_mutex {
-                        enabled = true
+                        enabled = false
                }

                profile.mutex { // disable mutex, use godeltaprof_mutex instead
-                        enabled = false
+                        enabled = true
                }

                profile.godeltaprof_block {
-                        enabled = true
+                        enabled = false
                }

                profile.block { // disable block, use godeltaprof_block instead
-                        enabled = false
+                        enabled = true
                }

效果展示:

调整后的效果如下图所示,已经正常展示:

Pyroscope Go Pull Mode Success

8. Doris 相关 #

8.1 动态分区表插入数据时失败,提示 “no partition for this tuple” #

存在这么一个动态分区表 根据 day 字段动态分区,其 DDL 如下:

create table if not exists a_daily_tbl (
    `mch_id` INT not null,
    `user_id` BIGINT not null,
    `day` DATE not null,
    `type` VARCHAR(64) not null,
    `count` INT SUM,
    `total` BIGINT SUM,
    `add_total` BIGINT SUM,
    `updated_at` DATETIME MAX
)
AGGREGATE KEY(`mch_id`, `user_id`, `day`,`type`)
PARTITION BY range (`day`) ()
DISTRIBUTED BY HASH(`mch_id`, `user_id`) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-90",
    "dynamic_partition.end" = "1",
    "dynamic_partition.prefix" = "a_daily_tbl_prefix",
    "dynamic_partition.buckets"="2",
    "dynamic_partition.create_history_partition"="true"
);

当某一天如 2024-12-31 这一天的数据想要插入未来的一条数据,执行如下 SQL 语句:

INSERT INTO a_daily_tbl values (101, 101743183, '2025-01-02', 'type_1', 1, 5000000, 5000000, '2025-01-02 02:40:16');

提示错误:

errCode = 2, detailMessage = Insert has filtered data in strict mode. url: http://HOST:PORT/api/_load_error_log?file=__shard_3/error_log_insert_stmt_16be9eb83cde44be-a0ecc5687c9ead8e_16be9eb83cde44be_a0ecc5687c9ead8e

# 链接中的内容如下:
Reason: no partition for this tuple. tuple=
+---------------+---------------+---------------+---------------+-----------------+-----------------+-----------------+----------------------+
|(Int32)        |(Int64)        |(DateV2)       |(String)       |(Nullable(Int32))|(Nullable(Int64))|(Nullable(Int64))|(Nullable(DateTimeV2))|
+---------------+---------------+---------------+---------------+-----------------+-----------------+-----------------+----------------------+
|            101|      101743183|     2025-01-02|           type_1|                1|          5000000|          5000000|   2025-01-02 02:40:16|
+---------------+---------------+---------------+---------------+-----------------+-----------------+-----------------+----------------------+
. src line []; 

文档查阅:

根据文档,动态分区表的分区范围是在 dynamic_partition.startdynamic_partition.end 之间的,如果插入的数据超出了这个范围,就会提示 “no partition for this tuple”。

dynamic_partition.start

The starting offset of the dynamic partition, usually a negative number. Depending on the time_unit attribute, based on the current day (week / month), the partitions with a partition range before this offset will be deleted. If not filled, the default is -2147483648, that is, the history partition will not be deleted.

代表动态分区的起始偏移量,通常是一个负数。根据 time_unit 属性的不同,基于当前的天(周/月),在这个偏移量之前的分区范围的分区将被删除。如果未填写,则默认为 -2147483648,即历史分区不会被删除。

dynamic_partition.end

The end offset of the dynamic partition, usually a positive number. According to the difference of the time_unit attribute, the partition of the corresponding range is created in advance based on the current day (week / month).

代表动态分区的结束偏移量,通常是一个正数。根据 time_unit 属性的不同,基于当前的天(周/月),提前创建对应范围的分区。

处理手段:

因此从 DDL 中可以看出,这个表的动态分区范围是从 -90 天到 1 天,因此当我们在 2024-12-31 这一天插入 2025-01-02 (cur + 2) 这一天的数据时,超出了动态分区的范围,因此提示分区不存在。

那么解决办法要么就是调整插入数据的时间,要么就是调整动态分区的范围。

  • 调整动态分区的范围,将 dynamic_partition.end 调整为 2 天。

    ALTER TABLE a_daily_tbl SET ("dynamic_partition.end" = "2");
    

    然后稍等片刻,确认分区已经创建 show partitions from a_daily_tbl,然后再插入数据就可以了。

访问量 访客数