2023년 1월 초에 2022년 한 해 동안 BE에서 DE를 처음하면서 배운 점을 정리한 ‘BE에서 DE 첫걸음을 하면서 느낀 점’이라는 글을 올렸다. 올해도 2023년 한 해 동안 DE를 하면서 배운 점들을 정리해보려고 한다. 올해는 특히 해결해야할 이슈도 많았고 개발한 것도 많기 때문에 할 이야기가 많았다. 이제 첫걸음은 했기 때문에 제목에서 뺐다 ㅎㅎ; 각 섹션별로 특정 기술 혹은 주제를 기준으로 정리했다.
Table 설계
데이터 삽입과 갱신뿐만 아니라 삭제도 생각하면서 만들자.
- 아무래도 삭제는 기능에 큰 영향을 주는 게 아니다보니 크게 고려되지 않을 가능성이 높다.
- 하지만 불필요한 데이터 삭제나 보안상의 이유로 테이블에서 특정 데이터를 모두 삭제해야할 때가 있다.
- unpartitioned table이면 그나마 괜찮지만 partitioned table이면 복잡해진다. 최악으로는 삭제 키가 partition이 아니면 partition을 모두 새로 써야한다.
특정 키로만 데이터 삽입 및 업데이트가 필요하다면 파티션 키로 설정하자.
- 종종 partitioned table에 특정 키에 해당하는 데이터만 삽입하거나 업데이트가 필요할 때가 있다.
- 이때 해당 키가 partition이 아니면 전체 데이터를 모두 새로 써야한다.
- 따라서 해당 키로 partition을 생성해 특정 데이터만 쓸 수 있게 해야한다.
데이터
Encryption은 모든 플랫폼에서 사용가능한 것으로 쓰자.
- Encryption은 언어, 플랫폼 등에 종속적이면 안 된다. 해당 데이터를 Encryption한 언어, 플랫폼 뿐만 아니라 다른 언어, 플랫폼에서도 데이터를 사용할 수 있기 때문이다.
- 조금 더 구체적으로는 파이썬, 스파크, 아테나 등등 모든 언어, 플랫폼에서 지원할만한 보편적인 방법을 써야한다. 예를 들면 AES 같은 것이 있다.
- 네이티브 함수 한 번으로 끝날 작업을 별도 언어로 로직을 다시 작성하고 람다에서 해당 로직을 수행해야하는 등 정말 멀리 돌아가야할 수도 있다. 😇
데이터 검증은 prod 혹은 전체 데이터를 이용하자.
- 굳이 prod 데이터만큼 필요할까 싶어서 일부만 샘플링해서 데이터 검증을 한 적이 있다.
- 그러자 샘플에서는 발견하지 못한 케이스가 프로덕션에서 발견돼 핫픽스를 한 적이 있다.
- 엣지케이스를 최대한 줄이기 위해서는 prod 혹은 전체 데이터로 검증하는 게 좋다.
데이터 파이프라인을 새롭게 구성할 때 처리 로직은 그대로 작성하자.
- 일단 파이프라인을 새롭게 옮기는 작업 자체가 리소스가 많이 드는 일이므로 처리 코드까지 건드리면 시간이 더 오래 걸린다.
- 데이터 파이프라인의 소비자들까지 모두 바꿀 수 없기 때문에 기존과 동일한 결과를 제공해줘야한다.
- 처리 로직을 변경하면 새롭게 작성한 로직으로 만든 데이터에 문제가 없는지 검증이어려워진다.
- 특히 레코드를 제외하는 로직이 추가로 들어가면 실수에 의한 누락인지 로직에 의한 의도적이 누락인지 구분하기가 어렵다.
- 소비자들에게 영향이 그대로 전파되기 때문에 기존 처리 로직이 마음에 안 들더라도 유지하는 걸 매우 권장한다.
- 바꾸더라도 데이터파이프라인을 완전히 옮기고 소비자들과 커뮤니케이션해서 바꿔야한다.
null, empty string 처리를 항상 명심하자.
- 모든 타입에 대해서 값이 없는 경우 null로 데이터가 들어올 수 있고 스트링인 경우 empty string으로 데이터가 들어올 수 있다.
- 검증, 처리 로직에서 항상 필드가 null, empty string으로 들어오는 걸 가정하고 작성해야한다.
- 예를 들어, encryption 유틸에서 null을 암호화해서 에러가 날 수 있다면 조건문으로 null인 경우 encryption하지 않고 그대로 반환하게 만들어야 한다.
API로 외부 데이터를 가져올 때는 ID여도 중복을 꼭 확인하자.
- 보통 API 스펙에 PK처럼 보이는 값은 중복 확인을 안 하기 쉽다.
- 하지만 API로 가져오는 방식이나 외부 데이터 출처의 오류로 인해 ID 값이 중복된 데이터가 수집될 수 있다.
- 예를 들어, 페이징 방식을 통해 가져오면 전 페이지 마지막 데이터와 다음 페이지 첫번째 데이터가 동일한 데이터일 수도 있다.
- 따라서 정합성 검증 때는 drop duplicate 없이 검증해서 원인을 파악하고 파이프라인으로 만들 때는 drop duplicate를 해줘야한다.
환경 구성
dev 환경 구성시 read는 prod에서 write는 dev에서 하자.
- 주기적으로 수집된 데이터가 필요한 데이터 파이프라인 특성 상 read를 dev로 하면 prod에서 매번 복사해야한다.
- 비효율적이고 번거롭기 때문에 처음부터 prod에서 read를 하도록 설정하는 게 좋다.
- 물론 prod RDB에서 읽는 건 다른 작업에 바로 영향을 줄 수 있으므로 다른 방법을 사용하거나 주의 해야한다.
dev, prod 권한은 리소스에 대해 동일하게 설정해두자.
- 일단 dev도 prod 리소스에 접근할 수 있게 하라는 말은 아니다.
- 보통 새로운 피처를 추가할 때 새로운 리소스에 대한 권한을 추가할 때가 많다.
- 이때 개발중이라고 dev에만 해당 리소스에 대한 권한을 추가하면 dev에서 테스트할 때는 되지만 prod로 할 때는 안 돼서 prod 작업이 실패할 수 있다.
- 예를 들면, AWS Glue Catalog 관련 권한을 dev에만 추가하면 dev에서는 Glue Catalog에 파티션을 추가할 수 있지만 prod에서는 파티션을 추가할 수 없다.
Log Pipeline
로그가 생성된 시간과 스토리지에 쌓이는 시간을 구분하자.
- 실시간으로 데이터 스트림에 쌓이는 로그 데이터를 AWS Kinesis Firehose 같은 컨슈머가 버퍼를 기준으로 소비함. 그리고 데이터 분석 및 ML을 위해 객체 스토리지에 파일로 뭉쳐서 저장한다.
- 로그 데이터는 레코드 건수가 많기 때문에 보통 시간 컬럼을 기준으로 파티셔닝을 한다.
- 버퍼는 보통 버퍼 크기나 버퍼링 시간을 넘으면 스토리지로 플러쉬한다. 데이터의 저장 위치는 특별한 설정이 없다면 플러쉬되는 시간을 기준으로 한다.
- 이때 버퍼가 자정 전후로 걸치면 클라우드 정책에 따라 각각의 날짜에 저장되는 게 아니라 하나의 날짜에 모두 저장될 수 있다. (AWS는 oldest record의 인입 시간을 따라감)
- 예를 들어, A 레코드가 2023–10–01 23:59:59에 버퍼링 됐고 B 레코드가 2023–10–02 00:00:01에 버퍼링 됐으면 B 레코드의 파티션은 2023–10–02가 아니라 2023–10–01이 된다.
- 이런 경우 전날 파티션에 다음날 로그가 들어있을 수 있어 다음날 파티션을 조건으로 쿼리를 한 경우 로그가 누락될 수도 있다.
- 따라서 로그가 생성된 시간과 버퍼링 돼서 스토리지에 쌓이는 시간을 동일하게 생각하면 안 된다. 두 개를 명확하게 구분해야한다.
Spark
신뢰할 수 없는 json을 읽을 때 스파크 스키마의 타입은 모두 String으로 하자.
- 이걸 적용하지 올해 초 개발자 인생 중 역대급 트러블슈팅을 했다.
- 스파크는 스키마에 정의한 타입으로 multi line json을 읽을 수 없으면 해당 로우를 포함하는 파일의 모든 로우가 날라가 버린다. mode에 따라서 모든 컬럼이 null로 표시된다.
- 예를 들어, sample.json에 a, b, c 로우가 있고 b 로우의 타입만 스키마에 정의한 타입으로 읽을 수 없더라도 a, b, c 모두 읽을 수 없다.
- json lines로 매 line마다 valid json이 있으면 해당 로우만 읽지 못할 수 있다. 하지만 multi line json이라면 100%로 모든 로우를 읽지 못한다.
- 외부 API는 스펙이 있어도 스펙대로 보내주지 않는 경우가 흔하다. 외부 API 응답 결과를 수집해 스파크로 처리한다면 스키마 타입은 원래 타입과 상관없이 String으로 하는 걸 강력 추천한다.
메타 스토어는 하나로 쓰자.
- 데이터브릭스를 새롭게 쓰면서 AWS Glue와 별개로 메타 스토어를 쓰게 됐다.
- 서로 다른 메타 스토어끼리는 테이블 수준의 연산을 할 수 없다. 파일 수준의 연산만 가능하다.
- 조금 더 구체적으로는 테이블 수준의 연산인 saveAsTable이나 insert into가 불가능하다. 또한 partitioned table이라면 스파크가 아닌 별도 API로 파티션을 등록해줘야한다.
- 이런 불편함을 감수해서 얻는 이득이 크지 않다면 하나로 쓰는 걸 권장한다.
RDB에서 대규모 데이터를 가져올 때는 파티셔닝 전략이 필요하다.
- RDB는 스파크 같은 분산 시스템과 달리 파티션 개념이 없다.
- 그래서 한 방에 다 가져오면 익스큐터 하나에서 처리하기 때문에 느리거나 OOM 발생 가능하다.
- 스파크를 통해 파티셔닝 컬럼을 지정하면 여러 익스큐터에서 더 빠르고 안정적으로 처리가 가능하다.
- 파티셔닝 전략에 따라서 RDB에 많은 부하를 일으키거나 파티션 불균형을 발생시킬 수 있다. 전략에 따라서 가져왔을 때 문제가 없는지 실험을 해보는 게 좋다.
saveAsTable과 InsertInto의 차이를 꼭 알고 있자.
- 생각보다 두 함수는 큰 차이가 난다. 특히 savemode가 overwrite일 때 가장 큰 차이가 있는 것 같다. 이 차이를 몰라서 예상하지 못한 에러나 데이터 삭제가 발생할 수 있다.
- 자세한 내용은 Table Handling in PySpark: Understanding saveAsTable and insertInto을 읽어보는 걸 추천한다.
웬만하면 Spark Session 수준에서 partition over write mode를 dynamic으로 설정하자.
- partitioned table에 데이터를 쓸 때 현재 DF로 파티션을 대체해야하는 요구사항이 없다면 partition over write mode를 dynamic으로 하는 걸 권장한다.
- dynamic으로 설정하지 않고 overwrite하면 table에 있던 파티션 데이터는 사라지고 현재 DF에 있는 파티션으로 대체된다.
- Dataframe writer 수준에서 설정하면 실수할 위험이 크므로 Spark Session 수준에서 설정하는 게 좋은 것 같다.
jdbc를 이용해 RDB에 데이터를 쓸 때, 다음과 같은 사실을 알면 좋다.
- url에 넘겨주는 파라미터는 모두 spark option으로 표현 가능하다.
rewriteBatchedStatements
와batchsize
는 다른 설정이다.rewriteBatchedStatements
는 insert into values에 여러 값이 오도록 만들어준다.batchsize
는 insert into 쿼리를batchsize
만큼 한 번에 RDB에 보낸다.- 파티션 단위로 커넥션을 연다.
- autocommit이 꺼져있고 실패시 롤백을 한다. 그래서 특정 로우 때문에 파티션 쓰기가 실패하면 정상적인 로우도 RDB에 쓰일 수 없다.
repartitionByRange
,sortWithinPartitions
를 하면 더 적은 IOPS로 로 할 수 있다. 특히sortWithinPartitions
가 효과적이다.- MySQL에 저장되는 타임존은 스파크 세션이 아닌
.config("spark.executor.extraJavaOptions"
)이 결정한다. - save mode가 overwrite일 때,
truncate
true
로 설정해야 새로운 테이블을 만들지 않고 기존 테이블을 유지한다. - MySQL 테이블 PK의 collate가 ci(case insensitive)라면 대소문자만 다른 경우 같은 PK로 인식해 실패한다. PK collate를 꼭 확인하자. hive 기반 테이블들은 string 타입이어서 대소문자를 다 구분한다. 그래서 여기에서 이슈가 생길 가능성이 크다.
- 대량의 로우를 읽거나 쓸 때, RDS가 스토리지 타입으로 gp2를 쓴다면 burst balance를 사용량을 주의하자.
Airflow
execution_date를 확실하게 이해하자.
execution_date
를 바탕으로 DAG가 수행되거나 센싱을 하기 때문에 확실히 알아야한다.start_date
, DAG run start_date 도 확실히 이해해야한다.
external task sensor의 센싱 시간을 구할 때 DAG를 이용하자.
- 센싱을 위해서는 센싱할 대상 dag 혹은 task의 execution_date를 지정해줘야한다.
- 직접 계산해서 지정할 수도 있지만 대상 dag 혹은 자신의 스케줄 시간이 바뀌면 바로 깨진다.
- 이럴 때
dag.get_last_dagrun().logical_date
를 이용하면 자동으로 구할 수 있다. - 단, 이렇게 구해서 생기는 사이드이펙트가 있으니 센서가 예상한대로 동작하는지 검증하는 걸 추천한다.
DAG SLA를 구현하고 싶다면 BranchDatetimeOperator 를 사용하자.
- 만약 특정 DAG가 수행되기 전에 종료돼야하는 DAG가 있다면 기대한 시간 내에 끝나는지 확인하는 게 필수적이다. 이때 SLA(Service Level Agreement)를 이용하면 된다.
- Airflow SLA는 태스크의 실행 시간을 기반으로 하고 잘 동작하지 않는다. 메인테이너도 잘 동작하지 않는다고 인정했다;
- 그래서 DAG가 특정 시간 내에 끝나는지 노티를 받고 싶다면 시간에 기반해 동작하는
BranchDatetimeOperator
를 이용할 수 있다. - 지정한 lower 시간보다 이후에 끝나면 끝나야할 시간보다 오래 걸린 것이니 슬랙 같은 도구를 이용해 노티를 받으면 된다.
- 단, 브랜칭할 태스크들 다음에는 dummy task를 둬서 downstream task가 무조건 skipped state가 되는 걸 방지하자.
- 또한 trigger_rule을
ALL_DONE
으로 하지말고NONE_FAILED
로 처리해야 실패시 DAG가 실패할 수 있다.
DAG 실패시 콜백을 수행하고 싶다면 DAG에 지정하자. task에 지정하지 말자.
default_args
또는 task에 직접 지정하면 task의 이벤트에 기반해서 콜백을 수행할 수 있다.DAG
에 지정하면 DAG의 task에 기반해서 콜백을 수행할 수 있다.- 이 차이는 특히 DAG run timeout으로 DAG가 실패할 때, task에 콜백을 지정해 아무런 알림도 받지 못할 때 유용한 팁이다.
- 단, DAG 콜백으로 설정하면 태스크 정보는 의미없는 값일 수 있다.
AWS
S3 versioning은 무조건 키자.
- 아무리 조심한다고 하더라도 실수로 데이터를 지우는 일은 반드시 발생한다.
- versioning을 켜서 delete 연산을 했더라도 언제든지 복구가 가능하게 만들어야 한다.
- 단, S3 versioning을 하면 unpartitioned table의 데이터가 계속 쌓이므로 불필요한 데이터가 계속 쌓일 수 있다.
AWS managed service 공부는 AWS Developer Guide를 참고하자.
- 개발 및 운영을 위해 필요한 개념 및 팁들이 잔뜩있다.
- 개발자 가이드 정독하면 열심히 생각해도 생각 못 한 문제를 미리 발견할 수 있다.
- 특히 lambda concurrency 설명이 예술이다.
확실히 지난 느낀 점에 비해서 주제가 다양해지고 내용도 좀 더 구체적인 것 같다. 그만큼 많은 일들이 있었다는 걸 보여주는 게 아닐까 싶다. 😇 하지만 아직도 쪼렙이기 때문에 더 많은 시행착오가 필요하다. 문제가 발생한 당시에는 너무 힘들었지만 지나고 보면 그게 다 경험으로 쌓이는 것 같다. 2024년에도 다양한 경험을 했으면 좋겠다.