Skip to main content

适用于 Python DB API 2.0 (PEP 249) 的 Amazon Athena JDBC 驱动程序包装器

项目描述

https://img.shields.io/pypi/pyversions/PyAthenaJDBC.svg https://github.com/laughingman7743/PyAthenaJDBC/workflows/test/badge.svg https://codecov.io/gh/laughingman7743/PyAthenaJDBC/branch/master/graph/badge.svg https://img.shields.io/pypi/l/PyAthenaJDBC.svg https://pepy.tech/badge/pyathenajdbc/月 https://img.shields.io/badge/code%20style-black-000000.svg

PyAthenaJDBC

PyAthenaJDBC 是Python DB API 2.0 (PEP 249)的Amazon Athena JDBC 驱动程序包装器。

要求

  • Python

    • CPython 3.6、3.7、3.8、3.9

  • 爪哇

    • Java >= 8 (JDBC 4.2)

JDBC 驱动程序兼容性

版本

JDBC驱动版本

小贩

< 2.0.0

== 1.1.0

AWS(早期发布的JDBC驱动,与Simba的JDBC驱动不兼容)

>= 2.0.0

>= 2.0.5

辛巴

安装

$ pip install PyAthenaJDBC

额外套餐:

包裹

安装命令

版本

熊猫

pip install PyAthenaJDBC[熊猫]

>=1.0.0

SQL炼金术

pip install PyAthenaJDBC[SQLAlchemy]

>=1.0.0, <2.0.0

用法

基本用法

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2')
try:
    with conn.cursor() as cursor:
        cursor.execute("""
        SELECT * FROM one_row
        """)
        print(cursor.description)
        print(cursor.fetchall())
finally:
    conn.close()

光标迭代

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2')
try:
    with conn.cursor() as cursor:
        cursor.execute("""
        SELECT * FROM many_rows LIMIT 10
        """)
        for row in cursor:
            print(row)
finally:
    conn.close()

带参数查询

支持的DB API paramstyle只有PyFormatPyFormat仅支持具有旧%运算符样式的命名占位符,并且参数指定字典格式。

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2')
try:
    with conn.cursor() as cursor:
        cursor.execute("""
        SELECT col_string FROM one_row_complex
        WHERE col_string = %(param)s
        """, {'param': 'a string'})
        print(cursor.fetchall())
finally:
    conn.close()

如果%字符包含在您的查询中,则必须使用%%对其进行转义,如下所示:

SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'

JVM 选项

在连接方法或连接对象中,您可以使用字符串数组指定 JVM 选项。

您可以增加 JVM 堆大小,如下所示:

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2',
               jvm_options=['-Xms1024m', '-Xmx4096m'])
try:
    with conn.cursor() as cursor:
        cursor.execute("""
        SELECT * FROM many_rows
        """)
        print(cursor.fetchall())
finally:
    conn.close()

JDBC 4.1

如果要使用 JDBC 4.1,请下载对应的 JDBC 驱动程序,并将下载的 JDBC 驱动程序的路径指定为连接方法或连接对象的参数driver_path

  • AthenaJDBC41-2.0.7.jar与 JDBC 4.1 兼容,需要 JDK 7.0 或更高版本

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2',
               driver_path='/path/to/AthenaJDBC41_2.0.7.jar')

JDBC 驱动程序配置选项

连接方法或连接对象将关键字参数作为选项传递给 JDBC 驱动程序。如果要更改 JDBC 驱动程序的行为,请将该选项指定为连接方法或连接对象中的关键字参数。

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2',
               LogPath='/path/to/pyathenajdbc/log/',
               LogLevel='6')

有关 JDBC 驱动程序选项的详细信息,请参阅官方文档。

注意:选项名称和值区分大小写。选项值被指定为字符串。

SQL炼金术

使用pip install SQLAlchemy>=1.0.0pip install PyAthenaJDBC[SQLAlchemy] 安装 SQLAlchemy。支持的 SQLAlchemy 为 1.0.0 或更高版本且小于 2.0.0。

import contextlib
from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

conn_str = 'awsathena+jdbc://{User}:{Password}@athena.{AwsRegion}.amazonaws.com:443/'\
           '{Schema}?S3OutputLocation={S3OutputLocation}'
