您有自己的代码存储库吗?那数据呢?你如何收集它们?是的,我们知道,这并不容易。当数据来自多个来源,具有不同的格式,并且以不同的频率到达时,您需要专门的工具来完成特定的任务。有很多工具,但在这篇博文中,我们将告诉你我们在AWS上构建的生产解决方案。

我们的目标是:从多个来源收集数据,并将其用于向在我们客户平台上观看视频的用户展示有针对性的广告。

什么是数据湖?

基本信息

数据陪伴着我们的每一步。当我们打开手机、笔记本电脑或购买欧芹时,有关这些活动的信息就被保存在某个地方。

根据著名统计学家、FiveThirtyEight网站主编内特·西尔弗(Nate Silver)的说法,我们收集的数据量相当于美国国会图书馆(Library of Congress)的全部纸质馆藏每秒三次

数据之所以被称为21世纪的石油,是因为它可以用来提取巨大的价值。

然而,由于数据量很大,问题就出现了。那么我们能用它做什么呢?我们首先要把它整理好。

想象一下,一家公司有许多不同的部门,每个部门都有不同的系统。这导致大量的数据被收集并以各种格式存储,例如:

  • 平面文件- CSV,日志,XML, JSON。
  • 非结构化数据——电子邮件、文档、pdf。
  • 二进制数据-图片,声音,视频。
  • 结构化——数据库。

每个部门和系统都必须彼此共享数据,而且它们都有自己共享数据的方式。

此外,一些数据集可能已经处理过,而另一些数据集仍然是原始的。简直是一团糟。这就是数据湖解决存储和共享数据这一令人沮丧的挑战的地方。

数据湖为您提供以下功能:

  • 数据操作:轮询和处理。
  • 安全:授权人员的访问控制。
  • 分析:提供不需要数据传输而运行分析的可能性。
  • 恒定的数据流量:处理大量增加的文件不是问题。
  • 编目和索引:它通过编目和索引提供易于理解的内容。

这个解决方案的巨大灵活性在于我们使用了读时模式(即我们只在读时对模式建模),而使用了写时模式的数据仓库(即我们在写时决定结构)。

自20世纪80年代以来,数据仓库一直被广泛视为从大量数据集合中提取知识的主要手段。从一开始,他们也有一个很大的劣势,那就是成本。

有了数据湖,我们可以以较低的成本存储大量数据,然后只在需要使它们适应预期结构时才对它们进行转换。

不同的湖边小贩

Azure

数据存储在Azure Blob Storage中,具有分级命名空间。数据分析和处理使用基于HDFS (Hadoop分布式文件系统)的工具进行,例如:

  • Azure数据工厂。
  • Azure HDInsight。
  • Azure砖。
  • Azure Synapse Analytics。
  • Azure数据浏览器。
  • BI。

Azure Synapse本身也提供了以列格式构建关系数据库的可能性。Azure数据湖与Azure Blob Storage密切相关,因此Azure数据湖的成本与Blob Storage相同。用于数据分析和处理的其他工具单独收费。

Qubole
Qubole非常强调开源,并且不依赖于供应商(即没有供应商锁定)。正因为如此,Qubole提供的解决方案是独立于云的,可以转移到任何云服务提供商。

集成UI适用于项目中的不同角色(例如,数据科学和数据工程师)。该解决方案可在AWS、谷歌Cloud、Microsoft Azure和Oracle Cloud等平台上使用。

它还与各种外部服务进行了大量集成。优德88备用网数据以Parquet和ORC等格式存储在选定的存储空间(云存储或本地存储)中。

可以使用Apache Spark等技术处理数据,而与存储数据相关的元数据可以存储在数据目录中,如Apache Hive。可以通过使用Apache Ranger和Apache Sentry等技术来管理数据权限和数据安全性。

在这种解决方案中,价格基于每小时Qubole计算单元的数量,该数量根据用于处理数据的实例的大小而变化。这些实例的计算成本以及网络和数据存储必须单独支付给选定的云服务提供商。

