Arbalest 为 Amazon Redshift 编排批量数据加载
项目描述
Arbalest是一个用于Amazon S3 和Amazon Redshift的 Python 数据管道编排库。它负责在 AWS 中大规模查询数据的繁重工作。
它负责:
将数据提取到 Amazon Redshift
模式创建和验证
创建高度可用和可扩展的数据导入策略
生成和上传先决条件工件以进行导入
运行数据导入作业
使用 SQL 编排幂等和容错多步 ETL 管道
为什么是Arbalest?
轻量级库优于可与现有数据工具组合的重量级框架
配置为代码
例如,电池包括用于摄取时间序列或稀疏数据的策略 ( arbalest.pipeline ),或与现有管道拓扑的集成 ( arbalest.contrib )
用例
Arbalest 不是 MapReduce 框架,而是旨在使 Amazon Redshift(及其所有优势)易于与典型的数据工作流和工具一起使用。这里有一些例子:
您已经在使用MapReduce 框架来处理 S3 中的数据。Arbalest 可以使用Redshift 中的 SQL使Elastic MapReduce作业的结果可查询。然后,您可以交给 Arbalest 以在普通的旧 SQL 中定义额外的 ETL。
您将 S3 视为一个捕获所有数据的接收器,可能保存来自Kafka或 RabbitMQ等消息系统的 JSON 消息或事件。Arbalest 可以使用 Redshift 将部分或全部这些数据公开到数据仓库中。SQL 生态系统现在可用于仪表板、报告、即席分析。
您拥有可以从快速、可查询的 SQL 数据接收器中受益的复杂管道。Arbalest 支持开箱即用 ( arbalest.contrib ) 与Luigi等工具集成,成为多依赖、多步骤管道拓扑的一部分。
入门
使用pip很容易上手:
点安装 arbalest
Arbalest 管道的示例在examples/中。下面是概念和类的概述。
笔记
Arbalest 依赖于 psycopg2。但是,在 Windows 上安装 psycopg2 可能并不简单。
在 Windows 上安装 psycopg2:
64位Python安装:
pip install -e git+https://github.com/nwcell/psycopg2-windows.git@win64-py27#egg=psycopg2
32位Python安装:
pip install -e git+https://github.com/nwcell/psycopg2-windows.git@win32-py27#egg=psycopg2
管道
Arbalest 使用管道协调数据加载。每个流水线 可以有一个或多个由三个部分组成的步骤:
metadata:S3 存储桶中的路径,用于存储复制过程所需的信息。
source:要从中复制数据的 S3 存储桶中的路径,由 JSON 对象文件组成:
{“id”:“66bc8153-d6d9-4351-bada-803330f22db7”, “someNumber”:1}
schema:定义要映射到 Redshift 行的 JSON 对象。
模式
使用由一个或多个属性声明组成的JsonObject映射器定义模式。默认情况下,JSON 属性的名称用作列,但可以设置为自定义列名称。列名的 最大长度为 127 个字符。超过 127 个字符的列名将被截断。嵌套属性将创建一个由下划线分隔的默认列名。
示例 JSON 对象(为清楚起见留白):
{ "id": "66bc8153-d6d9-4351-bada-803330f22db7", "someNumber": 1, "child" : { "someBoolean": true } }
示例架构:
JsonObject('destination_table_name',
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER', 'custom_column_name'),
Property('child', Property('someBoolean', 'BOOLEAN')))
复制策略
S3CopyPipeline支持将数据从 S3 复制到 Redshift的不同策略。
批量复制
批量复制使用临时表将 S3 路径中的所有键导入 Redshift 表。通过删除并重新导入所有数据,消除了重复。这种类型的副本对于不经常更改或只会被摄取一次的数据很有用(例如,不可变的时间序列)。
清单副本
清单副本使用清单将 S3 路径中的所有密钥导入 Redshift 表。此外,成功导入对象的日志会保存在元数据路径中。此复制步骤的后续运行将仅复制日志中不存在的 S3 密钥。这种类型的副本对于经常更改的路径中的数据很有用。
示例数据副本:
#!/usr/bin/env python
import psycopg2
from arbalest.configuration import env
from arbalest.redshift import S3CopyPipeline
from arbalest.redshift.schema import JsonObject, Property
if __name__ == '__main__':
pipeline = S3CopyPipeline(
aws_access_key_id=env('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=env('AWS_SECRET_ACCESS_KEY'),
bucket=env('BUCKET_NAME'),
db_connection=psycopg2.connect(env('REDSHIFT_CONNECTION')))
pipeline.bulk_copy(metadata='path_to_save_pipeline_metadata',
source='path_of_source_data',
schema=JsonObject('destination_table_name',
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER',
'custom_column_name')))
pipeline.manifest_copy(metadata='path_to_save_pipeline_metadata',
source='path_of_incremental_source_data',
schema=JsonObject('incremental_destination_table_name',
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER',
'custom_column_name')))
pipeline.run()
SQL
管道也可以有任意的 SQL 步骤。每个 SQL 步骤可以有一个或多个在事务中执行的语句,例如,编排额外的 ETL(提取、转换和加载)。扩展上一个示例:
#!/usr/bin/env python
import psycopg2
from arbalest.configuration import env
from arbalest.redshift import S3CopyPipeline
from arbalest.redshift.schema import JsonObject, Property
if __name__ == '__main__':
pipeline = S3CopyPipeline(
aws_access_key_id=env('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=env('AWS_SECRET_ACCESS_KEY'),
bucket=env('BUCKET_NAME'),
db_connection=psycopg2.connect(env('REDSHIFT_CONNECTION')))
pipeline.bulk_copy(metadata='path_to_save_pipeline_metadata',
source='path_of_source_data',
schema=JsonObject('destination_table_name',
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER',
'custom_column_name')))
pipeline.manifest_copy(metadata='path_to_save_pipeline_metadata',
source='path_of_incremental_source_data',
schema=JsonObject('incremental_destination_table_name',
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER',
'custom_column_name')))
pipeline.sql(('SELECT someNumber + %s '
'INTO some_olap_table FROM destination_table_name', 1),
('SELECT * INTO destination_table_name_copy '
'FROM destination_table_name'))
pipeline.run()
编排助手
该项目中包括各种编排助手,以协助创建管道。这些类在arbalest.pipeline和arbalest.contrib模块中定义。
排序的数据源
假设源数据存储在一系列可排序的目录中,S3SortedDataSources 有助于在给定开始和/或结束的情况下按顺序检索 S3 路径以进行导入。此外,它还具有在 S3 持久日志中标记游标的方法。
存储为排序序列的数据示例
连续整数:
s3://bucket/child/1/* s3://bucket/child/2/* s3://bucket/child/3/*
时间序列:
s3://bucket/child/2015-01-01/* s3://bucket/child/2015-01-02/* s3://bucket/child/2015-01-03/* s3://bucket/child/2015-01-04/00/*
排序数据源类示例
S3SortedDataSources(
metadata='',
source='child',
bucket=bucket,
start=env('START'),
end=env('END'))
时间序列
SqlTimeSeriesImport将数据从S3SortedDataSources的时间序列源列表中批量复制和更新到现有目标表中。
从 S3 时间序列拓扑导入示例时间序列,摄取一天的对象
时间序列路径拓扑:
s3://bucket/child/2015-01-01/* s3://bucket/child/2015-01-02/*
ExamplePipeline(S3CopyPipeline):
def __init__(self,
aws_access_key_id,
aws_secret_access_key,
bucket,
db_connection):
super(ExamplePipeline, self).__init__(
aws_access_key_id,
aws_secret_access_key,
bucket,
db_connection)
# Create table to ingest data into if it does not exist
self.sql('CREATE target_table IF NOT EXISTS target_table(id VARCHAR(36), someNumber INTEGER, timestamp TIMESTAMP);')
time_series = SqlTimeSeriesImport(
destination_table='target_table',
update_date='2015-01-01', # Replace existing events, if any, after this timestamp
sources=S3SortedDataSources(
metadata='',
source='child',
bucket=bucket,
start='2015-01-01',
end='2015-01-02'),
Property('id', 'VARCHAR(36)'),
Property('someNumber', 'INTEGER'),
Property('timestamp', 'TIMESTAMP'))
# Populate target_table using a bulk copy per day
time_series.bulk_copy(
pipeline=self,
metadata='',
max_error=1000, # Maximum errors tolerated by Redshift COPY
order_by_column='timestamp') # Use column named timestamp to sort by and replace existing events, if any
路易吉
PipelineTask将任何arbalest.core.Pipeline包装到Luigi Task中。这允许使用依赖图组合工作流,例如,依赖于多个步骤或其他管道的数据管道。然后,Luigi 负责调度和执行 多步管道的繁重工作 。
执照
Arbalest 在MIT 许可证下获得许可。