用于连接到 AWS S3 和 Redshift 的便捷包装器
项目描述
北方数据
作者:
尼克·布克
介绍:
Nordata 是一组用于访问 AWS S3 和 AWS Redshift 的实用程序函数。它是由 Nordstrom 分析团队的一位数据科学家编写的。Nordata 的目标是成为一个简单、强大的软件包,以简化数据工作流程。它并非旨在处理所有可能的需求(例如凭证管理主要留给用户),但它旨在简化常见任务。
目录:
安装 Nordata:
为 Nordata 设置凭据:
如何使用 Nordata:
-
红移:
- 导入 nordata Redshift 函数
- 将 SQL 脚本作为字符串读入 Python
- 执行不返回数据的 SQL 查询
- 执行返回数据的 SQL 查询
- 执行返回 pandas 数据的 SQL 查询
- 创建连接对象(有经验的用户)
S3:
- 导入 S3 函数
- 从 S3 下载单个文件
- 使用配置文件名称下载
- 从 S3 下载文件列表
- 从 S3 下载与模式匹配的文件
- 从 S3 下载目录中的所有文件
- 将单个文件上传到 S3
- 使用个人资料名称上传
- 将文件列表上传到 S3
- 将匹配模式的文件上传到 S3
- 将目录中的所有文件上传到 S3
- 在 S3 中删除单个文件
- 使用配置文件名称删除
- 删除 S3 中的文件列表
- 删除与 S3 中的模式匹配的文件
- 删除S3目录中的所有文件
- 创建存储桶对象(有经验的用户)
Boto3(有经验的用户):
在 Redshift 和 S3 之间传输数据:
测试:
安装 Nordata:
Nordata 可以通过 pip 安装。与往常一样,建议使用项目级虚拟环境。
Nordata 需要 Python >= 3.6。
$ pip install nordata
为 Nordata 设置凭据:
红移:
Nordata 旨在以以下格式将您的 Redshift 凭据作为环境变量提取。此方法允许用户以多种方式自由处理凭据。与往常一样,建议使用最佳实践。您的凭据不应放在项目代码中,例如文件Dockerfile或.env文件中。相反,您可能希望将它们放在.bash_profile本地或利用密钥管理服务,例如 AWS 提供的服务。
'host=my_hostname dbname=my_dbname user=my_user password=my_password port=1234'
S3:
如果用户在本地运行,他们的Home目录应该包含一个.aws/带有credentials文件的目录。该credentials文件应类似于下面的示例,其中配置文件名称位于括号中。请注意,具体值和地区可能会有所不同。如果用户在 EC2 上运行,则访问 S3 的实例权限由实例的 IAM 角色处理。
[default]
aws_access_key_id=MYAWSACCESSKEY
aws_secret_access_key=MYAWSSECRETACCESS
aws_session_token="long_string_of_random_characters=="
aws_security_token="another_string_of_random_characters=="
region=us-west-2
请注意括号中的配置文件名称。如果您的凭证文件中的配置文件名称不同,您可能需要将此配置文件名称作为参数传递给 S3 函数。
如何使用 Nordata:
红移:
from nordata import read_sql, redshift_execute_sql, redshift_get_conn
sql = read_sql(sql_filename='../sql/my_script.sql')
redshift_execute_sql(
sql=sql,
env_var='REDSHIFT_CREDS',
return_data=False,
return_dict=False)
执行一个 SQL 查询,将数据作为元组列表返回,将列名作为字符串列表返回:
data, columns = redshift_execute_sql(
sql=sql,
env_var='REDSHIFT_CREDS',
return_data=True,
return_dict=False)
执行 SQL 查询,将数据作为 dict 返回,以便轻松摄取到 pandas DataFrame:
import pandas as pd
df = pd.DataFrame(**redshift_execute_sql(
sql=sql,
env_var='REDSHIFT_CREDS',
return_data=True,
return_dict=True))
conn = redshift_get_conn(env_var='REDSHIFT_CREDS')
S3:
from nordata import s3_download, s3_upload, s3_delete, create_session, s3_get_bucket
s3_download(
bucket='my_bucket',
s3_filepath='tmp/my_file.csv',
local_filepath='../data/my_file.csv')
s3_download(
bucket='my_bucket',
profile_name='my-profile-name',
s3_filepath='tmp/my_file.csv',
local_filepath='../data/my_file.csv')
s3_download(
bucket='my_bucket',
s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'],
local_filepath=['../data/my_file1.csv', '../data/my_file2.csv', '../img.png'])
s3_download(
bucket='my_bucket',
s3_filepath='tmp/*.csv',
local_filepath='../data/')
s3_download(
bucket='my_bucket',
s3_filepath='tmp/*',
local_filepath='../data/')
s3_upload(
bucket='my_bucket',
local_filepath='../data/my_file.csv',
s3_filepath='tmp/my_file.csv')
s3_upload(
bucket='my_bucket',
profile_name='my-profile-name',
local_filepath='../data/my_file.csv',
s3_filepath='tmp/my_file.csv')
s3_upload(
bucket='my_bucket',
local_filepath=['../data/my_file1.csv', '../data/my_file2.csv', '../img.png'],
s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'])
s3_upload(
bucket='my_bucket',
local_filepath='../data/*.csv',
s3_filepath='tmp/')
s3_upload(
bucket='my_bucket',
local_filepath='../data/*'
s3_filepath='tmp/')
resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/my_file.csv')
s3_upload(
bucket='my_bucket',
profile_name='my-profile-name',
s3_filepath='tmp/my_file.csv')
resp = s3_delete(
bucket='my_bucket',
s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'])
resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/*.csv')
resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/*')
bucket = s3_get_bucket(
bucket='my_bucket',
profile_name='default',
region_name='us-west-2')
博托3:
from nordata import boto_get_creds, boto_create_session
COPY检索Boto3
凭据作为字符串用于UNLOADSQL 语句:
creds = boto_get_creds(
profile_name='default',
region_name='us-west-2',
session=None)
创建一个可以由有经验的用户直接操作的 boto3 会话对象:
session = boto_create_session(profile_name='default', region_name='us-west-2')
在 Redshift 和 S3 之间传输数据:
使用UNLOAD语句将数据从 Redshift 传输到 S3(有关更多信息,请参阅Redshift UNLOAD 文档):
从 nordata 导入 boto_get_creds , redshift_execute_sql
creds = boto_get_creds(
profile_name='default',
region_name='us-west-2',
session=None)
sql = f '''
卸载(
'select
col1
,col2
from
my_schema.my_table'
)
到
's3://mybucket/unload/my_table/'
凭据
' { creds } '
并行关闭标题gzip allowoverwrite;
'''
redshift_execute_sql(
sql=sql,
env_var='REDSHIFT_CREDS',
return_data=False,
return_dict=False)
使用COPY语句将数据从 S3 传输到 Redshift(有关更多信息,请参阅Redshift COPY 文档):
从 nordata 导入 boto_get_creds , redshift_execute_sql
creds = boto_get_creds(
profile_name='default',
region_name='us-west-2',
session=None)
sql = f '''
从 's3://mybucket/unload/my_table/' 复制 my_schema.my_table '
credentials ' { creds } ' ignoreheader 1 gzip; '''
redshift_execute_sql(
sql=sql,
env_var='REDSHIFT_CREDS',
return_data=False,
return_dict=False)
测试:
对于那些有兴趣为 Nordata 做出贡献或分叉和编辑项目的人来说,pytest 是使用的测试框架。要运行测试,请创建一个虚拟环境,安装 的内容dev-requirements.txt,然后从项目的根目录运行以下命令。测试脚本可以在test/目录中找到。
$ pytest
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。