智能数据湖(Informatica)

智能数据湖基于Apache Hive、Apache HDFS、Apache Hadoop等技术。它可以与AWS、Microsoft Azure、谷歌云平台和Snowflake等供应商提供的各种其他数据存储解决方案相结合。它提供了一个易于使用的界面,只需要编写很少的代码。有关费用的信息可以通过联系他们获得。

有限公司

Infor Data Lake提供了它自己的元数据目录和该目录的内部管理,包括创建显示各个数据源或集之间依赖关系的元图。

Infor还提供可靠的监控和与其他服务的集成,以确保数据湖的无故障运行。优德88备用网Infor提供的示例实现基于AWS,但它只使用EC2实例,并在这些实例上构建整个Infor解决方案。有关费用的信息可以通过与他们联系获得。

三角洲湖(Databricks)

这里我们有Apache Spark(分布式计算平台)的创建者,他除了创建自己的解决方案外,还将其与一个名为Lakehouse的数据仓库相结合。

他们吹嘘他们的Delta Lake符合ACID原则,并提供数据版本、修改、验证和小文件压缩。

Presto为分布式SQL查询Delta Lake表的能力在不断发展,只对分区数据表有问题。此解决方案的成本取决于您正在使用的云平台(有三种可用的云平台:AWS、Azure和谷歌云平台)。

您按DBU (Databricks单位)付费,即代表Databricks平台上处理能力消耗的单位,当然也包括云资源。w88优德中文

AWS湖的形成

AWS数据湖是AWS上几个可用服务的组合,允许您使用AWS湖泊形成服务管理数据。优德88备用网S3服务用于数据存储,Amazon Glue是数据编目和处理的推荐解决方案(使用Apache Hive和Apache Spark技术)。Amazon Athena服务(基于Presto的分布式SQL查询)被用作数据的接口。

与EMR数据处理服务的集成目前处于测试阶段,但已经可以使用了。Lake Formation本身的使用是免费的,而使用其他亚马逊网络服务则需要付费。优德88备用网

AWS湖的形成和其他- AWS数据湖的配方

过去;过去到2019年8月),对表、数据库和目录使用单独的IAM权限来管理对Glue元数据编目中的表的访问,这些权限被授予给角色和用户。LakeFormation的设计目的是集中管理这种访问,但是可以在AWS帐户级别禁用LakeFormation,并恢复到管理访问的旧方法。

下面是用户访问该表所需的权限示例。

在旧的方法中,角色或用户必须对表具有适当的权限,并访问表所描述的数据源(例如S3桶)。使用这种新方法,数据源被注册为数据湖资源,其中一个内部Lake Formation服务角色附加到数据源,并对其具有适当的权限。

将特定角色或用户的权限授予表或数据库的方式非常类似于在SQL数据库中授予权限(例如SELECT、INSERT、DROP TABLE)。

当使用表时,例如在数据处理过程中,如果给定的用户或角色对表有适当的特权,则通过内部Lake Formation角色给予对数据源(例如S3桶)的访问权。因此,处理给定表的人员不能直接访问S3上的数据,而只能使用Glue表作为处理数据的接口。

此外,可以为单个用户或角色分配不同的权限,以及向其他用户和角色授予权限的权限。在这种情况下,访问管理变得方便和集中。

还可以只向单个表列授予权限,这在处理敏感数据时非常重要。AWS还有几个预定义的模板用于从关系数据库中检索数据。

Lake Formation,作为一个相对较新的产品,不幸的是也有它的缺点。

与EMR服务的集成仍处于测试阶段,所以如果您更喜欢使用一个干净的Spark,并通过在EC2上使用更便宜的点而不是Glue来降低成本,那么您必须考虑在基础设施中创建一些工作区的需求——主要是授权对S3桶的直接访问。

如果EMR要被数据科学团队或数据科学使用,作为与Lake Formation的集成,EMR仍然是一个很好的选择。如果用户正在使用SAML身份验证,那么它可以正常工作。

