Victoria Logs 简单易上手,非常适合中小团队使用,同时也适用于成本敏感的场景。
官方建议:如果可以接受在单节点上垂直扩展来满足业务需求,那么就不必使用集群模式。
本文包含大量源码片段,若不关注细节,可只阅读 架构概览 和 存储模型。
架构概览 #
Victoria Logs 的集群架构如下:
- 通过 vlagent 完成类似 Replica 的功能,将日志数据复制到多个 vlstorage 组件中,实现高可用和数据冗余。
- 通过 vlinsert 执行日志数据的写入策略。
- 通过 vlselect 来聚合多个 vlstorage 组件中的日志数据,实现查询功能。
- 通过 vlstorage 可以无缝的实现扩展存储容量,而不用考虑数据迁移(rebalance)。
vlstorage 的缩容稍微复杂一些,这里提供一种方案:
- 先将需要缩容的 vlstorage 节点从 vlinsert 的
storageNode列表中移除,vlselect 中保留; - 利用 Victoria Logs 的数据保留功能,让数据随逐步过期,直到待删除节点上的数据全部过期;
- 最后将其从 vlselect 中移除,并释放 vlstorage 节点。
也可以参考 Partitions lifecycle,手动执行数据迁移。
vlagent #
vlagent 是 Victoria Logs 的代理组件(类似 OpenTelemetry 的 Agent),位置更靠近日志源,同时也是一层缓冲区,负责接收日志数据并将其发送到 vlinsert 组件。
它可以将数据写入到不同的 Victoria Logs 实例中,实现数据的多写(复制),是实现高可用的必要组件。
PS:这里就不做过多的展开,有兴趣可以自行翻看源码。
vlinsert #
vlinsert 是 Victoria Logs 的写入组件,为无状态服务。写入时根据“局部性”和“均匀分布”策略选择目标 vlstorage 节点。
代码片段
// app/vlstorage/netinsert/netinsert.go
func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
idx := s.srt.getNodeIdx(streamHash)
// sns []*storageNode 代表 vlstorage 节点组
sn := s.sns[idx]
// 进一步追踪到 sn.mustSendInsertRequest 代表通过网络写入到 vlstorage 节点
sn.addRow(r)
}
// app/vlstorage/netinsert/netinsert.go
func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 {
if srt.nodesCount == 1 {
// 如果只有一个 vlstorage 节点,那么就直接写入到该节点
return 0
}
// streamHash := sid.id.lo ^ sid.id.hi,这里的 sid 就是 streamID
// sid.id = hash128(bb.B) 就是 stream 的标签和值 hash 得来的
// 可以参见 LogRows.MustAdd 中的过程
streamRows := srt.rowsPerStream[streamHash] + 1
srt.rowsPerStream[streamHash] = streamRows
// 如果 Stream 中的日志数量小于 1000 条,那么写入到同一个 vlstorage 组件中
// 对于只包含少量日志的 Stream,可提高局部性;当某个 Stream 量较大时则
// 分散到不同的 vlstorage 节点,以提高查询性能。
if streamRows <= 1000 {
return streamHash % uint64(srt.nodesCount)
}
return uint64(fastrand.Uint32n(uint32(srt.nodesCount)))
}
// app/vlstorage/netinsert/netinsert.go
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) {
// sn 代表当前选中的写入节点
err := sn.sendInsertRequest(pendingData)
// 如果写入成功,那么这里就直接返回了
// 否则,会尝试其他节点
// sn.s 代表的是 Storage
// sendInsertRequestToAnyNode 方法则会随机选中一个节点尝试(实际调用 storageNode.sendInsertRequest)
for !sn.s.sendInsertRequestToAnyNode(pendingData) {
// 省略
}
}
// app/vlstorage/netinsert/netinsert.go
func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) error {
// 如果没有禁用压缩,就使用 zstd 压缩数据
var body io.Reader
if !sn.s.disableCompression {
bb := zstdBufPool.Get()
defer zstdBufPool.Put(bb)
bb.B = zstd.CompressLevel(bb.B[:0], pendingData.B, 1)
body = bb.NewReader()
} else {
body = pendingData.NewReader()
}
// 对 vlstorage 节点请求 internal/insert 接口
reqURL := sn.getRequestURL("/internal/insert")
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, body)
// ...
resp, err := sn.c.Do(req)
// ...
}
小结
在集群模式下,vlinsert 负责从 vlstorage 节点列表中选择写入目标,路由策略如下:
- Stream 中的日志数量小于 1000 时,同一个 Stream 的日志会被写入到同一个 vlstorage 组件中。
- Stream 中的日志数量大于等于 1000 时,会随机选择一个 vlstorage 节点,将日志写入到该组件中。
- 如果写入失败,会尝试写入到其他节点。
vlselect #
vlselect 是 Victoria Logs 的查询组件,负责接收查询请求并返回查询结果。这里以 /select/logsql/query 接口为例,介绍查询的过程。
入口函数为:app/vlselect/logsq/logsql.go#ProcessQueryRequest
代码片段
// app/vlselect/logsq/logsql.go
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-logs
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// 省略参数解析,metrics 采集等等
// Execute the query
if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
// 省略错误处理
return
}
}
// app/vlstorage/main.go
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
// 这部分的目的是用来判断查询是否可以优化为:直接返回最后的 N 条日志,而不用先执行过滤再排序
// 比如:
// - 'sort by (_time desc) offset <offset> limit <limit>'
// - 'first <limit> by (_time desc)'
// - 'last <limit> by (_time)'
qOpt, offset, limit := qctx.Query.GetLastNResultsQuery()
if qOpt != nil {
qctxOpt := qctx.WithQuery(qOpt)
return runOptimizedLastNResultsQuery(qctxOpt, offset, limit, writeBlock)
}
// localStorage 在 vlstorage 节点中才会被赋值
if localStorage != nil {
return localStorage.RunQuery(qctx, writeBlock)
}
return netstorageSelect.RunQuery(qctx, writeBlock)
}
可以看出,RunQuery 方法不仅由 vlselect 调用,vlstorage 节点中也会调用。目前先关注 netstorageSelect 这个分支:
代码片段
// app/vlstorage/netselect/netselect.go
func (s *Storage) RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
// nqr 代表 NetQueryRunner,用于执行分布式查询
nqr, err := logstorage.NewNetQueryRunner(qctx, s.RunQuery, writeBlock)
if err != nil {
return err
}
search := func(stopCh <-chan struct{}, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error {
qctxLocal := qctx.WithQuery(q)
return s.runQuery(stopCh, qctxLocal, writeBlock)
}
// nqr.Run 实际执行的还是 Storage.runQuery 方法
// nqr.Run 其中会将 pipe 分成 remote pipe 和 local pipe
// 分别执行分布式查询和本地查询
concurrency := qctx.Query.GetConcurrency()
return nqr.Run(qctx.Context, concurrency, search)
}
// app/vlstorage/netselect/netselect.go
func (s *Storage) runQuery(stopCh <-chan struct{}, qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
// ...
// 这里可以看出来,runQuery 方法会并发执行所有的 vlstorage 节点的查询
for i := range s.sns {
go func(nodeIdx int) {
err := sn.runQuery(qctxLocal, func(db *logstorage.DataBlock) {
writeBlock(uint(nodeIdx), db)
})
}(i)
}
// ...
}
func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func(db *logstorage.DataBlock)) error {
path := "/internal/select/query"
responseBody, reqURL, err := sn.getResponseBodyForPathAndArgs(qctx.Context, path, args)
if err != nil {
return err
}
defer responseBody.Close()
// 解析响应,省略
}
小结
vlselect 会并发向所有 vlstorage 节点发送查询请求,查询使用的是节点的 /internal/select/query 接口。
从这部分实现也能看出,Victoria Logs 可以实现 vlstorage 的无缝扩展。这种查询方式会向所有 vlstorage 节点发送请求,并将各节点返回的结果进行合并。也存在一些缺点,例如:
- 木桶效应:查询响应时间受最慢节点影响;
- 即使某些节点不存在相关数据,仍会参与查询执行;
- 网络开销不可避免,延迟会增加。
这也解释了为什么 Victoria Logs 更推荐在可接受的情况下使用单节点模式,而非集群。
vlstorage #
vlstorage 是 Victoria Logs 的存储组件,负责管理日志数据的存储,并为 vlselect 提供查询接口(/internal/select/query),为 vlinsert 提供写入接口(/internal/insert)。
查询接口 #
/internal/select/query 映射为 processQueryRequest 方法:
代码片段
// app/vlselect/internalselect/internalselect.go
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// ...
// 这里就是在跟踪 vlselect 时调用的 RunQuery 方法
if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
return err
}
}
// app/vlstorage/main.go
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
// ...
// 这里就要关注 localStorage.RunQuery 方法
if localStorage != nil {
return localStorage.RunQuery(qctx, writeBlock)
}
return netstorageSelect.RunQuery(qctx, writeBlock)
}
这也进入了 Victoria Logs 的核心存储层,具体细节在后文“存储原理”部分展开。
写入接口 #
/internal/insert 对应的处理方法在 app/vlinsert/internalinsert/internalinsert.go#RequestHandler 中:
代码片段
// app/vlinsert/internalinsert/internalinsert.go
func RequestHandler(w http.ResponseWriter, r *http.Request) {
// ...
// CommonParams 包含了写入时的一些公共参数, 如:
// TenantID logstorage.TenantID
// TimeFields []string
// MsgFields []string
// StreamFields []string
// IgnoreFields []string
// 等等
cp, err := insertutil.GetCommonParams(r)
// 根据压缩类型解析 body 中的数据, 再通过 parseData 转为 InsertRow 类型
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.NewLogMessageProcessor("internalinsert", false)
irp := lmp.(insertutil.InsertRowProcessor)
err := parseData(irp, data)
lmp.MustClose()
return err
})
}
// app/vlinsert/internalinsert/internalinsert.go
func parseData(irp insertutil.InsertRowProcessor, data []byte) error {
// 从对象池中获取 InsertRow 对象
r := logstorage.GetInsertRow()
src := data
i := 0
for len(src) > 0 {
tail, err := r.UnmarshalInplace(src)
src = tail
i++
// 将解析后的 InsertRow 添加到 InsertRowProcessor 中
irp.AddInsertRow(r)
}
}
InsertRowProcessor 的实现是 logMessageProcessor:
代码片段
// app/vlinsert/insertutil/common_params.go
func (lmp *logMessageProcessor) AddInsertRow(r *logstorage.InsertRow) {
// 超过 MaxFieldsPerLine(默认 1000) 个字段的日志行会被丢弃
if len(r.Fields) > *MaxFieldsPerLine {
return
}
// 调用 logstorage.LogRows 将 InsertRow 添加到 LogRows 中
lmp.lr.MustAddInsertRow(r)
// 如果需要 flush,则调用 flushLocked 方法
if lmp.lr.NeedFlush() {
lmp.flushLocked()
}
}
到这里也进入到 Victoria Logs 的核心存储层,同样在介绍存储原理中再展开。
存储原理 #
从 Victoria Logs - How does victorialogs works 可以得知,Victoria Logs 采用了以下设计:
- 日志被作为 JSON 条目存储。
- 日志的字段会保存到不同的数据块中。
- 不同日志的相同字段会保存在同一个数据块中。
- 数据块会压缩存储,以减少磁盘空间占用。
- 较小的数据块会在后台合并成较大的数据块。
- 查询过程中,每个数据块会被原子的并发的读取。
此外,Victoria Logs 还采用以下优化以提升查询效率:
- 引入了 Bloom Filter 来跳过没有给定关键字的数据块。
- 针对不同数据类型的字段采用自定义编码和压缩。
- 相同的 Stream 被物理的分组(使用 Stream Filter 可以跳过不需要的数据块)。
- 为日志时间维护了一个稀疏索引(使用 Time Filter 可以提高查询效率)。
仅凭文字说明难以充分理解 Victoria Logs 的存储原理,仍需进一步深入源码来把握细节。
这部分源码在 lib/logstorage/ 目录下。
写入流程 #
前面我们跟踪到 logMessageProcessor 中的 AddInsertRow 方法,其中调用了 flushLocked 方法:
代码片段
// app/vlinsert/insertutil/common_params.go
func (lmp *logMessageProcessor) flushLocked() {
logRowsStorage.MustAddRows(lmp.lr)
}
// app/vlstorage/main.go
// 这个方法在 vlinsert 中也被调用,只不过就是使用的 netstorageInsert.AddRow 方法
func (*Storage) MustAddRows(lr *logstorage.LogRows) {
if localStorage != nil {
localStorage.MustAddRows(lr)
} else {
// Store lr across the remote storage nodes.
lr.ForEachRow(netstorageInsert.AddRow)
}
}
localStorage(logstorage.Storage) 代表的就是 Victoria Logs 的本地存储层实现:
代码片段
// lib/logstorage/storage.go
type Storage struct {
path string // 存储目录的路径
retention time.Duration // 数据保留时间
flockF *os.File // 用于确保 Storage 仅被单个进程打开的文件锁
partitions []*partitionWrapper // 分区列表,按时间排序,例如 partitions[0] 是最早的分区
ptwHot *partitionWrapper // 最新的分区,用于写入新的日志行
// ... 省略其他字段
}
这里新出了 partition 的概念,其实对应的就是日期,每个日期对应一个分区。再回到 MustAddRows 方法:
代码片段
func (s *Storage) MustAddRows(lr *LogRows) {
// Fast path:
// 尝试将 LogRows 全部写入到 ptwHot 中
ptwHot := s.ptwHot
if ptwHot != nil {
if ptwHot.canAddAllRows(lr) {
ptwHot.pt.mustAddRows(lr)
return
}
}
// Slow path:
// 如果 LogRows 不能被 ptwHot 全部写入,那么就需要把 LogRows 拆分成多个 LogRows,写入到不同的分区中
// PS: 历史的分区可能需要从磁盘加载,会增加写入延迟
now := time.Now().UnixNano()
minAllowedDay := s.getMinAllowedDay(now) // 保留策略:过去的时间
maxAllowedDay := s.getMaxAllowedDay(now) // 保留策略:未来的时间
minAllowedTimestamp := now - s.maxBackfillAge.Nanoseconds() // 最大能接受的历史日志时间
// 遍历 LogRows 中的每个日志行,过滤掉不符合要求的日志行,
// 并根据日志行的时间戳,将其添加到同一分区(日期)的 LogRows 中
m := make(map[int64]*LogRows)
for i, ts := range lr.timestamps {
day := ts / nsecsPerDay
// 根据保留策略 和 最大能接受的历史日志时间 过滤掉不符合要求的日志行
// ...
lrPart := m[day]
lrPart.mustAddInternal(lr.streamIDs[i], ts, lr.rows[i], lr.streamTagsCanonicals[i])
}
for day, lrPart := range m {
ptw := s.getPartitionForWriting(day)
if ptw != nil {
ptw.pt.mustAddRows(lrPart)
}
}
}
这里可以得知 Storage 的写入是分区(day)的,而分区具体的写入由 partitionWrapper.partition 控制。
代码片段
// lib/logstorage/partition.go
type partition struct {
s *Storage
// path 是分区的完整目录路径。例如 /data/logstorage/partitions/20230801
// 前面的 /data/logstorage 可以配置,后面则是 Victoria Logs 自己的目录结构
path string
name string // name 是分区的名称,是目录名。如:20230801
idb *indexdb // idb 是索引数据库,用于存储日志行的索引信息
ddb *datadb // ddb 是数据数据库,用于存储日志行的原始数据
// ....
}
// lib/logstorage/partition.go
// mustAddRows 也就是把数据写入到索引数据库和数据数据库中,只是其中为了提高性能过滤了已经存在的 Stream
func (pt *partition) mustAddRows(lr *LogRows) {
// 将 新增 的 Stream 注册到 indexdb 中
if !pt.idb.hasStreamID(streamID) {
streamTagsCanonical := streamTagsCanonicals[rowIdx]
pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
}
// 最后把 LogRows 写入到 datadb 中
pt.ddb.mustAddRows(lr)
}
显然 Victoria Logs 的分区中主要由 索引数据库(indexdb)和 数据数据库(datadb)构成。下面就依次跟踪下 indexdb 和 datadb 的写入流程。
indexdb 写入 #
indexdb 用于存储 Stream 的索引信息,基于 MergeSet 实现。
代码片段
// lib/logstorage/indexdb.go
type indexdb struct {
path string // path 是索引数据库的目录路径:如 path/to/partition/indexdb 目录
partitionName string // partitionName 是索引数据库所属的分区名称(天)
tb *mergeset.Table // tb 是索引数据库的存储,用于存储索引信息
s *Storage // s 是 indexdb 所属的 Storage
}
- 索引条目准备
mustRegisterStream 会写入三类索引条目:
tenantID:streamID:streamID 到 streamTagsCanonical 的映射tenantID:streamID -> streamTagsCanonical:streamID 到 streamTagsCanonical 的映射tenantID:name:value -> streamID:tag 到 streamID 的映射
代码片段
// lib/logstorage/indexdb.go
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
// Register tenantID:streamID entry.
bufLen := len(buf)
buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
buf = streamID.id.marshal(buf)
items = append(items, buf[bufLen:])
// Register tenantID:streamID -> streamTagsCanonical entry.
bufLen = len(buf)
buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
buf = streamID.id.marshal(buf)
buf = append(buf, streamTagsCanonical...)
items = append(items, buf[bufLen:])
// Register tenantID:name:value -> streamIDs entries.
tags := st.tags
for i := range tags {
bufLen = len(buf)
buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
buf = tags[i].indexdbMarshal(buf)
buf = streamID.id.marshal(buf)
items = append(items, buf[bufLen:])
}
// Add items to the storage
idb.tb.AddItems(items)
}
这里以一个具体的例子来说明:
假设我们有一个 Stream {"tag1":"value1", "tag2":"value2"}, 其 tenantID 为 0, streamID 为 0x12345678(hi) 90abcdef(lo),那么会生成如下的索引条目:
- 0x00(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID)
- 0x01(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) 0x02(tagCount) 0x04(tag1NameLen) "tag1"(tag1Name) 0x06(tag1ValueLen) "value1" 0x04(tag2NameLen) "tag2" 0x06 "value2"
- 0x02(indexType) 0x00000000(tenantID) 0x04(tagNameLen) "tag1"(tagName) 0x06(tagValueLen) "value1"(tagValue) 0x12345678 0x90abcdef (streamID)
- 0x02(indexType) 0x00000000(tenantID) 0x04(tagNameLen) "tag2"(tagName) 0x06(tagValueLen) "value2"(tagValue) 0x12345678 0x90abcdef (streamID)
- 写入内存块
这些索引条目会加入到 indexdb 的 mergeset.Table 存储引擎中,此引擎负责将索引条目写入到内存块(inmemoryBlock)中,这里内存块采用了分片管理,分片数量 和 cpu 逻辑核数(GOMAXPROCS) 正相关cpus * multiplier (multiplier = cpus, 最大 16)。
并在合适的时机触发合并操作。
代码片段
type rawItemsShard struct {
ibs []*inmemoryBlock
}
type inmemoryBlock struct {
commonPrefix []byte // 公共前缀,减少重复存储
data []byte
items []Item
}
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/encoding.go
func (ib *inmemoryBlock) Add(x []byte) bool {
data := ib.data
// 如果写入当前索引条目后,内存块大小超过了 maxInmemoryBlockSize(64KB),
// 那么就不能写入当前索引条目,返回 false
if len(x)+len(data) > maxInmemoryBlockSize {
return false
}
dataLen := len(data)
data = append(data, x...)
ib.items = append(ib.items, Item{
Start: uint32(dataLen),
End: uint32(len(data)),
})
ib.data = data
return true
}
inmemoryBlock 合并时,会将前面的索引条目加入 data 和 items 中,后面的索引条目会加入到下一个内存块中,比如:
{
data([]byte): [
0x00(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) // 26B
0x01(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) 0x02(tagCount) 0x04(tag1NameLen) "tag1"(tag1Name) 0x06(tag1ValueLen) "value1" 0x04(tag2NameLen) "tag2" 0x06 "value2" // 56B
],
items: [
Item{Start: 0, End: 26},
Item{Start: 26, End: 82},
],
}
- 内存块合并
mergeset.Table 会将 inmemoryBlock 合并成一个更大的内存块(inmemoryPart), inmemoryPart 也会进一步合并并在合适的时机将其刷入到持久化磁盘中。
当单个 Shard 中的内存块超过 maxBlocksPerShard(256) 时,分片管理器会将这些内存块加入到自身的 ibsToFlush 中,等待合并操作。
代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (riss *rawItemsShards) addIbsToFlush(tb *Table, ibsToFlush []*inmemoryBlock) {
if len(ibsToFlush) == 0 {
return
}
var ibsToMerge []*inmemoryBlock
riss.ibsToFlushLock.Lock()
if len(riss.ibsToFlush) == 0 {
riss.updateFlushDeadline()
}
// 将当前分片的内存块加入到 ibsToFlush 中
riss.ibsToFlush = append(riss.ibsToFlush, ibsToFlush...)
// 如果待合并的内存块数量超过了 maxBlocksPerShard(256) * cpus 这一阈值,
// 那么就需要进行合并操作,将这些内存块合并成一个更大的内存块(inmemoryPart)
if len(riss.ibsToFlush) >= maxBlocksPerShard*cgroup.AvailableCPUs() {
ibsToMerge = riss.ibsToFlush
riss.ibsToFlush = nil
}
riss.ibsToFlushLock.Unlock()
// 将内存块合并成一个更大的内存块(inmemoryPart)
tb.flushBlocksToInmemoryParts(ibsToMerge, false)
}
在 flushBlocksToInmemoryParts 中将 ibsToMerge 按最多 defaultPartsToMerge(16) 大小分成多个 chunk,再把这些 chunk 合并成一个更大的内存块(inmemoryPart)。注意单个 inmemoryPart 大小不能超过 5% 的可用系统内存。
代码片段
func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool) {
pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
// 按最多 defaultPartsToMerge(16) 大小分成多个 chunk,
// 每个 chunk 合并成一个更大的内存块(inmemoryPart)
for len(ibs) > 0 {
n := defaultPartsToMerge
if n > len(ibs) {
n = len(ibs)
}
go func(ibsChunk []*inmemoryBlock) {
if pw := tb.createInmemoryPart(ibsChunk); pw != nil {
pws = append(pws, pw)
}
}(ibs[:n])
ibs = ibs[n:]
}
// 处理所有的 chunk (partWrapper), 将其合并成一个更大的内存块(inmemoryPart)
// 除非超过大小限制
maxPartSize := getMaxInmemoryPartSize()
for len(pws) > 1 {
// 合并 inmemoryPart
pws = tb.mustMergeInmemoryParts(pws)
pwsRemaining := pws[:0]
for _, pw := range pws {
// 如果大小超过 maxPartSize,那么就直接加入到 inmemoryParts 中
if pw.p.size >= maxPartSize {
tb.addToInmemoryParts(pw, isFinal)
} else {
pwsRemaining = append(pwsRemaining, pw)
}
}
pws = pwsRemaining
}
if len(pws) == 1 {
tb.addToInmemoryParts(pws[0], isFinal)
}
}
默认的可用内存 = 系统内存 * 60% (可以通过
memory.allowedPercent配置)
合并不只是上面提到的部分,程序还会在后台对 inmemoryPart/filePart 进行合并。
- 持久化
经过合并后的 inmemoryPart 会被写入到磁盘中,如果满足以下条件,则会被刷入到磁盘中:
- flushInterval 定时触发
- part(inmemoryPart / filePart) 合并时
代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (tb *Table) nextMergeIdx() uint64 {
return tb.mergeIdx.Add(1)
}
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
mergeIdx := tb.nextMergeIdx()
dstPartPath := ""
if dstPartType == partFile {
// 合并后的 inmemoryPart 会被写入到磁盘中,文件名格式为 %016X,如 18804EBAD6A80650
dstPartPath = filepath.Join(tb.path, fmt.Sprintf("%016X", mergeIdx))
}
// 只有一个 inmemoryPart 时,直接将其写入到磁盘中
if isFinal && len(pws) == 1 && pws[0].mp != nil {
mp := pws[0].mp
mp.MustStoreToDisk(dstPartPath)
// 打开新创建的 filePart
pwNew := tb.openCreatedPart(pws, nil, dstPartPath)
// 将合并后的 inmemoryPart 加入到 inmemoryParts 中
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
return nil
}
}
可以看到,inmemoryPart 会被写入到磁盘目录,目录格式为 %016X(如 18804EBAD6A80650)。实际写入文件的逻辑在 mp(*inmemoryPart).MustStoreToDisk 方法中。
代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/inmemory_part.go
func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
metaindexPath := filepath.Join(path, metaindexFilename) // metaindex.bin
indexPath := filepath.Join(path, indexFilename) // index.bin
itemsPath := filepath.Join(path, itemsFilename) // items.bin
lensPath := filepath.Join(path, lensFilename) // lens.bin
var psw filestream.ParallelStreamWriter
psw.Add(metaindexPath, &mp.metaindexData)
psw.Add(indexPath, &mp.indexData)
psw.Add(itemsPath, &mp.itemsData)
psw.Add(lensPath, &mp.lensData)
// 并发执行前面添加的刷写任务
psw.Run()
mp.ph.MustWriteMetadata(path) // 写入 parts.json 文件
fs.MustSyncPathAndParentDir(path)
}
小结:
indexdb 用于存储 Stream 索引信息,位于 path/to/partitions/indexdb。索引包括:tenantID → streamID、streamID → streamTagsCanonical、以及标签(tag)到 streamID 的映射。
indexdb 的存储按照 part 单位进行管理,每个 part 对应一个文件夹(18804EBAD6A80650),每个文件夹中包含如下的文件:
| 文件名 | 描述 |
|---|---|
| metadata.json | (Part) 索引元数据: {索引数量,块数量, 第一个项的索引, 最后一个项的索引} |
| metaindex.bin | 存储 metaindexRow(用来索引 indexBlock 或者 blockHeaders) |
| index.bin | 存储 blockHeader (commonPrefix、items 数量、索引 items 偏移、索引 lens 偏移等) |
| items.bin | (Block) stream 索引 items 数据(会使用 commonPrefix 手段进行压缩) |
| lens.bin | (Block) lens 索引 items 长度信息(用于支持 commonPrefix 压缩) |
datadb 写入 #
datadb 用于存储实际的日志数据。
继续跟踪 pt.ddb.mustAddRows,可以看到 ddb(*datadb).mustFlushLogRows 是实际写入 datadb 的方法。
代码片段
// lib/logstorage/datadb.go
func (ddb *datadb) mustFlushLogRows(lr *logRows) {
mp.mustInitFromRows(lr)
p := mustOpenInmemoryPart(ddb.pt, mp)
// 将 inmemoryPart 加入到 datadb.inmemoryParts 中
ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
// 触发 inmemoryParts 合并
ddb.startInmemoryPartsMergerLocked()
}
其中 mp.mustInitFromRows(lr) 方法会将 logRows 转换为 inmemoryPart:
代码片段
// lib/logstorage/datadb.go
func (mp *inmemoryPart) mustInitFromRows(lr *logRows) {
sort.Sort(lr) // 根据 streamID 排序,再按 timestamp 排序
lr.sortFieldsInRows() // 根据 field 的 Name 进行排序
// 将 inmemoryPart 的各种 buffer 赋值给 blockStreamWriter,
// 后续 bsw.Finalize 实际上也是将 bsw 的数据写入到 inmemoryPart 中
bsw.MustInitForInmemoryPart(mp)
var sidPrev *streamID
uncompressedBlockSizeBytes := uint64(0)
timestamps := lr.timestamps
rows := lr.rows
streamIDs := lr.streamIDs
for i := range timestamps {
streamID := &streamIDs[i]
if sidPrev == nil {
sidPrev = streamID
}
// 注意:将相同流的日志写入同一个 block 中,如果超过 maxUncompressedBlockSize (2MB) 则会写入下一个 block
// 看 blockStreamWriter.MustWriteRows 方法
if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || !streamID.equal(sidPrev) {
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
trs.reset()
sidPrev = streamID
uncompressedBlockSizeBytes = 0
}
fields := rows[i]
trs.timestamps = append(trs.timestamps, timestamps[i])
trs.rows = append(trs.rows, fields)
uncompressedBlockSizeBytes += uint64(EstimatedJSONRowLen(fields))
}
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
// 将 bsw 中的数据更新到 inmemoryPart
bsw.Finalize(&mp.ph)
}
// lib/logstorage/block_stream_writer.go
func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, rows [][]Field) {
if len(timestamps) == 0 {
return
}
b := getBlock()
// !!! 这里把原始日志的字段写入到 block 中
b.MustInitFromRows(timestamps, rows)
bsw.MustWriteBlock(sid, b)
putBlock(b)
}
type block struct {
timestamps []int64
columns []column // 所有 row 会根据按列进行存储
constColumns []Field // 所有 row 这一列的值相同就可以存储在 constColumns 中,但值不能超过 256 字节
}
小结:
在这个环节,日志数据从行格式(logRows)转换为列格式(inmemoryPart),主要经历以下步骤:
- 对
logRows排序:同一stream的数据相邻并按时间排序,同时对Fields依据Field.Name进行排序,便于生成block; - 将排序后的
logRows中相同stream的数据写入一个或多个block(每个block最大maxUncompressedBlockSize(2MB)); block包含常量列(所有行该字段值相同)与普通列(各行值不完全相同);普通列保存所有行的该字段值(缺失则为空);- 将
block写入inmemoryPart,合并后刷入磁盘。
每个 part 包含下面几种文件:
| 文件名 | 存储内容 |
|---|---|
| metadata.json | (Part) part 元信息:记录数、格式版本、时间范围等 |
| metaindex.bin | (Part) 索引 indexBlockHeader (一组 block) |
| index.bin | (Part) 索引 blockHeader |
| column_names.bin | (Part) 中所有的列和列ID |
| column_idxs.bin | (Block) column 的 bloom/values 存在的 shardID |
| columns_header_index.bin | (Block) 索引 columnID 到 columnHeader 的偏移量 |
| columns_header.bin | (Block) columnHeader 数据 |
| timestamps.bin | (Block) 时间戳数据 |
| message_bloom.bin | (Block) 消息的布隆过滤器 |
| message_values.bin | (Block) 实际的日志消息 |
| bloom.bin{shard} | (Block) 普通列的布隆过滤器,用于快速判断是否包含某个消息 |
| values.bin{shard} | (Block) 普通列的实际值 |
其中 {shardIdx} 是分片索引,从 0 开始,代表了 bloom 和 values 在这个 part 中的分片数量。
Log 中的列名会存储在 column_names.bin 中,每个列名会有一个唯一的 ID,这个 ID 会在 column_idxs.bin 中存储。
读取流程 #
经过对写入流程的梳理,我们再回头来看读取流程。从 RunQuery 开始追踪:
Block 过滤 #
代码片段
// lib/logstorage/storage_search.go
func (s *Storage) runQuery(qctx *QueryContext, writeBlock writeBlockResultFunc) error {
// 从查询条件中解析出查询需要的参数:
// type storageSearchOptions struct {
// tenantIDs []TenantID
// streamIDs []streamID
// minTimestamp int64
// maxTimestamp int64
// streamFilter *StreamFilter
// filter filter
// fieldsFilter *prefixfilter.Filter
// hiddenFieldsFilter *prefixfilter.Filter
// timeOffset int64
// }
sso := s.getSearchOptions(qctx.TenantIDs, q, qctx.HiddenFieldsFilters)
s.searchParallel(workersCount, sso, qctx.QueryStats, stopCh, writeBlockToPipes)
}
// lib/logstorage/storage_search.go
func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs *QueryStats, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
// ...
// 启动多个 blockSearch 协程,每个协程从 workCh 中取出一个 blockSearchWorkBatch 进行搜索
workCh := make(chan *blockSearchWorkBatch, workersCount)
for workerID := 0; workerID < workersCount; workerID++ {
go func(workerID uint) {
for bswb := range workCh {
for i := range bsws {
// bs blockSearch 代表对一个 block 进行搜索
bs.search(qsLocal, bsw, bm)
if bs.br.rowsLen > 0 {
writeBlock(workerID, &bs.br)
}
}
}
}(uint(workerID))
}
// 根据时间范围确定需要查询的 Partition
ptws, ptwsDecRef := s.getPartitionsForTimeRange(sso.minTimestamp, sso.maxTimestamp)
defer ptwsDecRef()
// 并发查询每个 Partition
for i, ptw := range ptws {
go func(idx int, pt *partition) {
psfs[idx] = pt.search(sso, qsLocal, workCh, stopCh)
}(i, ptw.pt)
}
}
// lib/logstorage/storage_search.go
func (pt *partition) search(sso *storageSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
// 如果查询条件中包含了 _streamFilter 那么从 indexdb 中获取到对应的 streamID
pso := pt.getSearchOptions(sso)
// 在 datadb 中执行查询
return pt.ddb.search(pso, qs, workCh, stopCh)
}
// lib/logstorage/datadb_search.go
func (ddb *datadb) search(pso *partitionSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
// 按照时间范围确定需要查询的 Part
pws, pwsDecRef := ddb.getPartsForTimeRange(pso.minTimestamp, pso.maxTimestamp)
// Apply search to matching parts
for _, pw := range pws {
pw.p.search(pso, qs, workCh, stopCh)
}
return pwsDecRef
}
// lib/logstorage/storage_search.go
func (p *part) search(pso *partitionSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
bhss := getBlockHeaders()
if len(pso.tenantIDs) > 0 {
p.searchByTenantIDs(pso, qs, bhss, workCh, stopCh)
} else {
// 这个方法是对 part 中的 index
p.searchByStreamIDs(pso, qs, bhss, workCh, stopCh)
}
putBlockHeaders(bhss)
}
从上述代码可以清晰看出,Victoria Logs 的前三层结构在逐层缩小查询范围以提高效率,并发查询每个 partition 和 part 以加速检索。
其中 part.searchByStreamIDs 的作用是基于 streamIDs 过滤出符合条件的 block(blockHeader):要求 streamID 落在目标集合内且时间范围命中目标区间,并将其加入 workCh。
根据 streamFilter 获取 streamIDs 的代码片段如下。以 _stream: {service="test-app"} 为例,对应的等值查询方法为 getStreamIDsForNonEmptyTagValue:
代码片段
func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
ids := make(map[u128]struct{})
ts := &is.ts
kb := &is.kb
// 构建查询前缀:nsPrefixTagToStreamIDs + tenantID + tagName + tagValue
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
prefix := kb.B
// 找到第一条以 prefix 开头的记录
ts.Seek(prefix)
for ts.NextItem() {
if !bytes.HasPrefix(item, prefix) {
break
}
// 解析 streamID 并更新结果
tail := item[len(prefix):]
sp.UpdateStreamIDs(ids, tail)
}
return ids
}
这里的细节:
indexBlockHeader(s)从metaindex.bin加载;- 第一次过滤:通过
indexBlockHeader排除不包含目标streamID和时间范围的索引块; - 加载
blockHeader:从index.bin读取匹配索引块中的所有blockHeader; - 第二次过滤:依据
blockHeader再次排除不包含目标streamID和时间范围的块。
代码片段
// indexBlockHeader 包含索引是覆盖了多个 block
type indexBlockHeader struct {
streamID streamID // 代表的是覆盖的 block 中最小的 streamID
minTimestamp int64 // 代表的是覆盖的 block 中最小的 timestamp
maxTimestamp int64 // 代表的是覆盖的 block 中最大的 timestamp
indexBlockOffset uint64 // 代表的是在 indexFilename(index.bin) 中的指向存储数据位置的偏移量
indexBlockSize uint64 // 代表的是在 indexFilename(index.bin) 中存储数据的大小
}
// blockHeader 包含了单个 block 的元数据信息
type blockHeader struct {
streamID streamID // 代表的是 block 中存储的日志条目的 streamID
uncompressedSizeBytes uint64 // 代表的是 block 中存储的日志条目的原始(未压缩)大小
rowsCount uint64 // 代表的是 block 中存储的日志条目的数量
timestampsHeader timestampsHeader // timestampsHeader 包含了 block 中存储的日志条目的时间戳信息
columnsHeaderIndexOffset uint64 // 代表的是 columnsHeader 在 columnsHeaderIndexFilename(columns_header_index.bin) 中的偏移量
columnsHeaderIndexSize uint64 // 代表的是 columnsHeader 在 columnsHeaderIndexFilename(columns_header_index.bin) 中的大小
columnsHeaderOffset uint64 // 代表的是 columnsHeader 在 columnsHeaderFilename(columns_header.bin) 中的偏移量
columnsHeaderSize uint64 // 代表的是 columnsHeader 在 columnsHeaderFilename(columns_header.bin) 中的大小
}
type timestampsHeader struct {
blockOffset uint64 // 代表的是 block 在 timestampsFilename(timestamps.bin) 中的偏移量
blockSize uint64 // 代表的是 block 在 timestampsFilename(timestamps.bin) 中的大小
minTimestamp int64 // 代表的是 block 中存储的日志条目的最小时间戳
maxTimestamp int64 // 代表的是 block 中存储的日志条目的最大时间戳
marshalType encoding.MarshalType // 代表的是 block 中存储的日志条目的时间戳信息的编码类型
}
经过上面的层层过滤 (按照 时间范围 + streamID 列表),确定了哪些分区的哪些 part,以及 part 中需要进一步筛选的 Block(blockHeader) 列表。这些还需要查找过滤的 Block(blockHeader) 会被打包成 blockSearchWorkBatch 任务,发送到 workCh 中, 让 blockSearch 执行具体的查询。
我们也能得出这样的结论:VictoriaLogs 在存储时,在每一层都为时间 和 streamID 过滤在索引中做了设计,进而在查询时带上适当时间范围 和 streamFilter 可以极大的缩小查询范围,从而提高查询效率。
Block 匹配 #
blockSearch 是在 Storage.searchParallel 启动的工作协程中运行,依次从 workCh 中获取 blockSearchWorkBatch,然后进行进行查询。
代码片段
// lib/logstorage/storage_search.go
func (bs *blockSearch) search(qs *QueryStats, bsw *blockSearchWork, bm *bitmap) {
bs.reset()
bs.qs = qs
bs.bsw = bsw
// bitmap 用来存储 block 中命中的 log entry 的‘索引’
bm.init(int(bsw.bh.rowsCount))
// 将所有的位标记为 1(初始状态,代表所有行都需要检测)
bm.setBits()
// filter 是一个接口,用来实现 log entries 的过滤
// 不仅有 = (filterExact) 这种简单的过滤,还支持逻辑表达: 且(filterAnd) 这样可以组织成一个复杂的逻辑 filter
bs.bsw.pso.filter.applyToBlockSearch(bs, bm)
// 如果没有命中任何 log entry,直接返回
if bm.isZero() {
return
}
// 将 block 中的
bs.br.mustInit(bs, bm)
// 获取需要的列,通过 “ | fields level, streamID, timestamp, message” 这种方式来指定
bs.br.initColumns(bsw.pso.fieldsFilter)
}
type filter interface {
String() string // 返回 filter 的字符串表示
updateNeededFields(pf *prefixfilter.Filter)
matchRow(fields []Field) bool
// 即根据 filter 过滤出 bs 中命中的 log entry 的‘索引’,并更新到 bm 中 (标记或者取消标记)
applyToBlockSearch(bs *blockSearch, bm *bitmap)
// 即根据 filter 过滤出 br 中命中的 log entry 的‘索引’,并更新到 bm 中(标记或者取消标记)
applyToBlockResult(br *blockResult, bm *bitmap)
}
这里再进一步深入就是各种 filter 的实现了,因此不再展开,这里只关注下 Block 内是怎么对 列式存储结构 进行检索的?
这里以 filterExact 为例,来展示下 Block 是怎么对 列式存储结构 进行检索比较的。
举例:filterExact 匹配
// filterExact 匹配指定字段的精确值
//
// Example LogsQL: `fieldName:exact("foo bar")` of `fieldName:="foo bar"`
type filterExact struct {
fieldName string // 代表的是需要匹配的字段名
value string // 代表的是需要匹配的字段值
tokens []string
tokensHashes []uint64
}
// lib/logstorage/filter_exact.go
func (fe *filterExact) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
fieldName := fe.fieldName
value := fe.value
// 先判断是否是 const 列 (所有行这一列的值都是相同的)。
// 如果是,可以直接比较 value 是否相等,且只需要比较一次就能确定是否命中
v := bs.getConstColumnValue(fieldName)
if v != "" {
if value != v {
bm.resetBits()
}
return
}
// 如果不是 const 列,检查列是否存在,并获取到 column header:
// 1. `column_names.bin` 提供 column_name 和 column_id 的映射
// 2. `columns_header_index.bin` 提供 column_id 到 column header 的偏移量映射
// 3. `column_idxs.bin` 提供 column_name 到 bloom/values shardId 的映射
ch := bs.getColumnHeader(fieldName)
if ch == nil {
// 列不存在的情况下,如果查询值不是空,那么需要重置 bitmap
if value != "" {
bm.resetBits()
}
return
}
tokens := fe.getTokensHashes()
switch ch.valueType {
case valueTypeString:
matchStringByExactValue(bs, ch, bm, value, tokens)
case valueTypeDict:
matchValuesDictByExactValue(bs, ch, bm, value)
case valueTypeUint8:
matchUint8ByExactValue(bs, ch, bm, value, tokens)
// ... 省略其他数值类型
case valueTypeIPv4:
matchIPv4ByExactValue(bs, ch, bm, value, tokens)
case valueTypeTimestampISO8601:
matchTimestampISO8601ByExactValue(bs, ch, bm, value, tokens)
default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
}
}
// columnHeader 代表列的元数据信息, 它一定只对应一个 Block 中的一列
// 注意如果 name 为空,那么它代表的 message(_msg) 列
type columnHeader struct {
name string
valueType valueType // 列中存储的值的类型
minValue uint64 // 列中存储的最小值, 用于快速判断是否在给定的范围内。适用于 uint*, ipv4, timestamp 和 float64 类型
maxValue uint64 // 列中存储的最大值, 用于快速判断是否在给定的范围内。适用于 uint*, ipv4, timestamp 和 float64 类型
valuesDict valuesDict // 列中存储的唯一值字典, 适用于 valueType = valueTypeDict 类型
valuesOffset uint64 // 列中存储的 values 的偏移量, 用于快速定位到 values.bin 中的对应位置
valuesSize uint64 // 列中存储的 values 的大小, 用于快速定位到 values.bin 中的对应位置
bloomFilterOffset uint64 // 列中存储的 bloom filter 的偏移量, 用于快速定位到 bloom.bin 中的对应位置
bloomFilterSize uint64 // 列中存储的 bloom filter 的大小, 用于快速定位到 bloom.bin 中的对应位置
}
到这里也只是完成了查询 column header 的过程,接下来需要根据 column header 判断具体命中了哪些行:
values 匹配
// lib/logstore/filter_exact.go
func matchStringByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []uint64) {
// 先判断 bloom filter 是否命中, 如果 bloom filter 不命中, 那么直接返回
if !matchBloomFilterAllTokens(bs, ch, tokens) {
bm.resetBits()
return
}
visitValues(bs, ch, bm, func(v string) bool {
return v == value
})
}
// lib/logstore/filter_phrase.go
func visitValues(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string) bool) {
if bm.isZero() {
// Fast path - nothing to visit
return
}
// 根据 列头 来获取 values
// 1. 列名 确定 再 values.bin 的哪一个分片
// 2. 再根据 columnHeader 存储的 valuesOffset 和 valuesSize 来读取 values
values := bs.getValuesForColumn(ch)
bm.forEachSetBit(func(idx int) bool {
return f(values[idx])
})
}
到这里整个查询过程也差不多完成了,剩下的部分就是根据命中的行索引来获取需要返回的 fields 的值了。这里限于篇幅,就不展开了。
小结:
总结一下查询过程:
- 根据查询条件,确定需要查询的分区和 part;
- 并发查询每个分区的每个 part,查询时根据 indexBlockHeader 过滤出可能命中的 block;
- 对每个命中的 block:
- 先检查匹配的字段是不是常量列,如果是,那么判断一次即可;
- 如果是普通列,那么先获取列头,检查列是否存在,如果不存在,那么也可以直接返回;
- 如果列存在,也不直接加载 values,而是先判断 bloom filter 是否未命中,如果未命中,那么代表该 block 中一定不存在该值,直接返回;
- 最后遍历这一列所有的 values,这里也不会遍历所有的 values,而是只遍历 bitmap 中设置为 1 的位置(这些位置可能是前置 filter 已经标记为命中的位置);
存储模型 #
自己本地启动一个 Victoria Logs,再写入数据,可以直观的看到存储引擎的目录结构如下:
代码片段
/storage/
├── partitions/
│ ├── 20251210/
│ │ ├── indexdb
│ │ │ ├── 18804EBAD6A6ECA1
│ │ │ │ ├── index.bin
│ │ │ │ ├── items.bin
│ │ │ │ ├── lens.bin
│ │ │ │ ├── metadata.json
│ │ │ │ └── metaindex.bin
│ │ │ ├── 18804EBAD6A6ECB9
│ │ │ ├── ...
│ │ │ ├── 18804EBAD6A6ECB8
│ │ │ └── parts.json
│ │ │
│ │ └── datadb/
│ │ ├── 18804EBAD6A80650
│ │ │ ├── bloom.bin0
│ │ │ ├── bloom.bin1
│ │ │ ├── bloom.bin2
│ │ │ ├── bloom.bin3
│ │ │ ├── bloom.bin4
│ │ │ ├── column_idxs.bin
│ │ │ ├── column_names.bin
│ │ │ ├── columns_header.bin
│ │ │ ├── columns_header_index.bin
│ │ │ ├── index.bin
│ │ │ ├── message_bloom.bin
│ │ │ ├── message_values.bin
│ │ │ ├── metadata.json
│ │ │ ├── metaindex.bin
│ │ │ ├── timestamps.bin
│ │ │ ├── values.bin0
│ │ │ ├── values.bin1
│ │ │ ├── values.bin2
│ │ │ ├── values.bin3
│ │ │ └── values.bin4
│ │ │
│ │ ├── 18804EBAD6A80FF6
│ │ ├── ...
│ │ ├── 18804EBAD6A81004
│ │ └── parts.json
│ │
│ │── 20251211/
│ │── ...
│ └── 20251213/
│
└── flock.lock
其实从上面的文件目录结构可以看出来,VictoriaLogs 存储上分成了三层:
Storage这是存储引擎的最顶层,所有数据都存储在这个目录下。Partition这是存储引擎的第二层,每个 Partition 对应一个日期,这个时间范围内的数据按照 Part 进行存储。Part这是存储引擎的第三层,每个 Part 代表的是合并压缩后的数据,其中存储了具体的数据和索引信息。Block可以认为是第四层,每个 Block 代表的是同一一个 Stream 的数据(物理上相邻)。多个 Block 组成了一个 Part。
每个 Partition 都包含了 indexdb 和 datadb:
- indexdb 保存
stream的索引信息,包括:标签、tenantID、streamID 之间的映射关系。 - datadb 日志数据,每个
stream会有一个唯一的 ID,同一个 stream 的数据在物理上相近保存;datadb 存储的数据是列格式的,分为 message, timestamp,columns 三种类型分开存储;使用了 bloom 过滤器来快速判断是否存在某个字段值。
对应的存储模型如下图所示:
总结 #
通过深入 VictoriaLogs 的写入和查询过程,可以发现它的存储引擎做了针对 日志场景 做了很多优化来提高查询效率和节省存储开销:
- 按照日期设计分区,如果查询特定时间范围内,可以只查询涉及到的分区;
- 分区按照 part 进行划分,part 上设计了 最小 streamID 和 覆盖的时间的范围,用于帮助过滤;
- 分区内提供了
indexBlockHeader(代表多个 blockHeader)和blockHeader进一步帮助过滤,减小扫描范围; - Block 中将列分成了 const column 和 普通 column,const column 存储时只需要存储一个值,比较时也只需要比较一次就能确定 block 是否命中;
- 将同一个 Stream 数据相邻存储;
- 普通列的值存储在一起,读取时顺序读取,速度更快;
- 使用 bitmap 在 filter 中传递,避免重复检测已经确定未命中的行;
- 使用 bloom filter 来快速判断未命中,减少不必要的读取 values;
- 查询时分区 和 part 都是并发查询的;
- 采用 ZSTD 压缩存储;
除了设计上的优化,还有很多 golang 实现上的优化:
- 随处可见的 sync.Pool 用来复用对象,避免频繁分配和释放内存;
- slice 同样也被复用;
- 充分利用 goroutine 并发查询,提高查询效率;
水平有限,如有错误,欢迎勘误指正 🙏