用于测试 Spark 集合(如 DataFrames)的断言函数。
项目描述
用于测试 Spark 集合(如 DataFrames)的断言函数集合
—
在开发 Spark 应用程序时,您最终可以编写在 Spark DataFrames 上应用转换的方法。为了测试结果,您可以创建pandas DataFrames并使用pandas提供的测试功能,因为pyspark不提供任何辅助测试的功能。
spark-test提供类似于pandas但面向 Spark Collections 的测试功能。
假设您有一个函数可以在 Spark DataFrame 上应用一些转换(此示例的完整代码可以在 tests/test_example.py 中找到:
def transform(df):
"""
Fill nulls with 0, sum 10 to Age column and only return distinct rows
"""
df = df.na.fill(0)
df = df.withColumn('Age', df['Age'] + 10)
df = df.distinct()
return df
然后我们可以编写一个包含尽可能多的测试输入的测试用例,并使用assert_dataframe_equal测试结果:
from spark_test.testing import assert_dataframe_equal
def test_transform(spark, transform):
input_df = spark.createDataFrame(
[['Tom', 25], ['Tom', 25], ['Charlie', 24], ['Dan', None]],
schema=['Name', 'Age']
)
expected = spark.createDataFrame(
[['Tom', 35], ['Charlie', 34], ['Dan', 0]],
schema=['Name', 'Age']
)
result = transform(input_df)
assert_frame_equal(expected, result)
当然,测试失败时会更有趣,所以让我们在转换函数中引入一个错误:
def bugged_transform(df):
"""
Fill nulls with 0, sum 10 to Age column and only return distinct rows
"""
df = df.na.fill(1) # Whoops! Should be 0!
df = df.withColumn('Age', df['Age'] + 10)
df = df.distinct()
return df
使用pytest.mark.parametize 将这两个函数传递给我们的测试会产生以下输出,其中包含关于失败的消息:
$ pytest tests/example.py
============================= test session starts =============================
platform linux -- Python 3.7.3, pytest-5.0.0, py-1.8.0, pluggy-0.12.0
rootdir: /home/tfarias/repos/spark-test
collected 2 items
tests/example.py .F [100%]
================================== FAILURES ===================================
_______________________ test_transform[bugged_transform] ________________________
assert left_d[key] == right_d[key], msg.format(
> field=key, l_value=left_d[key], r_value=right_d[key]
)
E AssertionError: Values for Age do not match:
E Left=10
E Right=11
执照
根据 MIT 许可证分发。