Book/스파크 완벽 가이드

[스파크 완벽 가이드] 8장 - 조인

31514 2024. 10. 24. 13:52

먼저 스파크에서 조인을 사용하는 방법은 다음과 같다.

joinType = "inner | outer | left_outer | right_outer | left_semi | left_anti | cross"
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression).show()

joinType을 하나 지정하고, join할 때 비교할 하나 이상의 키값을 joinExpression에 지정한다.

그리고 조인을 수행하면 된다.

 

조인 사용 시 문제점

복합 데이터 타입의 조인

불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있다.

예를 들어 spark_status라는 리스트 안에 원하는 id 값이 존재하면 True를 반환하는 조인 표현식이 있다고 하자.

from pyspark.sql.functions import expr

join_expression = expr("array_contains(spark_status, id)")

이를 사용하면 다음과 같이 조인을 수행할 수 있다.

person.withColumnRenamed("id", "personId").join(sparkStatus, join_expression).show()

>>>
+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+

 

중복 컬럼명 처리

조인을 수행할 때 가장 까다로운 것 중 하나는 결과 DataFrame에서 중복된 컬럼명을 다루는 것이다.

DataFrame의 각 컬럼은 스파크 SQL 엔진인 카탈리스트 내에 고유 ID가 있다.

하지만 고유 ID는 카탈리스트 내부에서만 사용할 수 있으며, 직접 참조할 수 없다.

 

이런 문제를 일으키는 두 가지 상황은 다음과 같다.

  • 조인에 사용할 DataFrame의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우
  • 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우

 

예를 들면 다음과 같이 조인 표현식을 두 테이블의 같은 컬럼명을 사용하여 불리언 형태로 지정했다고 하자.

join_expression = gradProgrameDupe["graduate_program"] == person["graduate_program"]

 

이를 JOIN하면 다음과 같은 테이블이 나온다.

person.join(gradProgrameDupe, join_expression).show()

>>>
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|graduate_program| degree|          department|     school|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|               0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|               1|   Ph.D|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|               1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+

 

언뜻 보면 문제가 없어보이지만, 이러한 컬럼 중 하나를 참조할 때 문제가 발생한다.

person.join(gradProgrameDupe, join_expression).select("graduate_program").show()

>>>
AnalysisException: [AMBIGUOUS_REFERENCE]Reference `graduate_program` is
ambiguous, could be: [`graduate_program`, `graduate_program`].

 

<해결 방법 1: 다른 조인 표현식 사용>

동일한 이름을 가진 두 개의 키를 사용한다면 가장 쉬운 방법은 불리언 형태의 조인 표현식을 문자열이나 시퀀스 형태로 바꾸는 것이다.

person.join(gradProgrameDupe, "graduate_program").select("graduate_program").show()

이렇게 하면 조인할 때 두 컬럼 중 하나가 자동으로 제거된다.

 

<해결 방법 2: 조인 후 컬럼 제거>

조인 후 문제가 되는 컬럼을 제거하는 방법이다.

person.join(gradProgrameDupe, join_expression)\
    .drop(person.col("graduate_program")).select("graduate_program").show()

 

<해결 방법 3: 조인 전 컬럼명 변경>

조인 전에 컬럼명을 변경하는 방법이다.

person.withColumnRenamed("id", "personId").join(sparkStatus, join_expression).show()

 

스파크의 조인 수행 방식

셔플 조인

보통 두 개의 큰 데이터를 조인할 때 발생한다.

큰 데이터는 여러 노드에 분산되어 있으므로, 각 노드에서 동일한 키로 데이터를 파티셔닝해야 한다.

그리고 파티셔닝된 데이터를 네트워크를 통해 다른 노드로 전송하는데, 이때 네트워크 트래픽이 발생할 수 있다.

따라서 미리 repartition() 함수를 통해 동일한 키로 미리 파티셔닝을 해놓으면 셔플 조인을 최적화할 수 있다.

 

브로드캐스트 조인

큰 테이블과 작은 테이블을 조인할 때 발생할 수 있고, 강제성이 없는 힌트를 줄 수도 있다.

작은 테이블을 큰 테이블이 있는 모든 노드로 복사하므로, 단 한 번의 통신이 발생한다.

다만, 너무 큰 데이터를 브로드캐스트 조인할 경우에는 각 노드에서 고비용의 연산이 발생하므로 부하가 발생할 수 있다.