Skip to main content

用于连接到 AWS S3 和 Redshift 的便捷包装器

项目描述

北方数据

作者:

尼克·布克

介绍:

Nordata 是一组用于访问 AWS S3 和 AWS Redshift 的实用程序函数。它是由 Nordstrom 分析团队的一位数据科学家编写的。Nordata 的目标是成为一个简单、强大的软件包,以简化数据工作流程。它并非旨在处理所有可能的需求(例如凭证管理主要留给用户),但它旨在简化常见任务。

目录:

安装 Nordata:

为 Nordata 设置凭据:

如何使用 Nordata:

测试:

安装 Nordata:

Nordata 可以通过 pip 安装。与往常一样,建议使用项目级虚拟环境。

Nordata 需要 Python >= 3.6。

$ pip install nordata

为 Nordata 设置凭据:

红移:

Nordata 旨在以以下格式将您的 Redshift 凭据作为环境变量提取。此方法允许用户以多种方式自由处理凭据。与往常一样,建议使用最佳实践。您的凭据不应放在项目代码中,例如文件Dockerfile.env文件中。相反,您可能希望将它们放在.bash_profile本地或利用密钥管理服务,例如 AWS 提供的服务。

'host=my_hostname dbname=my_dbname user=my_user password=my_password port=1234'

S3:

如果用户在本地运行,他们的Home目录应该包含一个.aws/带有credentials文件的目录。该credentials文件应类似于下面的示例,其中配置文件名称位于括号中。请注意,具体值和地区可能会有所不同。如果用户在 EC2 上运行,则访问 S3 的实例权限由实例的 IAM 角色处理。

[default]
aws_access_key_id=MYAWSACCESSKEY
aws_secret_access_key=MYAWSSECRETACCESS
aws_session_token="long_string_of_random_characters=="
aws_security_token="another_string_of_random_characters=="
region=us-west-2

请注意括号中的配置文件名称。如果您的凭证文件中的配置文件名称不同,您可能需要将此配置文件名称作为参数传递给 S3 函数。

如何使用 Nordata:

红移:

导入 nordata Redshift 函数:

from nordata import read_sql, redshift_execute_sql, redshift_get_conn

将 SQL 脚本作为字符串读入 Python:

sql = read_sql(sql_filename='../sql/my_script.sql')

执行不返回数据的 SQL 查询:

redshift_execute_sql(
    sql=sql,
    env_var='REDSHIFT_CREDS',
    return_data=False,
    return_dict=False)

执行一个 SQL 查询,将数据作为元组列表返回,将列名作为字符串列表返回:

data, columns = redshift_execute_sql(
    sql=sql,
    env_var='REDSHIFT_CREDS',
    return_data=True,
    return_dict=False)

执行 SQL 查询,将数据作为 dict 返回,以便轻松摄取到 pandas DataFrame:

import pandas as pd

df = pd.DataFrame(**redshift_execute_sql(
    sql=sql,
    env_var='REDSHIFT_CREDS',
    return_data=True,
    return_dict=True))

创建一个可以由有经验的用户直接操作的连接对象:

conn = redshift_get_conn(env_var='REDSHIFT_CREDS')

S3:

导入 S3 函数:

from nordata import s3_download, s3_upload, s3_delete, create_session, s3_get_bucket

从 S3 下载单个文件:

s3_download(
    bucket='my_bucket',
    s3_filepath='tmp/my_file.csv',
    local_filepath='../data/my_file.csv')

使用配置文件名称下载:

s3_download(
    bucket='my_bucket',
    profile_name='my-profile-name',
    s3_filepath='tmp/my_file.csv',
    local_filepath='../data/my_file.csv')

从 S3 下载文件列表(不会上传子目录的内容):

s3_download(
    bucket='my_bucket',
    s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'],
    local_filepath=['../data/my_file1.csv', '../data/my_file2.csv', '../img.png'])

从 S3 下载匹配模式的文件(不会上传子目录的内容):

s3_download(
    bucket='my_bucket',
    s3_filepath='tmp/*.csv',
    local_filepath='../data/')

从 S3 下载目录中的所有文件(不会上传子目录的内容):