engine = create_engine(conn_str.format(
    User=quote_plus('YOUR_ACCESS_KEY'),
    Password=quote_plus('YOUR_SECRET_ACCESS_KEY'),
    AwsRegion='us-west-2',
    Schema='default',
    S3OutputLocation=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
try:
    with contextlib.closing(engine.connect()) as conn:
        many_rows = Table('many_rows', MetaData(bind=engine), autoload=True)
        print(select([func.count('*')], from_obj=many_rows).scalar())
finally:
    engine.dispose()

连接字符串具有以下格式:

awsathena+jdbc://{User}:{Password}@athena.{AwsRegion}.amazonaws.com:443/{Schema}?S3OutputLocation={S3OutputLocation}&driver_path={driver_path}&...

如果您不使用实例配置文件凭证或凭证配置文件指定用户(即 AWSAccessKeyID)和密码(即 AWSSecretAccessKey):

awsathena+jdbc://:@athena.{Region}.amazonaws.com:443/{Schema}?S3OutputLocation={S3OutputLocation}&driver_path={driver_path}&...

注意:S3OutputLocation需要报价。如果UserPassword和其他参数包含特殊字符,还需要引号。

熊猫

作为数据框

您可以使用pandas.read_sql将查询结果作为DataFrame 对象处理。

from pyathenajdbc import connect
import pandas as pd

conn = connect(User='YOUR_ACCESS_KEY_ID',
               Password='YOUR_SECRET_ACCESS_KEY',
               S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2',
               jvm_path='/path/to/jvm')
df = pd.read_sql("SELECT * FROM many_rows LIMIT 10", conn)

pyathena.util也有辅助方法。

import contextlib
from pyathenajdbc import connect
from pyathenajdbc.util import as_pandas

with contextlib.closing(
        connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/'
                AwsRegion='us-west-2'))) as conn:
    with conn.cursor() as cursor:
        cursor.execute("""
        SELECT * FROM many_rows
        """)
        df = as_pandas(cursor)
print(df.describe())
到 SQL

您可以使用pandas.DataFrame.to_sql将存储在 DataFrame 中的记录写入 Amazon Athena。 pandas.DataFrame.to_sql使用SQLAlchemy,因此您需要安装它。

import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine
conn_str = 'awsathena+jdbc://:@athena.{AwsRegion}.amazonaws.com:443/'\
           '{Schema}?S3OutputLocation={S3OutputLocation}&S3Location={S3Location}&compression=snappy'
engine = create_engine(conn_str.format(
    AwsRegion='us-west-2',
    Schema_name='YOUR_SCHEMA',
    S3OutputLocation=quote_plus('s3://YOUR_S3_BUCKET/path/to/'),
    S3Location=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
df.to_sql('YOUR_TABLE', engine, schema="YOUR_SCHEMA", index=False, if_exists='replace', method='multi')

Amazon S3 表的位置由连接字符串中的S3Loc​​ation参数指定。如果未指定 S3Loc​​ation,使用S3OutputLocation参数。以下规则适用。

s3://{S3Location or S3OutputLocation}/{schema}/{table}/

数据格式仅支持 Parquet。压缩格式由连接字符串中的压缩参数指定。

凭据

AWS 凭证提供商链

请参阅提供和检索 AWS 凭证

https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html

按以下顺序查找凭证的 AWS 凭证提供程序链:

  • 环境变量 - AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY(推荐,因为它们被除 .NET 之外的所有 AWS 开发工具包和 CLI 识别),或 AWS_ACCESS_KEY 和 AWS_SECRET_KEY(仅被 Java 开发工具包识别)

  • Java 系统属性 - aws.accessKeyId 和 aws.secretKey

  • 来自环境或容器的 Web 身份令牌凭证

  • 所有 AWS 开发工具包和 AWS CLI 共享的默认位置 (~/.aws/credentials) 的凭证配置文件

  • 如果设置了 AWS_CONTAINER_CREDENTIALS_RELATIVE_URI” 环境变量并且安全管理员有权访问该变量,则通过 Amazon EC2 容器服务交付的凭证,

  • 通过 Amazon EC2 元数据服务交付的实例配置文件凭证

在连接方法或连接对象中,您可以通过至少指定S3OutputLocationAwsRegion进行连接。 如果已设置环境变量、凭据文件或实例配置文件,则不需要用户密码。

from pyathenajdbc import connect

conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
               AwsRegion='us-west-2')

测试

取决于以下环境变量:

$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ export AWS_DEFAULT_REGION=us-west-2
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/

您需要创建一个名为test-pyathena-jdbc的工作组。

运行测试

$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ poetry run pytest
$ poetry run scripts/test_data/delete_test_data.sh

运行测试多个 Python 版本

$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ pyenv local 3.9.0 3.8.6 3.7.9 3.6.12
$ poetry run tox
$ poetry run scripts/test_data/delete_test_data.sh

代码格式

代码格式使用blackisort

应用格式

$ make fmt

检查格式

$ make chk

执照

除 JDBC 驱动程序外,所有 Python 代码的许可证都是MIT 许可证

JDBC 驱动程序

有关 JDBC 驱动程序的许可证,请查看以下链接。

项目详情