ClickHouse基础、实践、调优全视角解析(part4-典型案例-推荐系统实时指标)

发布于 4 个月前 作者 ifengkou 940 次浏览 来自 社区优选

640.jpeg

业务需求

随着 ClickHouse 支持的业务范围扩大,我们也决定支持一些实时的业务,第一个典型案例是推荐系统的实时数据指标:在字节跳动内部 AB 实验 应用非常广泛,特别用来验证推荐算法和功能优化的效果。

最初,公司内部专门的 AB 实验平台已经提供了 T+1 的离线实验指标,而推荐系统的算法工程师们希望能更快地观察算法模型、或者某个功能的上线效果,因此需要一份能够实时反馈的数据作为补充。他们大致有如下需求:

  1. 研发同学有 debug 的需求,他们不仅需要看聚合指标,某些时间还需要查询明细数据。

  2. 推荐系统产生的数据,维度和指标多达几百列,而且未来可能还会增加。

  3. 每一条数据都命中了若干个实验,使用 Array 存储,需要高效地按实验 ID 过滤数据。

  4. 需要支持一些机器学习和统计相关的指标计算(比如 AUC)。

640-2.jpeg

当时公司也有维护其他的分析型引擎,比如 Druid 和 ES。ES 不适合大批量数据的查询,Druid 则不满足明细数据查询的需求。而 ClickHouse 则刚好适合这个场景。

  1. 对于明细数据这个需求:ClickHouse > Druid。
  2. 对于维度、指标多的问题,可能经常变动,我们可以用 Map 列的功能,很方便支持动态变更的维度和指标。
  3. 按实验 ID 过滤的需求,则可以用 Bloom filter 索引。
  4. AUC 之前则已经实现过。

这些需求我们当时刚好都能满足。

640-3.jpeg

方案设计和比较

  • 常规方案:比较常规的思路,是用 Flink 消费 Kafka,然后通过 JDBC 写入 ClickHouse。
  • 优点: 各个组件职责划分清楚、潜在扩展性强
  • 缺点: 需要额外资源、写入频次不好控制、难以处理节点故障、维护成本较高
  • 关键是后面两点:由于缺少事务的支持,实时导入数据时难以处理节点故障;ClickHouse 组技术栈以 C++为主,维护 Flink 潜在的成本比较高。

Kafka Engine 方案

640-4.jpeg

第二个方案,则是使用 ClickHouse 内置的 Kafka Engine。我们可以在 ClickHouse 服务内部建一张引擎类型为 Kafka 的表,该表会内置一个消费线程,它会直接请求 Kafka 服务,直接将 Kafka partition 的数据拉过来,然后解析并完成数据构建。对于一个 ClickHouse 集群而言,可以在每个节点上都建一张 Kafka 表,在每个节点内部启动一个消费者,这些消费者会分配到若干个 Kafka Partition,然后将数据直接消费到对应。 这样的架构相对于使用了 Flink 的方案来说更简单一些,由于少了一次数据传输,整体而言开销会相对小一些,对我们来说也算是补齐了 ClickHouse 的一部分功能(比如 Druid 也支持直接消费 Kafka topic)缺点就是未来可扩展性会更差一些,也略微增加了引擎维护负担。 640-5.jpeg

Kafka engine 原理

这里简单介绍一下如何使用 kafka 引擎,为了能让 ClickHouse 消费 Kafka 数据,我们需要三张表:首先需要一张存数据的表也就是 MergeTree;然后需要一张 Kafka 表,它负责描述 Topic、消费数据和解析数据;最后需要一个物化视图去把两张表关联起来,它也描述了数据的流向,某些时候我们可以里面内置一个 SELECT 语句去完成一些 ETL 的工作。只有当三张表凑齐的时候我们才会真正启动一个消费任务。

640-6.jpeg

这是一个简单的例子:最后呈现的效果,就是通过表和 SQL 的形式,描述了一个 kafka -> ClickHouse 的任务。

640-7.jpeg

最终效果

由于外部写入并不可控、技术栈上的原因,我们最终采用了 Kafka Engine 的方案,也就是 ClickHouse 内置消费者去消费 Kafka。整体的架构如图:

1.数据由推荐系统直接产生,写入 Kafka。这里推荐系统做了相应配合,修改 Kafka Topic 的**格式适配 ClickHouse 表的 schema。 2.敏捷 BI 平台也适配了一下实时的场景,可以支持交互式的查询分析。 3.如果实时数据有问题,也可以从 Hive 把数据导入至 ClickHouse 中,不过这种情况不多。除此之外,业务方还会将 1%抽样的离线数据导入过来做一些简单验证,1%抽样的数据一般会保存更久的时间。

640-8.jpeg

我们在支持推荐系统的实时数据时遇到过不少问题,其中最大的问题随着推荐系统产生的数据量越来越大,单个节点的消费能力也要求越来越大:

改进一:异步构建索引

第一做的改进是将辅助索引的构建异步化了:在社区实现中,构建一个 Part 分为三步:

(1)解析输入数据生成内存中数据结构的 Block;

(2)然后切分 Block,并按照表的 schema 构建 columns 数据文件;

(3) 最后扫描根据 skip index schema 去构建 skip index 文件。三个步骤完成之后才会算 Part 文件构建完毕。

