목적
자동화된 pyspark Transformation 유닛 테스트 코드를 어떻게 구성했는지 소개합니다.
배경
저는 웹 프레임워크를 사용하는 백엔드 코드에 익숙한 개발자입니다. 최근 회사에서 데이터 엔지니어링 관련 업무를 맡으면서 pyspark를 처음 사용해봤습니다. pyspark도 백엔드처럼 비지니스 로직이 들어간 부분을 테스트하고 싶었습니다. 특히 비지니스 로직이 많이 들어간 Transformation은 더더욱 테스트하고 싶었습니다.
왜냐하면 Transformation 코드를 변경할 때마다 클러스터 환경에서 문제가 없는지 테스트해보는 게 번거로웠습니다. 코드가 많아지면 많아질수록 번거로움이 더 커진다고 생각했습니다. 또한 자동화된 테스트가 없다면 귀찮아서 혹은 실수로 테스트를 안 할 수 있습니다. 이에 따라 코드의 안전성도 떨어질 것 같았습니다. 특히 Transformation은 실행 자체에는 문제가 없어도, 결과가 잘못 되는 치명적인 케이스도 자주 발생할 것이라고 생각했습니다.
이런 여러 이유로 테스트 코드를 짜야겠다는 결심을 했습니다. 😇
테스트 코드 작성 전 체크 사항
본격적으로 테스트 코드를 짜기 전에 살펴볼 것들이 있습니다.
코드 구조
AlexIoannides/pyspark-example-project 레포를 보면 pyspark 어플리케이션이 갖춰야할 베스트 프랙티스를 소개하고 있습니다. pyspark를 처음 써봐서 디렉토리 구조도 어떻게 잡아야할지 감이 안 오던 제게 단비같은(?) 레포였습니다.
특히 여기서 structure-of-an-etl-job 섹션 부분을 잘 봐야합니다.
In order to facilitate easy debugging and testing, we recommend that the ‘Transformation’ step be isolated from the ‘Extract’ and ‘Load’ steps, into its own function — taking input data arguments in the form of DataFrames and returning the transformed data as a single DataFrame.
Transformation이 Extract와 Load 단계와 분리돼있어야 합니다. 그래야 테스트에서 딱 Transformation 부분에만 집중해 테스트를 할 수 있습니다.
패키지
테스트를 위해 패키지들도 필요합니다. 패키지 설치를 위해서는 python3의 내장 기능인 venv를 쓰셔도 되고 pyenv를 쓰셔도 됩니다. 이 부분의 설명은 생략합니다.
- pyspark: 파이썬에서 spark를 사용하기 위해 필요합니다.
- pytest: 파이썬 테스트 코드를 작성하기 위해 필요합니다.
- chispa: pyspark 테스트 헬퍼 메서드를 제공합니다. pyspark는 DataFrame이라는 별도의 자료구조를 사용하기 때문에 pytest만으로는 테스트가 까다롭습니다.
패키지는 딱 이 3개면 충분합니다. 이제 테스크 코드 작성으로 넘어갑시다.
테스트 코드 작성
아래 나오는 코드들은 모두 제 레포인 pyspark-sample에서 가져왔습니다. 자세하게 보고 싶으시면 레포를 참고하시면 됩니다.
간단한 코드 및 데이터 소개
책 판매 데이터를 처리한다고 가정했습니다. 아래와 같이 생긴 json이 있고 json을 처리해 다시 json 데이터를 만들어냅니다. output은 코드를 수행할 때마다 변경될 수 있으므로 레포에 올리지 않았습니다. transforamtion
함수에 Transformation이 있습니다.
# Input
[
{
"title": "Learning Spark, Second Edition",
"price": 70,
"author": "Jules Damji",
"rate": 4.5 },
...# Output
{"lower_title":"learning spark, second edition","price_won":70000,"first_name":"Jules","last_name":"Damji","rate_comment":"very good"}
...
Fixture
pytest에서 권장하는 방식 중 하나는 conftest.py
라는 파일에 테스트를 하기 위한 준비물인 fixture를 정의해놓습니다. 그리고 이 fixture를 테스트에서 사용합니다.
- spark session
로컬 환경에서 테스트를 수행하기 위해 각 테스트 케이스에 로컬에서 생성한 SparkSession
을 주입해줘야합니다. 일반적인 파이썬 코드와 달리 pyspark 코드는 실행을 위해 비교적 더 많은 시간이 필요합니다. 기본적으로 초 단위의 시간이 걸립니다. spark의 여러 컨텍스트를 담고 있는 SparkSession
을 테스트 케이스마다 생성하는 것은 테스트 시간을 급격하게 증가 시킵니다. 그래서 SparkSession
의 scope는 테스트 수행 시 한 번 생성되고 테스트가 끝날 때 없애는 session
scope로 설정해줍니다.
@pytest.fixture(scope="session")
def spark() -> SparkSession:
return SparkSession.builder.master("local[*]").getOrCreate()
- Input, Output DF schema
스키마가 비교적 단순하다면 타입 지정 없이 컬럼 이름만 지정해도 충분할 수 있습니다. 하지만 비교적 복잡해 컬럼 이름만 지정해서는 원래 스키마와 동일하게 나오지 않는다면, StructType
을 이용하는 방법이 있습니다. schema 추론에 의존하지 않기 때문에 지정한 타입대로 데이터 프레임의 스키마가 결정됩니다. 단, 스키마에 지정되지 않은 필드는 무시되니 주의하시길 바랍니다.
@pytest.fixture(scope="session")
def input_schema() -> T.StructType:
return T.StructType(
[
T.StructField("title", T.StringType(), True),
T.StructField("price", T.IntegerType(), True),
T.StructField("author", T.StringType(), True), T.StructField("rate", T.FloatType(), True),
]
)
- Input, Outpt DataFrame
스파크는 보통 transformation의 인풋과 아웃풋으로 DataFrame
을 이용합니다. 데이터 프레임을 만들 때, 스파크 자료구조인 Row
를 이용하면 컬럼 이름을 명시적으로 정의하면서 테스트 데이터를 만들 수 있습니다.
def get_given_then_df(
spark: SparkSession,
input_schema: T.StructType,
output_schema: T.StructType
) -> tuple[DataFrame, DataFrame]:
given = [Row(title="Test Spark", price=10, author="Hyemi Noh", rate=5.0)]
given_df = spark.createDataFrame(data=given, schema=input_schema)
...
테스트 로직
given, when, then으로 분리해 작성해줍니다. 위에서 소개해드렸던 chispa
패키지를 이용해 검증을 합니다. 아마 nullable은 비교하고 싶지 않다거나, 컬럼 순서는 비교하고 싶지 않다거나 여러 요구사항이 있으실 겁니다. 이를 위해, 여러가지 옵션이 있으니 chispa
공식 레포를 참고하시면 좋을 것 같습니다. (given에서 then 데이터 프레임을 얻어 좀 애매합니다. 거슬리신다면 두 함수를 분리하는 걸 추천합니다.)
def test_success(
self,
spark: SparkSession,
input_schema: T.StructType,
output_schema: T.StructType,
):
# given
given_df, then_df = get_given_then_df(
spark=spark, input_schema=input_schema, output_schema=output_schema) # when
result = etl.transform(given_df) # then
assert_df_equality(result, then_df, ignore_nullable=True)
테스트가 실패하면 아래처럼 어떤 부분이 실패했는지 프린트를 해줍니다. 그래서 비교적 쉽게 디버깅을 할 수 있습니다.
E chispa.dataframe_comparer.DataFramesNotEqualError:
E +---------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+
E | df1 | df2 |
E +---------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+
E | Row(lower_title='test spark', price_won=10000, first_name='Hyemi', last_name='Noh', rate_comment='very good') | Row(lower_title='test spark', price_won=10000, first_name='Hyemi', last_name='Noh', rate_comment='very goo') |
E +---------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+.venv/lib/python3.9/site-packages/chispa/dataframe_comparer.py:80: DataFramesNotEqualError
github action 연동
github action에 대한 자세한 설명은 다른 블로그 글을 참고하시면 좋을 것 같습니다. on: pull_request
로 설정하면 PR을 열었을 때와 PR에 커밋을 할때마다 테스트가 수행됩니다. actions를 이용해 파이썬과 자바를 runner에 설치하고 requirements.txt
로 필요한 패키지들을 설치합니다. pytest
에 -m
옵션을 주는 이유는 pytest가 파이썬 실행 경로를 찾을 수 있도록 하기 위해서입니다.
name: Unit Test
on:
pull_request:
branches: [ '**' ]jobs:
test:
name: Unit Test
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9 - name: Set up Java 11
uses: actions/setup-java@v3
with:
distribution: 'adopt'
java-version: '11' - name: Python PIP Install
uses: BSFishy/pip-action@v1
with:
requirements: requirements.txt
- name: pytest
run: python3 -m pytest tests
기타
처음 테스트 데이터 프레임을 만들 때, struct type과 array type을 어떻게 정의해야할지 몰라서 헤맸던 기억이 있습니다. 그래서 이 방법도 설명드리려고 합니다. 👀
- struct type
some = {"key": "value"}
Row(some=some)
- array type
some = ["some"]
Row(some=some)
- struct of array
some = [{"key": "value"}]
Row(some=some)
결론
스파크 Transformation에 자동화된 유닛 테스트를 적용하는 방법을 알아봤습니다. 코드 구조에서 Transformation만 분리돼있다면 필요한 패키지를 사용해 쉽게 테스트 코드를 작성할 수 있습니다.
테스트 코드를 짜면 Transformation 로직을 작성하거나 변경할 때, 별도의 클러스터를 띄우지 않고 로컬에서 코드의 수행 결과를 확인할 수 있습니다. 또한 기존에 여러 테스트 코드를 잘 작성해놓는다면, Transformation 코드의 오류도 미리 빠르게 발견할 수 있을 것 입니다 🙂
+) 아직 스파크에 완전히 익숙한 상태에서 작성한 글이 아닙니다. 이렇게 테스트 코드를 짤 수도 있구나만 참고해주시면 좋을 것 같습니다.