Airbnb 大数据平台架构成为 Airbnb 公司提升产品决策的关键部分。其 Hive cangku.html" target="_blank">数据仓库从 2013 年中旬的 350 TB 暴增到 11 PB (2015 年末统计的数据)。随着公司的成长,数据仓库的可靠性需求日益剧增。我们寻求迁移数据仓库,但现有的迁移工具要么在大数据仓库时有问题,要么就是有很明显的操作负荷,所以 Airbnb 开发了 ReAir 解决这种状况。这篇文章将详细介绍 ReAir 是如何工作的以及它是怎样轻松的实现 PB 级数据仓库的备份。
背景
最开始 Airbnb 所有数据存放在单个 HDFS/HIve 数据仓库。单个命名空间简单、易管理,然而产品复杂后之后即席查询就影响其可靠性。因此,我们把数据仓库分成了两个:一个数据仓库是支持关键产品的任务;另一个是专为即席查询用的。分离这么大的数据仓库将面临两个问题:我们如何容易的迁移庞大的数据仓库?;分离完之后,我们又该怎么保持数据的同步?为了解决这些问题,Airbnb 开发 ReAir 项目并开源给社区。
ReAir 对基于 Hive 元数据的数据仓库进行备份非常有效,可以实现 PB 级别的集群扩展。使用 ReAir 极其简单,你只要连接 Hive 的 metastore Thrift 服务,通过 MapReduce 进行数据复制。ReAir 可以兼容 Hive 和 Hadoop 的各种版本,并支持单机模式操作——只有 Hadoop 和 Hive 是必须要有,MySQL DB 是用来进行增量复制。由于 ReAir 同时处理数据和元数据,所以,只要复制任务一完成就可以进行表和分区的查询。
在没有 ReAir 的时候,迁移 Hive 数据仓库最典型的用法是启动一个 DistCp,并且通过数据库操作手动管理元数据。这种方法既费力又容易出错,甚至会引起脏数据导致数据不一致,有时复制不断变化的数据仓库目录时也会出现问题。另外其它的迁移方法要求指定 Hive 版本,当我们需要从旧版本的 Hive 数据仓库迁移就变得异常困难。
ReAir 工具包含两种迁移工具:批量迁移和增量式迁移。批量式迁移工具允许你一次复制指定的一系列表,它适合对一个集群的迁移。相对应的,增量式迁移工具可以跟踪数据仓库的变化,并复制生成的对象或者修改的对象。增量式迁移工具适合保持集群间数据的同步,它可以实现秒级数据同步。两种方式更详细的对比请见下面部分。
批量迁移
批量式迁移一般用来备份整个数据仓库。复制的速度和吞吐量取决于 reducer 的数量和吞吐量,在 Airbnb 公司,使用 500 个 reducer 复制 2.2 PB 数据仅仅用了 24 小时。
启动批量迁移的过程非常简单:用户允许 shell 命令启动一系列 MR 任务。其执行协议是从源数据仓库复制 Hive 表和分区等实体到目的数据仓库。批量式复制过程是占有带宽的,它会探测源数据仓库和目的数据仓库,并只复制两者之间的不同文件。同时元数据也仅仅更新变化过的。这些策略可以保证 ReAir 工作的高效率。
批量式迁移也有一些挑战。最明显的一个是,产品数据仓库中实体(Hive 表和分区)的大小不同,但是迁移的延迟并不能依赖于最大的一个实体。例如,一般的 Hive 表不到 100 个分区,然后最大的表有超过 10 万个分区。为了保证正常的延迟,需要并行运行迁移工作。
为了解决负载均衡的问题,批量式迁移运行一系列 MR 任务。批量迁移中两个最昂贵的操作是文件复制和元数据更新,所以这些步骤都是在 shuffle 阶段分布式进行。每个任务都会打印运行日志数据存储在 HDFS,通过日志数据能清晰的看到任务的完成情况。
第一个 MR 任务从 HDFS 读取实体标识符并 shuffle 后均匀的发到 reducer。reducer 先验证各实体,再对要复制的 HDFS 目录和实体进行映射;第二个 MR 任务扫描第一个 MR 产生的文件目录,并在对应的目录下创建文件。对文件名称进行 hash shuffle 后发到 reducer,一旦 shuffle 之后 reducer 执行复制操作;第三个 MR 任务处理 Hive 元数据的提交逻辑。
三阶段 MR 任务计划的扩展性和负载均衡都很好:复制 1 百万实体的 2.2 PB 数据消耗大改 24 小时;同步 20 TB 数据量的更新仅花了一个小时。阶段 1 和阶段 3 的瓶颈在于 Hive 元数据 MySQL 数据库;而阶段 2 的瓶颈在于网络带宽。
对于我们的迁移,需要开发定制化的文件复制 MR 来处理 HDFS 上的数据。然而,对比 DistCp 这样通用化的工具,在测试 ReAir 过程中也发现了一些问题:
为了解决这些问题,我们开发了两个 MR 任务来处理通用 HDFS 数据复制。第一个任务采用一个启发式的、文件夹遍历的多线程进行一系列分隔。一旦有足够的文件夹,mapper 遍历这些文件夹生成文件列表来复制。文件名经过 shuffle 发送到 reducer 来决定是否有必要复制。第二个 MR 任务读取文件列表来复制,并通过一个 shuffle 进行分布式复制工作。
增量式迁移
在两个集群的情况下,我们需要在两个集群之间共享数据。例如,日志数据需要每天在生产集群聚会,但同时即席查询的用户也需要这些数据。批量式复制任务对于这种需求显得太重,这时需要两个数据仓库间可以按小时进行更新。即席查询集群的用户需要尽快同步生产集群的数据,因此有必要找到一个尽可能快的方法来更新新的内容。虽然有一些开源项目能解决这个问题,但由于 Hive 版本依赖等问题并不是很理想,后来我们开发增量式复制工具来保证即席查询集群和生产集群的数据同步。
增量式复制工具设计到实体(Hive 表和分区)的记录变化,一旦发生变化尽快复制变化量。为了记录源集群上的变化,我们使用了 Hive 的钩子机制(hook 函数)把查询成功的实体写入 MySQL 数据库。采用这种方法,我们可以跟踪生产集群的所有变化。HDFS 上的变化或者元数据的更新都能触发复制。
一旦有了数据库里的变化的记录,我们需要一种方法来复制这些变化到即席查询集群。这个执行机制是通过一个 Java 服务实现,读取变化的日志中实体,并转化成一系列的行为集合。比如,在源集群成功创建一个表,会在目标集群上翻译成“复制表”的行为。Java 进程将调用元数据,并启动 MR 任务执行。
由于源集群上的变化是序列化到日志里,它将在目的集群中以相同的顺序进行执行。但是,实际情况是,单独的复制一个表或者分区花费几秒钟或者几分钟实在太慢,后面改成多线程并行复制。为了处理并发,所有的行为基于并发限制形成 DAG。通常限制都是对于同一个表,比如,在复制分区之前要建立好表。通过并发执行,复制的延迟降低到最小。
用增量式复制可以实现快速、可靠的复制。两个数据仓库之间的同步复制可以实现容灾恢复——如果一个集群宕机,另外一个集群还可以正常提供服务。对比批量式复制,增量式复制对数据仓库体量巨大但改变的数据量比较小的情况更有效率。当前在 Airbnb,每天的数据量增长不到 2 TB,所以增量式复制比较有意义。
使用批量式迁移和增量式迁移,可以快速的迁移两个集群。我们希望这些工具对社区也一样有用。