目前字节内部的 ClickHouse 并没有使用社区版本的 skip index,不过也有类似的辅助索引(e.g. Bloom Filter Index, Bitmap Index)。构建 part 的前两步和社区一致,我们构建完 columns 数据之后用户即可正常查询,不过此时的 part 不能启用索引。此时,再将刚构建好数据的 part 放入到一个异步索引构建队列中,由后台线程构建索引文件。这个改进虽然整体的性能开销没有变化,但是由于隐藏了索引构建的时间开销,整体的写入吞吐量大概能提升 20%

640-9.jpeg

改进二:支持多线程消费

第二个改进是在 Kafka 表内部支持了多线程的消费:

目前实现的 Kafka 表,内部默认只会有一个消费者,这样会比较浪费资源并且性能达不到性能要求。一开始我们可以通过增大消费者的个数来增大消费能力,但社区的实现一开始是由一个线程去管理多个的消费者,多个的消费者各自解析输入数据并生成的 Input Stream 之后,会由一个 Union Stream 将多个 Input Stream 组合起来。这里的 Union Stream 会有潜在的性能瓶颈,多个消费者消费到的数据最后仅能由一个输出线程完成数据构建,所以这里没能完全利用上多线程和磁盘的潜力。

640-10.jpeg

一开始的解决方法,是建了多张 Kafka Table 和 Materialized View 写入同一张表,这样就有点近似于多个 INSERT Query 写入了同一个 MergeTree 表。当然这样运维起来会比较麻烦,最后我们决定通过改造 Kafka Engine 在其内部支持多个消费线程,简单来说就是每一个线程它持有一个消费者,然后每一个消费者负责各自的数据解析、数据写入,这样的话就相当于一张表内部同时执行多个的 INSERT Query,最后的性能也接近于线性的提升。

640-11.jpeg

改进三:增强容错处理

对于一个配置了主备节点的集群,我们一般来说只会写入一个主备其中一个节点。

为什么呢?因为一旦节点故障,会带来一系列不好处理的问题。(1)首先当出现故障节点的时候,一般会替换一个新的节点上来,新替换的节点为了恢复数据,同步会占用非常大的网络和磁盘 IO,这种情况,如果原来主备有两个消费者就剩一个,此时消费性能会下降很大(超过一倍),这对于我们来说是不太能接受的。(2)早先 ClickHouse Kafka engine 对 Kafka partition 的动态分配支持不算好,很有可能触发重复消费,同时也无法支持数据分片。因此我们默认使用静态分配,而静态分配不太方便主备节点同时消费。(3)最重要的一点,ClickHouse 通过分布式表查询 ReplicatedMergeTree 时,会基于 log delay 来计算 Query 到底要路由到哪个节点。一旦在主备同时摄入数据的情况下替换了某个节点,往往会导致查询结果不准。

640-12.jpeg 这里简单解释一下查询不准的场景。一开始我们有两副本,Replica #1 某时刻出现故障,于是替换了一个新的节点上来,新节点会开始同步数据,白框部分是已经同步过的,虚线黄框是正在恢复的数据,新写入的白色框部分就是新写入的数据。如果此时两个机器的数据同步压力比较大或查询压力比较大,就会出现 Replica #1 新写入的数据没有及时同步到 Replica #2 ,也就是这个绿框部分,大量历史数据也没有及时同步到对应的黄框部分,这个情况下两个副本都是缺少数据的。因此无论是查 Replica #1 还是 Replica #2 得到的数据都是不准的。 640-13.jpeg

对于替换节点导致查询不准问题,我们先尝试解决只有一个节点消费的问题。为了避免两个节点消费这个数据,改进版的 Kafka engine 参考了 ReplicatedMergeTree 基于 ZooKeeper 的选主逻辑。对于每一对副本的一对消费者,(如上图 A1 A2),它们会尝试在 ZooKeeper 上完成选主逻辑,只有选举称为主节点的消费者才能消费,另一个节点则会处于一个待机状态。一旦 Replica #1 宕机,(如上图 B1 B2 ),B1 已经宕机连不上 ZooKeeper 了,那 B2 会执行选主逻辑拿到 Leader 的角色,从而接替 B1 去消费数据。

640-14.jpeg

当有了前面的单节点消费机制,就可以解决查询的问题了。假设 Replica #1 是一个刚换上来的节点,它需要同步黄框部分的数据,这时候消费者会与 ReplicatedMergeTree 做一个联动,它会检测其对应的 ReplicatedMergeTree 表数据是否完整,如果数据不完整则代表不能正常服务,此时消费者会主动出让 Leader,让副本节点上的消费者也就是 Replica #2 上的 C2 去消费数据。

也就是说,我们新写入的数据并不会写入到缺少数据的节点,对于查询而言,由于查询路由机制的原因也不会把 Query 路由到缺少数据的节点上,所以一直能查询到最新的数据。这个机制设计其实和分布式表的查询写入是类似的,但由于分布表性能和稳定原因不好在线上使用,所以我们用这个方式解决了数据完整性的问题。

小结一下上面说的主备只有一个节点消费的问题

配置两副本情况下的 Kafka engine,主备仅有一个节点消费,另一个节点待机。

  • 如果有故障节点,则自动切换到正常节点消费;
  • 如果有新替换的节点无法正常服务,也切换到另一个节点;
  • 如果不同机房,则由离 Kafka 更近的节点消费,减少带宽消耗;
  • 否则,由类似 ReplicatedMergeTree 的 ZooKeeper Leader 决定。
回到顶部