这种解决方案的另一个缺点是表本身的管理缺乏自动化。

与Databricks不同,AWS上的Glue表没有实现ACID事务,也没有自动将小文件合并为大文件,这提高了表查询的效率。

然而,AWS已经开发了一种数据湖解决方案的方法(查看预览)在这里),这些表提到了ACID事务的引入和小文件的自动合并。

我们需要解决的问题是什么?

我们的客户是一家向亚洲市场提供视频流媒体服务的公司。优德88备用网当我们开始这个项目时,它“只有”1.5亿活跃用户,而现在已经有3亿了。

他们每个人都产生了很多关于他们偏好的信息。

所谓偏好,我们指的是喜欢的电影类型、演员等。这些信息还不足以用于定向广告,因此还值得分析设备类型、性别和年龄等数据。这可以用来创建用户配置文件,并向用户显示他们可能感兴趣的广告。

客户要求我们建立一个广告技术栈,让他们成为一个有围墙的花园,通过它他们收集客户数据并将其货币化。我们的客户建议在这个系统中需要一个数据湖。

这里我们只讨论这个系统的一小部分,该系统旨在从各种来源收集数据,这是创建用户配置文件及其分类的基础。

我们面临的挑战

有问题的数据源

数据湖从六个不同的来源收集数据。其中一些来自网站,另一些来自外部公司。扩展分类的可能性是一种非常常见的做法。例如,可以通过cookie或对类似特征的分析来组合这些数据。

数据湖通过以下方式收集数据:

  • 直接从我们客户的数据库下载数据。
  • 由客户将数据推入我们的S3桶。
  • 从客户的S3桶接收数据。

对于这个特定的项目,没有实时处理的文件。数据下载的频率是由客户端和外部来源预先强加给我们的。在最坏的情况下,从我们收到数据到我们将数据发送到广告服务器可能已经过去了3天。在此期间,我们还将下载文件,处理它们,并创建概要文件和受众。

我们将收集到的文件分为两个阶段:

  • 铜:直接来自来源的原始数据(不变)。
  • 银:清理、压平嵌套结构和转换。

事实证明AWS对到DynamoDB和MongoDB的连接的支持很差,因此我们决定将数据发送到我们的S3桶会更容易。

我们定期收到这些文件,因此我们有了原始数据。我们确信它们是我们的,没有人会改变它们。这种情况与我们应该从客户端的S3桶中下载的文件完全不同。

起初,我们决定将这些数据归类为“我们的青铜阶段”数据,后来我们发现这是一个错误。

原始文件作为下一个阶段的源文件,因此我们直接在它们上运行ETL脚本。我们没有预料到这些文件的回填,即用新数据补充旧文件。

更新了与7天前事件相关的现有文件。这是通过删除旧文件并用同名的新文件替换它们来实现的。

我们的工作流程是每隔几个小时进行一次,所以我们根本没有使用几天前更新的数据。使用这些更改是有问题的,因为我们必须知道已经处理并上传到其他服务的文件中究竟发生了什么更改。优德88备用网

然而,除了数据丢失之外,我们在转换过程中还遇到了错误的问题。

当Pyspark试图转换不再存在的文件时抛出了一个异常。我们通过将这些文件复制到我们的帐户来解决这个问题。这不是我们在这些文件中遇到的唯一问题。

这些文件包含大量垃圾数据,添加元数据将它们视为创建了约2,000列的表。列中包含以下类型:

  • 会话
  • session_1
  • session_1_2
  • session_string
  • session_string_1

这些列中的每一列都可能具有概要文件所需的会话ID。我们需要进行深入的分析,以查看概要文件是否有任何必需的信息。

系统各个部分的良好逻辑分离

在我们的数据湖开发过程中,我们使用了AWS CDK——AWS推荐的一个库——它支持使用代码定义基础设施。目前支持的语言有JavaScript、TypeScript、Python、Java和c#,这些语言都可以构建AWS CloudFormation模板。

