Skip to main content

使用企业数据模型自动配置行业 Lakehouse

项目描述

水熊

缓步动物可以在较温和的环境中找到,例如湖泊、池塘和草地,通常生活在“湖屋”附近。尽管这些物种非常可爱,但它们也几乎坚不可摧,可以在外太空等恶劣环境中生存。该项目赋予最小的功能齐全的数据处理单元以最高程度的弹性和治理标准。我们对我们的缓步动物进行了编码,以承担执行企业数据模型的重任,从而为行业监管的数据湖库带来生机。


DBR 编解码器 PyPI 版本

企业数据模型

给定企业数据模型,我们会自动将实体转换为其等效的 spark 模式,提取元数据,将表期望导出为 SQL 表达式,并提供数据管道以加速生产工作流的开发。这样的基础允许金融服务机构以弹性数据管道和最小的开发开销引导他们 的金融服务 Lakehouse 。

JSON 模式

遵循严格的行业数据标准,我们的项目支持以 JSON Schema表示的数据模型,并且旨在确保与最近的开源计划完全兼容,例如用于监管报告的FIRE数据模型。collateral在下面的示例中,我们从实体访问 spark 模式和 delta 期望。

from waterbear.convertor import JsonSchemaConvertor
schema, constraints = JsonSchemaConvertor('fire/model').convert("collateral")

执行

尽管记录可能经常“看起来”是结构化的(例如读取 JSON 文件或明确定义的 CSV),但强制实施模式不仅仅是一种好习惯;在企业设置中,它保证仍然需要任何丢失的字段,丢弃意外的字段并完全评估数据类型(例如,日期应被视为日期对象而不是字符串)。我们检索处理给定实体所需的火花模式,我们可以通过结构化流和自动加载器批量或实时应用。在下面的示例中,我们对一批 CSV 记录强制执行模式,从而生成一个模式化的数据框。

derivative_df = (
    spark
        .read
        .format('csv')  # standard spark formats
        .schema(schema) # enforcing our data model
        .load('csv_files')
)

应用模式是一回事,执行其约束是另一回事。给定实体的模式定义,我们可以检测一个字段是否是必需的。给定一个枚举对象,我们确保其值的一致性(例如国家代码)。除了源自模式本身的技术约束之外,模型还使用例如minimummaximummaxItems等来报告业务期望。所有这些技术和业务约束都将从我们的模型中以编程方式检索并解释为一系列 SQL 表达式。

{
  "[`high_fives`] VALUE": "`high_fives` IS NULL OR `high_fives` BETWEEN 1.0 AND 300.0",
  "[`id`] NULLABLE": "`id` IS NOT NULL",
  "[`person`.`username`] MATCH": "`person`.`username` IS NULL OR `person`.`username` RLIKE '^[a-z0-9]{2,}$'",
  "[`person`] NULLABLE": "`person` IS NOT NULL",
  "[`role`] VALUE": "`role` IS NULL OR `role` IN ('SA','CSE','SSA','RSA')",
  "[`skills`] SIZE": "`skills` IS NULL OR SIZE(`skills`) >= 1.0"
}

尽管可以通过简单的用户定义函数来应用这些期望,但我们强烈建议使用Delta Live Tables来确保财务数据管道的可靠性和及时性。

Delta 实时表

我们的第一步是使用 Spark 自动加载器检索登陆到我们行业 Lakehouse 的文件。在连续模式下,新闻文件将在展开max_files时一次处理。在触发模式下,仅处理自上次运行以来的新文件。使用 Delta Live Tables,我们确保 delta 增量的执行和处理,防止组织不得不维护复杂的检查点机制。

@dlt.create_table()
def bronze():
    return (
        spark
            .readStream
            .format('csv')   # we read standard sources
            .schema(schema)  # and enforce schema
            .convert('/path/to/data/files')
    )

我们的管道将根据我们的模式化数据集评估我们的一系列 SQL 规则,通过模式标记违反我们任何期望的记录,expect_all并实时报告数据质量。

@dlt.create_table()
@dlt.expect_all(constraints) # we enforce expectations
def silver():
  return dlt.read_stream("bronze")

测试

对于集成测试,我们还为用户提供了生成与给定模式匹配并符合基本期望的记录的能力(不支持模式匹配)。

from waterbear.generator import JsonRecordGenerator
xs = JsonRecordGenerator('fire/model').generate("collateral", 5)
for x in xs:
    print(x)
{"id": 6867, "person": {"first_name": "vqgjldqqorklmupxibsrdyjw", "last_name": "vtsnbjuscbkvxyfdxrb", "birth_date": "2001-07-21"}, "skills": ["R"]}
{"id": 3119, "person": {"first_name": "vp", "last_name": "dgipl", "birth_date": "1972-03-23"}, "high_fives": 71, "skills": ["SCALA"]}
{"id": 4182, "person": {"first_name": "ijlzxxpv", "last_name": "ldpnnkohf", "birth_date": "1982-11-10"}, "joined_date": "2018-06-29", "skills": ["R"]}
{"id": 4940, "person": {"first_name": "lhklebpkcxp", "last_name": "jir", "birth_date": "1998-01-06"}, "high_fives": 213, "skills": ["SQL"], "role": "RSA"}
{"id": 5920, "person": {"first_name": "njadmuflxqbzc", "last_name": "arggdbaynulumrchreblfvxfe", "birth_date": "1997-06-26", "username": "snuafihfatyf"}, "high_fives": 105, "skills": ["PYTHON"], "role": "SA"}

建立一个轮子文件

python setup.py bdist_wheel --universal

使用项目

pip install dbl-waterbear

项目支持

请注意,/databrickslabs github 帐户中的所有项目仅供您探索,Databricks 未正式支持服务水平协议 (SLA)。它们按原样提供,我们不作任何形式的保证。请不要提交与使用这些项目引起的任何问题有关的支持票。

通过使用此项目发现的任何问题都应在 Repo 上作为 GitHub 问题提交。他们将在时间允许的情况下进行审查,但没有正式的 SLA 提供支持。

作者

antoine.amend@databricks.com

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

内置分布

dbl_waterbear-0.1.1-py3-none-any.whl (19.7 kB 查看哈希

已上传 py3