适用于 Python DB API 2.0 (PEP 249) 的 Amazon Athena JDBC 驱动程序包装器
项目描述
PyAthenaJDBC
PyAthenaJDBC 是Python DB API 2.0 (PEP 249)的Amazon Athena JDBC 驱动程序包装器。
要求
Python
CPython 3.6、3.7、3.8、3.9
爪哇
Java >= 8 (JDBC 4.2)
JDBC 驱动程序兼容性
版本 |
JDBC驱动版本 |
小贩 |
---|---|---|
< 2.0.0 |
== 1.1.0 |
AWS(早期发布的JDBC驱动,与Simba的JDBC驱动不兼容) |
>= 2.0.0 |
>= 2.0.5 |
辛巴 |
安装
$ pip install PyAthenaJDBC
额外套餐:
包裹 |
安装命令 |
版本 |
---|---|---|
熊猫 |
pip install PyAthenaJDBC[熊猫] |
>=1.0.0 |
SQL炼金术 |
pip install PyAthenaJDBC[SQLAlchemy] |
>=1.0.0, <2.0.0 |
用法
基本用法
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2')
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM one_row
""")
print(cursor.description)
print(cursor.fetchall())
finally:
conn.close()
光标迭代
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2')
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM many_rows LIMIT 10
""")
for row in cursor:
print(row)
finally:
conn.close()
带参数查询
支持的DB API paramstyle只有PyFormat。 PyFormat仅支持具有旧%运算符样式的命名占位符,并且参数指定字典格式。
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2')
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s
""", {'param': 'a string'})
print(cursor.fetchall())
finally:
conn.close()
如果%字符包含在您的查询中,则必须使用%%对其进行转义,如下所示:
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
JVM 选项
在连接方法或连接对象中,您可以使用字符串数组指定 JVM 选项。
您可以增加 JVM 堆大小,如下所示:
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2',
jvm_options=['-Xms1024m', '-Xmx4096m'])
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM many_rows
""")
print(cursor.fetchall())
finally:
conn.close()
JDBC 4.1
如果要使用 JDBC 4.1,请下载对应的 JDBC 驱动程序,并将下载的 JDBC 驱动程序的路径指定为连接方法或连接对象的参数driver_path。
AthenaJDBC41-2.0.7.jar与 JDBC 4.1 兼容,需要 JDK 7.0 或更高版本。
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2',
driver_path='/path/to/AthenaJDBC41_2.0.7.jar')
JDBC 驱动程序配置选项
连接方法或连接对象将关键字参数作为选项传递给 JDBC 驱动程序。如果要更改 JDBC 驱动程序的行为,请将该选项指定为连接方法或连接对象中的关键字参数。
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2',
LogPath='/path/to/pyathenajdbc/log/',
LogLevel='6')
有关 JDBC 驱动程序选项的详细信息,请参阅官方文档。
注意:选项名称和值区分大小写。选项值被指定为字符串。
SQL炼金术
使用pip install SQLAlchemy>=1.0.0或pip install PyAthenaJDBC[SQLAlchemy] 安装 SQLAlchemy。支持的 SQLAlchemy 为 1.0.0 或更高版本且小于 2.0.0。
import contextlib
from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData
conn_str = 'awsathena+jdbc://{User}:{Password}@athena.{AwsRegion}.amazonaws.com:443/'\
'{Schema}?S3OutputLocation={S3OutputLocation}'
engine = create_engine(conn_str.format(
User=quote_plus('YOUR_ACCESS_KEY'),
Password=quote_plus('YOUR_SECRET_ACCESS_KEY'),
AwsRegion='us-west-2',
Schema='default',
S3OutputLocation=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
try:
with contextlib.closing(engine.connect()) as conn:
many_rows = Table('many_rows', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=many_rows).scalar())
finally:
engine.dispose()
连接字符串具有以下格式:
awsathena+jdbc://{User}:{Password}@athena.{AwsRegion}.amazonaws.com:443/{Schema}?S3OutputLocation={S3OutputLocation}&driver_path={driver_path}&...
如果您不使用实例配置文件凭证或凭证配置文件指定用户(即 AWSAccessKeyID)和密码(即 AWSSecretAccessKey):
awsathena+jdbc://:@athena.{Region}.amazonaws.com:443/{Schema}?S3OutputLocation={S3OutputLocation}&driver_path={driver_path}&...
注意:S3OutputLocation需要报价。如果User,Password和其他参数包含特殊字符,还需要引号。
熊猫
作为数据框
您可以使用pandas.read_sql将查询结果作为DataFrame 对象处理。
from pyathenajdbc import connect
import pandas as pd
conn = connect(User='YOUR_ACCESS_KEY_ID',
Password='YOUR_SECRET_ACCESS_KEY',
S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2',
jvm_path='/path/to/jvm')
df = pd.read_sql("SELECT * FROM many_rows LIMIT 10", conn)
pyathena.util包也有辅助方法。
import contextlib
from pyathenajdbc import connect
from pyathenajdbc.util import as_pandas
with contextlib.closing(
connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/'
AwsRegion='us-west-2'))) as conn:
with conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM many_rows
""")
df = as_pandas(cursor)
print(df.describe())
到 SQL
您可以使用pandas.DataFrame.to_sql将存储在 DataFrame 中的记录写入 Amazon Athena。 pandas.DataFrame.to_sql使用SQLAlchemy,因此您需要安装它。
import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine
conn_str = 'awsathena+jdbc://:@athena.{AwsRegion}.amazonaws.com:443/'\
'{Schema}?S3OutputLocation={S3OutputLocation}&S3Location={S3Location}&compression=snappy'
engine = create_engine(conn_str.format(
AwsRegion='us-west-2',
Schema_name='YOUR_SCHEMA',
S3OutputLocation=quote_plus('s3://YOUR_S3_BUCKET/path/to/'),
S3Location=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
df.to_sql('YOUR_TABLE', engine, schema="YOUR_SCHEMA", index=False, if_exists='replace', method='multi')
Amazon S3 表的位置由连接字符串中的S3Location参数指定。如果未指定 S3Location,将使用S3OutputLocation参数。以下规则适用。
s3://{S3Location or S3OutputLocation}/{schema}/{table}/
数据格式仅支持 Parquet。压缩格式由连接字符串中的压缩参数指定。
凭据
AWS 凭证提供商链
请参阅提供和检索 AWS 凭证
按以下顺序查找凭证的 AWS 凭证提供程序链:
环境变量 - AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY(推荐,因为它们被除 .NET 之外的所有 AWS 开发工具包和 CLI 识别),或 AWS_ACCESS_KEY 和 AWS_SECRET_KEY(仅被 Java 开发工具包识别)
Java 系统属性 - aws.accessKeyId 和 aws.secretKey
来自环境或容器的 Web 身份令牌凭证
所有 AWS 开发工具包和 AWS CLI 共享的默认位置 (~/.aws/credentials) 的凭证配置文件
如果设置了 AWS_CONTAINER_CREDENTIALS_RELATIVE_URI” 环境变量并且安全管理员有权访问该变量,则通过 Amazon EC2 容器服务交付的凭证,
通过 Amazon EC2 元数据服务交付的实例配置文件凭证
在连接方法或连接对象中,您可以通过至少指定S3OutputLocation和AwsRegion进行连接。 如果已设置环境变量、凭据文件或实例配置文件,则不需要用户和密码。
from pyathenajdbc import connect
conn = connect(S3OutputLocation='s3://YOUR_S3_BUCKET/path/to/',
AwsRegion='us-west-2')
测试
取决于以下环境变量:
$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ export AWS_DEFAULT_REGION=us-west-2
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
您需要创建一个名为test-pyathena-jdbc的工作组。
运行测试
$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ poetry run pytest
$ poetry run scripts/test_data/delete_test_data.sh
运行测试多个 Python 版本
$ pip install poetry
$ poetry install -v
$ poetry run scripts/test_data/upload_test_data.sh
$ pyenv local 3.9.0 3.8.6 3.7.9 3.6.12
$ poetry run tox
$ poetry run scripts/test_data/delete_test_data.sh
代码格式
应用格式
$ make fmt
检查格式
$ make chk
执照
除 JDBC 驱动程序外,所有 Python 代码的许可证都是MIT 许可证。
JDBC 驱动程序
有关 JDBC 驱动程序的许可证,请查看以下链接。