最初,我们在一个CloudFormation堆栈中定义了所有w88优德中文AWS资源,但随着项目的发展,需要对代码进行逻辑划分,以提高其可读性并促进开发。我们决定将CloudFormation堆栈拆分为几个更小的,具有精确定义的功能:

  • 包含角色和用户的堆栈—对于所有创建用户和IAM角色的部署都是通用的,供外部服务器或外部用户使用。
  • 主堆栈-它创建内部角色使用的服务,如Glue, LakeFormation, EMR或EC2。优德88备用网S3桶、VPC、SNS通知。还创建了Amazon Athena工作组和Glue目录中的主数据库。
  • 与单个数据源相关的栈——这里是为单个数据源创建的Glue表,以及整个Glue数据流(工作流),包含相关的触发器、作业和爬虫;处理数据并使其对外部服务可用所需的一切。优德88备用网
  • 与监控相关的堆栈——这里在CloudWatch服务中创建了一个仪表板,它显示了对数据湖重要的选定指标。此外,这个堆栈还定义了CloudWatch事件规则,如果一个Job或爬虫以错误结束其操作,我们会得到通知。
  • 一个与MWAA变量相关的堆栈——在项目的某些时候,需要使用MWAA (Apache风流的管理工作流)服务,但在我们部署数据湖的同一地区,MWAA是不可用的。该堆栈设计用于在Systems Manager服务中创建参数,这些参数可以将部署在数据湖中的资源的变量传递到运行MWAA服务的另一个区域。w88优德中文
  • 一个与未使用的特性相关的堆栈——这个堆栈需要将一些更改部署到现有的堆栈中,关于它的更多细节将在下面描述。

在一个AWS CDK应用程序中创建多个堆栈并将资源从一个堆栈转移到另一个堆栈相对容易。w88优德中文不幸的是,它并非没有问题。

这些堆栈是相互依赖的,因此它们的部署必须以正确的顺序进行。w88优德中文由于AWS CDK自动创建CloudFormation导出和导入,资源可以在堆栈之间传输。

如果堆栈是第一次部署的,那么它不会引起任何问题。但是,如果堆栈已经存在,则部署将导致更新它们。

如果是这种情况,那么AWS CDK可能会尝试删除已经导入到不同堆栈中的导出。不幸的是,这会导致整个部署失败。

如图所示,堆栈B依赖于堆栈A,因此在部署过程中,首先更新堆栈A,然后更新堆栈B。

如果堆栈B不再需要来自堆栈a的资源(例如S3桶),AWS CDK将从堆栈a的CloudFormation模板中删除桶导出,并从堆栈B的模板中删除导入桶。

但是,在部署过程中,堆栈A将首先更新,从其中删除导出将失败,因为堆栈B仍然需要更新。

在这种情况下,部署甚至不会到达更新堆栈B的阶段!

要解决这个问题,您可以删除堆栈B,然后堆栈A将正确更新,然后堆栈B将被重新创建。然而,在生产中,我们可能无法删除已经包含数据的堆栈(即堆栈B)。

为了解决这个难题,我们引入了堆栈C(它是一个与代码中未使用的函数相关的堆栈)。

栈C没有任何重要的功能,可以在任何时候安全地删除,然后从头开始重新创建。因此,如果堆栈B不再需要来自堆栈A的资源,那么我们只需在第一步中进行更改,从堆栈B中删除资源的导入,并将其添加到堆栈C中。

此更改的部署将非常顺利,因为导出不会从堆栈A中删除,而导入将从堆栈B中消失。

在第二步中,我们进行更改,从堆栈C中删除资源导入。

这还将导致从堆栈A中删除资源导出,因为它没有导入到任何地方。然而,在这种情况下,在部署此更改之前,我们可以安全地删除堆栈c。毕竟,这是它的作用。

然后,导出将被安全地从堆栈A中删除,从堆栈B的导入已经在上一步中删除,并且将在没有资源导入的情况下重新创建辅助堆栈C。

对于我们来说,在通过资源块部署时遇到类似的问题是非常罕见的,在整个数据湖工作期间,这种情况只发生过几次,但值得为此做好准备。

