- [스파크 완벽 가이드] 8장 - 조인2024년 10월 24일
- 31514
- 작성자
- 2024.10.24.오후01:52
먼저 스파크에서 조인을 사용하는 방법은 다음과 같다.
joinType = "inner | outer | left_outer | right_outer | left_semi | left_anti | cross" joinExpression = person["graduate_program"] == graduateProgram['id'] person.join(graduateProgram, joinExpression).show()
joinType을 하나 지정하고, join할 때 비교할 하나 이상의 키값을 joinExpression에 지정한다.
그리고 조인을 수행하면 된다.
조인 사용 시 문제점
복합 데이터 타입의 조인
불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있다.
예를 들어 spark_status라는 리스트 안에 원하는 id 값이 존재하면 True를 반환하는 조인 표현식이 있다고 하자.
from pyspark.sql.functions import expr join_expression = expr("array_contains(spark_status, id)")
이를 사용하면 다음과 같이 조인을 수행할 수 있다.
person.withColumnRenamed("id", "personId").join(sparkStatus, join_expression).show() >>> +--------+----------------+----------------+---------------+---+--------------+ |personId| name|graduate_program| spark_status| id| status| +--------+----------------+----------------+---------------+---+--------------+ | 0| Bill Chambers| 0| [100]|100| Contributor| | 1| Matei Zaharia| 1|[500, 250, 100]|500|Vice President| | 1| Matei Zaharia| 1|[500, 250, 100]|250| PMC Member| | 1| Matei Zaharia| 1|[500, 250, 100]|100| Contributor| | 2|Michael Armbrust| 1| [250, 100]|250| PMC Member| | 2|Michael Armbrust| 1| [250, 100]|100| Contributor| +--------+----------------+----------------+---------------+---+--------------+
중복 컬럼명 처리
조인을 수행할 때 가장 까다로운 것 중 하나는 결과 DataFrame에서 중복된 컬럼명을 다루는 것이다.
DataFrame의 각 컬럼은 스파크 SQL 엔진인 카탈리스트 내에 고유 ID가 있다.
하지만 고유 ID는 카탈리스트 내부에서만 사용할 수 있으며, 직접 참조할 수 없다.
이런 문제를 일으키는 두 가지 상황은 다음과 같다.
- 조인에 사용할 DataFrame의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우
- 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우
예를 들면 다음과 같이 조인 표현식을 두 테이블의 같은 컬럼명을 사용하여 불리언 형태로 지정했다고 하자.
join_expression = gradProgrameDupe["graduate_program"] == person["graduate_program"]
이를 JOIN하면 다음과 같은 테이블이 나온다.
person.join(gradProgrameDupe, join_expression).show() >>> +---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+ | id| name|graduate_program| spark_status|graduate_program| degree| department| school| +---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+ | 0| Bill Chambers| 0| [100]| 0|Masters|School of Informa...|UC Berkeley| | 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D| EECS|UC Berkeley| | 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D| EECS|UC Berkeley| +---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
언뜻 보면 문제가 없어보이지만, 이러한 컬럼 중 하나를 참조할 때 문제가 발생한다.
person.join(gradProgrameDupe, join_expression).select("graduate_program").show() >>> AnalysisException: [AMBIGUOUS_REFERENCE]Reference `graduate_program` is ambiguous, could be: [`graduate_program`, `graduate_program`].
<해결 방법 1: 다른 조인 표현식 사용>
동일한 이름을 가진 두 개의 키를 사용한다면 가장 쉬운 방법은 불리언 형태의 조인 표현식을 문자열이나 시퀀스 형태로 바꾸는 것이다.
person.join(gradProgrameDupe, "graduate_program").select("graduate_program").show()
이렇게 하면 조인할 때 두 컬럼 중 하나가 자동으로 제거된다.
<해결 방법 2: 조인 후 컬럼 제거>
조인 후 문제가 되는 컬럼을 제거하는 방법이다.
person.join(gradProgrameDupe, join_expression)\ .drop(person.col("graduate_program")).select("graduate_program").show()
<해결 방법 3: 조인 전 컬럼명 변경>
조인 전에 컬럼명을 변경하는 방법이다.
person.withColumnRenamed("id", "personId").join(sparkStatus, join_expression).show()
스파크의 조인 수행 방식
셔플 조인
보통 두 개의 큰 데이터를 조인할 때 발생한다.
큰 데이터는 여러 노드에 분산되어 있으므로, 각 노드에서 동일한 키로 데이터를 파티셔닝해야 한다.
그리고 파티셔닝된 데이터를 네트워크를 통해 다른 노드로 전송하는데, 이때 네트워크 트래픽이 발생할 수 있다.
따라서 미리 repartition() 함수를 통해 동일한 키로 미리 파티셔닝을 해놓으면 셔플 조인을 최적화할 수 있다.
브로드캐스트 조인
큰 테이블과 작은 테이블을 조인할 때 발생할 수 있고, 강제성이 없는 힌트를 줄 수도 있다.
작은 테이블을 큰 테이블이 있는 모든 노드로 복사하므로, 단 한 번의 통신이 발생한다.
다만, 너무 큰 데이터를 브로드캐스트 조인할 경우에는 각 노드에서 고비용의 연산이 발생하므로 부하가 발생할 수 있다.
'Book > 스파크 완벽 가이드' 카테고리의 다른 글
[스파크 완벽 가이드] 10장 - 스파크 SQL (0) 2024.10.23 [스파크 완벽 가이드] 14장 - 분산형 공유 변수 (0) 2024.10.22 [스파크 완벽 가이드] 15장 - 스파크 애플리케이션의 생애주기 (0) 2024.10.21 [스파크 완벽 가이드] 18장 - 모니터링 (0) 2024.10.17 [스파크 완벽 가이드] 17장 - 스파크 배포 환경 (0) 2024.10.16 다음글이전글이전 글이 없습니다.댓글