Skip to main content

Arbalest 为 Amazon Redshift 编排批量数据加载

项目描述

https://travis-ci.org/Dwolla/arbalest.svg?branch=master https://readthedocs.org/projects/arbalest/badge/?version=latest

Arbalest是一个用于Amazon S3Amazon Redshift的 Python 数据管道编排库。它负责在 AWS 中大规模查询数据的繁重工作。

它负责:

  • 将数据提取到 Amazon Redshift

  • 模式创建和验证

  • 创建高度可用和可扩展的数据导入策略

  • 生成和上传先决条件工件以进行导入

  • 运行数据导入作业

  • 使用 SQL 编排幂等和容错多步 ETL 管道

为什么是Arbalest?

  • 轻量级库优于可与现有数据工具组合的重量级框架

  • Python 是数据科学事实上的 通用

  • 配置为代码

  • 例如,电池包括用于摄取时间序列或稀疏数据的策略 ( arbalest.pipeline ),或与现有管道拓扑的集成 ( arbalest.contrib )

用例

Arbalest 不是 MapReduce 框架,而是旨在使 Amazon Redshift(及其所有优势)易于与典型的数据工作流和工具一起使用。这里有一些例子:

  • 您已经在使用MapReduce 框架来处理 S3 中的数据。Arbalest 可以使用Redshift 中的 SQL使Elastic MapReduce作业的结果可查询。然后,您可以交给 Arbalest 以在普通的旧 SQL 中定义额外的 ETL。

  • 您将 S3 视为一个捕获所有数据的接收器,可能保存来自Kafka或 RabbitMQ等消息系统的 JSON 消息或事件。Arbalest 可以使用 Redshift 将部分或全部这些数据公开到数据仓库中。SQL 生态系统现在可用于仪表板、报告、即席分析。

  • 您拥有可以从快速、可查询的 SQL 数据接收器中受益的复杂管道。Arbalest 支持开箱即用 ( arbalest.contrib ) 与Luigi等工具集成,成为多依赖、多步骤管道拓扑的一部分。

入门

使用pip很容易上手:

点安装 arbalest

Arbalest 管道的示例在examples/中。下面是概念和类的概述。

笔记

Arbalest 依赖于 psycopg2。但是,在 Windows 上安装 psycopg2 可能并不简单。

在 Windows 上安装 psycopg2:

64位Python安装:

pip install -e git+https://github.com/nwcell/psycopg2-windows.git@win64-py27#egg=psycopg2

32位Python安装:

pip install -e git+https://github.com/nwcell/psycopg2-windows.git@win32-py27#egg=psycopg2

管道

Arbalest 使用管道协调数据加载。每个流水线 可以有一个或多个由三个部分组成的步骤:

metadata:S3 存储桶中的路径,用于存储复制过程所需的信息。

source:要从中复制数据的 S3 存储桶中的路径,由 JSON 对象文件组成:

{“id”:“66bc8153-d6d9-4351-bada-803330f22db7”, “someNumber”:1}

schema:定义要映射到 Redshift 行的 JSON 对象。

模式

使用由一个或多个属性声明组成的JsonObject映射器定义模式。默认情况下,JSON 属性的名称用作列,但可以设置为自定义列名称。列名的 最大长度为 127 个字符。超过 127 个字符的列名将被截断。嵌套属性将创建一个由下划线分隔的默认列名。

示例 JSON 对象(为清楚起见留白):

{
  "id": "66bc8153-d6d9-4351-bada-803330f22db7",
  "someNumber": 1,
  "child" : {
    "someBoolean": true
  }
}

示例架构:

JsonObject('destination_table_name',
    Property('id', 'VARCHAR(36)'),
    Property('someNumber', 'INTEGER', 'custom_column_name'),
    Property('child', Property('someBoolean', 'BOOLEAN')))

复制策略

S3CopyPipeline支持将数据从 S3 复制到 Redshift的不同策略。

批量复制

批量复制使用临时表将 S3 路径中的所有键导入 Redshift 表。通过删除并重新导入所有数据,消除了重复。这种类型的副本对于不经常更改或只会被摄取一次的数据很有用(例如,不可变的时间序列)。

清单副本

清单副本使用清单将 S3 路径中的所有密钥导入 Redshift 表。此外,成功导入对象的日志会保存在元数据路径中。此复制步骤的后续运行将仅复制日志中不存在的 S3 密钥。这种类型的副本对于经常更改的路径中的数据很有用。

示例数据副本:

#!/usr/bin/env python
import psycopg2
from arbalest.configuration import env
from arbalest.redshift import S3CopyPipeline
from arbalest.redshift.schema import JsonObject, Property