蓝图仅来自AWS控制台

在我们的方法中,我们希望所有的东西都由我们的代码部署,但是蓝图存在一个问题,AWS提供蓝图的目的是从外部数据库下载数据。这项服务使工作变得容易得多,因为它不需要使用最流行的引擎的驱动程序。

但是,AWS CDK没有为该服务提供任何构造。当我们开始实现这个功能时,我们已经为Glue准备好了几个数据流。

我们决定使用它们并扩展它们,这样我们就可以使用PySpark脚本,这些脚本是由AWS提供给公众的。通过将它们作为工作源代码上传并创建我们自己的抽象,我们能够从代码中创建“我们自己的”蓝图。

AWS大力宣传Glue服务是一个简单的ETL脚本解决方案,使用Spark的DataFrame - DynamicFrame抽象。来自Glue的DynamicFrame不需要预定义的模式,而是根据它读取的数据自动创建它。

这无疑使处理低结构数据变得更容易,但在幕后发生了很多事情。在重写为Glue编写的脚本到PySpark之后,您可能会发现您需要手动添加DynamicFrame自动完成的事情,甚至在我们不知道的情况下。

但是,如果脚本可以完全基于DynamicFrame提供的方法,我们为什么要在这里提到PySpark呢?

事实证明,在转换大量数据时,只使用DynamicFrame是非常低效的。

在使用Glue的过程中,我们碰巧重写了从Glue原生转换到纯SparkSQL的转换,这大大加快了转换时间,减少了Spark执行程序的内存消耗。

您不必立即放弃DynamicFrame Glue提供的可能性,但如果您正在考虑优化etl,则有必要深入到Spark DataFrame级别。

从DynamicFrame,你可以在任何时候跳转到DataFrame和回到DynamicFrame,同时使用DataFrame的方法来缓存中间结果,稍后在脚本的几个地方使用,或者在Spark的执行程序处理的分区之间均匀分配数据。

然后我们使用这两个世界的好处,感谢这一点,你可以大大提高你的etl和降低他们的成本,这是非常重要的,当缺乏便宜的地点在Glue作业。

但是,使用Glue有一个很大的缺点:将数据保存到分区的Glue表是有问题的。

数据被正确地写入S3,但是,分区没有添加到Glue表的Hive metastore中,这有效地使数据在表中不可见。

的表在写入数据时添加分区glueparquet分类,但显著增加了Job Glue所消耗的资源。w88优德中文

在一种情况下,由于一个作业的主要性能问题,我们决定(根据AWS支持人员的建议)简单地向数据流添加一个爬虫,这将向Glue表添加新分区。我希望AWS在某个时候能解决这个相当麻烦的问题。

或者,您可以采用一种不同的策略,只使用Glue作为描述元数据的地方,也就是说,使用Glue数据目录作为一个metastore。在使用数据湖时,我们必须将其中一个数据流移动到EMR服务,这允许我们使用低成本的现场实例。

将EMR与Glue目录结合起来相对来说没有什么麻烦,但是这里有一些值得提及的重要细节,在将Glue与EMR配对时可能会出现一些问题。首先是Spark的IAM角色需要被授予database_creatorAWS Lake Formation的特权。

Spark创建一个global_temp如果没有适当的Lake Formation权限,它会导致Spark的作业很快崩溃,导致权限相关的错误。

第二个问题是从Glue目录获取数据库列表。

即使我们只是想将默认数据库设置为我们将要使用的数据库,Spark也会首先尝试下载可用数据库列表和“默认”数据库,不管它是否有权访问它。如果拒绝访问,作业将再次抛出错误。

我们遇到的最后一个问题是需要将S3位置添加到Glue数据库中。

这是一个可选参数,但是,如果它为空,则写入Glue表将失败。

理论上,Glue表中设置的参数应该用作S3位置。如果数据库有S3位置,则会忽略它,并使用表的S3位置进行写入。但是,如果基本S3位置为空,则会得到错误。