s3_download(
    bucket='my_bucket',
    s3_filepath='tmp/*',
    local_filepath='../data/')

将单个文件上传到 S3:

s3_upload(
    bucket='my_bucket',
    local_filepath='../data/my_file.csv',
    s3_filepath='tmp/my_file.csv')

使用个人资料名称上传:

s3_upload(
    bucket='my_bucket',
    profile_name='my-profile-name',
    local_filepath='../data/my_file.csv',
    s3_filepath='tmp/my_file.csv')

将文件列表上传到 S3(不会上传子目录的内容):

s3_upload(
    bucket='my_bucket',
    local_filepath=['../data/my_file1.csv', '../data/my_file2.csv', '../img.png'],
    s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'])

将匹配模式的文件上传到 S3(不会上传子目录的内容):

s3_upload(
    bucket='my_bucket',
    local_filepath='../data/*.csv',
    s3_filepath='tmp/')

将目录中的所有文件上传到 S3(不会上传子目录的内容):

s3_upload(
    bucket='my_bucket',
    local_filepath='../data/*'
    s3_filepath='tmp/')

在 S3 中删除单个文件:

resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/my_file.csv')

使用配置文件名称删除:

s3_upload(
    bucket='my_bucket',
    profile_name='my-profile-name',
    s3_filepath='tmp/my_file.csv')

删除 S3 中的文件列表:

resp = s3_delete(
    bucket='my_bucket',
    s3_filepath=['tmp/my_file1.csv', 'tmp/my_file2.csv', 'img.png'])

删除与 S3 中的模式匹配的文件:

resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/*.csv')

删除 S3 目录中的所有文件:

resp = s3_delete(bucket='my_bucket', s3_filepath='tmp/*')

创建一个可以由有经验的用户直接操作的存储桶对象:

bucket = s3_get_bucket(
    bucket='my_bucket',
    profile_name='default',
    region_name='us-west-2')

博托3:

导入 boto3 函数:

from nordata import boto_get_creds, boto_create_session

COPY检索Boto3 凭据作为字符串用于UNLOADSQL 语句:

creds = boto_get_creds(
    profile_name='default',
    region_name='us-west-2',
    session=None)

创建一个可以由有经验的用户直接操作的 boto3 会话对象:

session = boto_create_session(profile_name='default', region_name='us-west-2')

在 Redshift 和 S3 之间传输数据:

使用UNLOAD语句将数据从 Redshift 传输到 S3(有关更多信息,请参阅Redshift UNLOAD 文档):

 nordata 导入 boto_get_creds  redshift_execute_sql


creds = boto_get_creds(
    profile_name='default',
    region_name='us-west-2',
    session=None)

sql  =  f '''

    卸载(
        'select 
            col1 
            ,col2 
        from 
            my_schema.my_table' 


        's3://mybucket/unload/my_table/'
    凭据
        ' { creds } '
    并行关闭标题gzip allowoverwrite;
'''

redshift_execute_sql(
    sql=sql,
    env_var='REDSHIFT_CREDS',
    return_data=False,
    return_dict=False)

使用COPY语句将数据从 S3 传输到 Redshift(有关更多信息,请参阅Redshift COPY 文档):

 nordata 导入 boto_get_creds  redshift_execute_sql


creds = boto_get_creds(
    profile_name='default',
    region_name='us-west-2',
    session=None)

sql  =  f '''


        's3://mybucket/unload/my_table/'    复制        my_schema.my_table ' 
    credentials         ' { creds } '     ignoreheader 1 gzip; '''






redshift_execute_sql(
    sql=sql,
    env_var='REDSHIFT_CREDS',
    return_data=False,
    return_dict=False)

测试:

对于那些有兴趣为 Nordata 做出贡献或分叉和编辑项目的人来说,pytest 是使用的测试框架。要运行测试,请创建一个虚拟环境,安装 的内容dev-requirements.txt,然后从项目的根目录运行以下命令。测试脚本可以在test/目录中找到。

$ pytest

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

nordata-0.2.3.tar.gz (13.6 kB 查看哈希

已上传 source

内置分布

nordata-0.2.3-py3-none-any.whl (35.3 kB 查看哈希

已上传 py3