if __name__ == '__main__':
    pipeline = S3CopyPipeline(
        aws_access_key_id=env('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=env('AWS_SECRET_ACCESS_KEY'),
        bucket=env('BUCKET_NAME'),
        db_connection=psycopg2.connect(env('REDSHIFT_CONNECTION')))

    pipeline.bulk_copy(metadata='path_to_save_pipeline_metadata',
                       source='path_of_source_data',
                       schema=JsonObject('destination_table_name',
                                         Property('id', 'VARCHAR(36)'),
                                         Property('someNumber', 'INTEGER',
                                                  'custom_column_name')))

    pipeline.manifest_copy(metadata='path_to_save_pipeline_metadata',
                           source='path_of_incremental_source_data',
                           schema=JsonObject('incremental_destination_table_name',
                                             Property('id', 'VARCHAR(36)'),
                                             Property('someNumber', 'INTEGER',
                                                      'custom_column_name')))

    pipeline.run()

SQL

管道也可以有任意的 SQL 步骤。每个 SQL 步骤可以有一个或多个在事务中执行的语句,例如,编排额外的 ETL(提取、转换和加载)。扩展上一个示例:

#!/usr/bin/env python
import psycopg2
from arbalest.configuration import env
from arbalest.redshift import S3CopyPipeline
from arbalest.redshift.schema import JsonObject, Property

if __name__ == '__main__':
    pipeline = S3CopyPipeline(
        aws_access_key_id=env('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=env('AWS_SECRET_ACCESS_KEY'),
        bucket=env('BUCKET_NAME'),
        db_connection=psycopg2.connect(env('REDSHIFT_CONNECTION')))

    pipeline.bulk_copy(metadata='path_to_save_pipeline_metadata',
                       source='path_of_source_data',
                       schema=JsonObject('destination_table_name',
                                         Property('id', 'VARCHAR(36)'),
                                         Property('someNumber', 'INTEGER',
                                                  'custom_column_name')))

    pipeline.manifest_copy(metadata='path_to_save_pipeline_metadata',
                           source='path_of_incremental_source_data',
                           schema=JsonObject('incremental_destination_table_name',
                                             Property('id', 'VARCHAR(36)'),
                                             Property('someNumber', 'INTEGER',
                                                      'custom_column_name')))

    pipeline.sql(('SELECT someNumber + %s '
                  'INTO some_olap_table FROM destination_table_name', 1),
                 ('SELECT * INTO destination_table_name_copy '
                  'FROM destination_table_name'))

    pipeline.run()

编排助手

该项目中包括各种编排助手,以协助创建管道。这些类在arbalest.pipelinearbalest.contrib模块中定义。

排序的数据源

假设源数据存储在一系列可排序的目录中,S3SortedDataSources 有助于在给定开始和/或结束的情况下按顺序检索 S3 路径以进行导入。此外,它还具有在 S3 持久日志中标记游标的方法。

存储为排序序列的数据示例

连续整数:

s3://bucket/child/1/*
s3://bucket/child/2/*
s3://bucket/child/3/*

时间序列:

s3://bucket/child/2015-01-01/*
s3://bucket/child/2015-01-02/*
s3://bucket/child/2015-01-03/*
s3://bucket/child/2015-01-04/00/*

排序数据源类示例

S3SortedDataSources(
            metadata='',
            source='child',
            bucket=bucket,
            start=env('START'),
            end=env('END'))

时间序列

SqlTimeSeriesImport将数据从S3SortedDataSources的时间序列源列表中批量复制和更新到现有目标表中。

从 S3 时间序列拓扑导入示例时间序列,摄取一天的对象

时间序列路径拓扑:

s3://bucket/child/2015-01-01/*
s3://bucket/child/2015-01-02/*
ExamplePipeline(S3CopyPipeline):
    def __init__(self,
             aws_access_key_id,
             aws_secret_access_key,
             bucket,
             db_connection):
        super(ExamplePipeline, self).__init__(
            aws_access_key_id,
            aws_secret_access_key,
            bucket,
            db_connection)

        # Create table to ingest data into if it does not exist
        self.sql('CREATE target_table IF NOT EXISTS target_table(id VARCHAR(36), someNumber INTEGER, timestamp TIMESTAMP);')

        time_series = SqlTimeSeriesImport(
            destination_table='target_table',
            update_date='2015-01-01', # Replace existing events, if any, after this timestamp
            sources=S3SortedDataSources(
                        metadata='',
                        source='child',
                        bucket=bucket,
                        start='2015-01-01',
                        end='2015-01-02'),
            Property('id', 'VARCHAR(36)'),
            Property('someNumber', 'INTEGER'),
            Property('timestamp', 'TIMESTAMP'))

        # Populate target_table using a bulk copy per day
        time_series.bulk_copy(
            pipeline=self,
            metadata='',
            max_error=1000, # Maximum errors tolerated by Redshift COPY
            order_by_column='timestamp') # Use column named timestamp to sort by and replace existing events, if any

路易吉

PipelineTask将任何arbalest.core.Pipeline包装到Luigi Task中。这允许使用依赖图组合工作流,例如,依赖于多个步骤或其他管道的数据管道。然后,Luigi 负责调度和执行 多步管道的繁重工作 。

执照

Arbalest 在MIT 许可证下获得许可。

作者和贡献者

Arbalest 是在 Dwolla 建造的,主要由Fredrick Galoso 建造Hayden Goldstien对 Luigi 的初步支持和对编排助手的贡献。我们很高兴地欢迎贡献和反馈。如果您使用的是 Arbalest,我们很想知道。

下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

arbalest-1.6.2.tar.gz (13.0 kB 查看哈希

已上传 source