雅典娜问题-分区和大量的小文件

得益于Athena,我们可以访问S3中的文件,并且可以像查询关系数据库一样使用SQL进行查询。在使用Athena时,您需要为扫描的数据量付费,但这可以通过分区进行优化。

从用户级别来看,这看起来像是要轮询的附加列,但在它下面是使用文件夹的文件分区。在使用日期时,分区非常有用——在UI中我们可以看到年、月、日列。

目录结构如下:

年= 2020

  • 月= 04
    • 一天= 1
  • 月= 05

年= 2021

通过这种方法,Athena扫描更少的文件来获得结果,成本更低,速度也更快。

我们在使用这个工具时遇到的一个问题是组成脚本的小文件。

在转换过程中,PySpark将转换分解为并行过程,从而创建许多独立的文件。这些文件不应该太小(大约> 128 MB),因为每个文件增加了额外的时间:

  • 打开它
  • 清单
  • 元数据检索
  • 加载头文件和压缩字典

文件也不能太大,因为这样它们就不能并行地快速读取。

数据湖开发的GitOps方法

理论上,每个开发人员都应该尽可能频繁地推进他们的更改。如果他们出了什么事,我们肯定会有其他员工接替他们的工作。

然而,接受和检查某人的工作并不是那么简单。

我们怎么知道他们的代码是否有效?它可以毫无问题地发布到生产中吗?代码经过测试了吗?

当然,这可以手动完成,但现代系统有CI / CD。

这些脚本自动运行,由推入存储库的代码触发,这些代码:

  • 开始测试。
  • 部署应用程序。

如果任何一个步骤失败了,我们知道我们需要从修复它开始。

有了我们的系统,我们可以从代码构建基础设施,所以没有什么可以阻止我们从每个分支构建基础设施。

每个开发人员都可以创建自己的数据湖,并在上面测试他们的更改。

不幸的是,许多程序员没有清理他们的堆栈,这意味着我们达到了AWS帐户上的资源限制。我们通过创建脚本解决了这一问题,这每晚都会消耗掉所有开发人员的资源。w88优德中文

测试

我们的系统中有3种测试:

  • 第一个检查CloudFormation模板。
  • 第二个是核实合同。
  • 第三个检查在每个数据流结束时是否进行数据验证。

通常,在使用单元测试时,我们检查给定函数的结果,看它是否与预期结果一致。

在这种情况下不可能进行这样的测试,因为整个代码的模板是一次性生成的。然而,这并不妨碍我们编写测试。

通过验证模板中的特定元素,我们能够测试:

  • 分配给角色的权限列表。
  • 传递给作业的参数。
  • 业务配置。

第二个测试检查契约,其中唯一的任务是维护由其他团队访问的表的适当模式。

在创建表之前,我们将这些测试以json的形式与主要使用它们的团队共享。我们这样做是为了确保他们需要的所有东西都以一种能让他们的工作更容易的形式存在。

最后一类测试验证处理的数据/表。它们基于检查之前为特定数据创建的需求,理论上应该由我们的PySpark脚本来满足。

我们检查了以下内容:

  • 行的唯一性。
  • 列是否只包含期望的值(例如,男性,女性,未定义)。
  • 预期的日期格式。
  • 给定列中是否没有空值。

这对谁来说是一个好的解决方案?

如果需要从不同来源以不同格式和不同频率下载数据,那么数据湖是正确的解决方案。

当我们希望将数据存储在自己的服务器上时,以TB为单位的文件数量的增加可能会导致问题。云解决方案解决了这个问题,并提供了安全性。已经有一些公司向云服务提供商提供解决方案和集成。

选择非常广泛,但不是每个人都能从代码创建整个基础设施。

通过单击控制台手动创建的体系结构当然可以工作,但深入研究其所有配置可能是不可能的。数据湖并不能解决与大数据相关的所有问题。这个概念是针对特定问题的,如果你不小心使用它,你可能会遇到更多这样的问题。

标记下

搜索

分享本文