- [스파크 완벽 가이드] 15장 - 스파크 애플리케이션의 생애주기2024년 10월 21일
- 31514
- 작성자
- 2024.10.21.:09
스파크에서 코드를 실행할 때 어떤 일이 발생하는지 알아보자.
생애주기 - 외부
스파크 드라이버는 애플리케이션 실행을 제어하고, 클러스터(익스큐터의 상태와 태스크)의 모든 상태 정보를 유지한다.
물리적 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야 한다.
스파크 익스큐터는 드라이버가 할당한 태스크를 수행하는 프로세스이다.
태스크의 상태와 결과를 드라이버에게 보고한다.
클러스터 매니저는 애플리케이션을 실행할 클러스터 머신을 유지한다.
마스터와 워커라는 개념을 가지고 있으며
<스파크 클러스터의 초기 상태>
클러스터 매니저와 물리적인 노드 또는 가상의 노드들이 존재한다.
<스파크 애플리케이션을 제출하면 일어나는 일>
- 수 많은 노드 중에서 스파크 드라이버 역할을 할 노드를 선택하고 애플리케이션 정보를 드라이버에게 전달한다.
- 스파크 드라이버는 애플리케이션 정보를 읽고, 필요한 리소스를 클러스터 매니저에게 요구한다.
- 클러스터 매니저는 리소스 정보를 바탕으로 사용 가능한 노드들 중에서 워커 노드 역할을 수행할 노드를 선택하고, 익스큐터를 실행한다.
- 스파크 드라이버는 익스큐터와 통신하며 수행할 태스크를 분배한다.
- 익스큐터는 수행하는 태스크의 상태를 보고한다.
스파크 드라이버는 제출된 애플리케이션 당 생성되며, 워커 노드, 익스큐터, 태스크는 다수가 생성될 수 있다.
스파크 드라이버가 클러스터 환경에서 실행되면 클러스터 드라이버, 로컬 모드에서 실행되면 로컬 드라이버다.
생애주기 - 내부
모든 스파크 애플리케이션은 가장 먼저 SparkSession을 생성한다.
SparkSession을 생성할 때 빌더 메서드 사용을 추천한다.
from pyspark.sql import SparkSession spark = SparkSession.builder.master("local").appName("Word Count")\ .config("spark.some.config.option", "some-value")\ .getOrCreate()
모든 스파크 코드는 RDD 명령으로 컴파일되고, 트랜스포메이션과 액션으로 구성된다.
df1 = spark.range(2, 100000000, 2) df2 = spark.range(2, 100000000, 4) step1 = df1.repartition(5) step12 = df2.repartition(6) step2 = step1.selectExpr("id * 5 as id") step3 = step2.join(step12, ["id"]) step4 = step3.selectExpr("sum(id)") step4.collect()
마지막 부분의 collect()와 같은 액션을 수행하면 잡이 실행된다.
<JOB>
보통 액션 하나당 하나의 스파크 잡이 생성되며 항상 결과를 반환한다.
잡은 일련의 스테이지로 나뉜다.
<Stage>
스테이지는 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹을 나타낸다.
셔플 작업이 일어난 다음에 반드시 새로운 스테이지를 시작한다.
<Task>
스테이지의 가장 작은 실행 단위로, 파티션에 적용되는 연산 단위를 말한다.
만약 1,000개의 작은 파티션으로 구성되어 있다면 1,000개의 태스크를 만들어 병렬로 실행할 수 있다.
코드 실행 과정
- df1 = spark.range(2, 100000000, 2)
- Stage1 : range 메서드는 데이터를 기본적으로 8개의 파티션으로 나뉘어 생성한다. 따라서 8개의 태스크가 생성된다. range 메서드는 셔플링이 발생하지 않지만, 새로운 데이터를 생성하는 연산이므로 스테이지가 나뉘어진다.
- df1 = spark.range(2, 100000000, 4)
- Stage2 : 1번과 설명이 같다.
- step1 = df1.repartition(5)
- Stage3 : df1 데이터프레임을 repartition 메서드를 통해 5개의 파티션으로 재분배하므로 셔플링이 발생한다. 따라서 새로운 스테이지가 생성되고 5개의 태스크가 생성된다.
- step12 = df2.repartition(6)
- Stage 4 : 3번과 설명이 같다.
- step2 = step1.selectExpr("id * 5 as id")
- Stage 4 : 이 연산은 셔플이 발생하지 않고, 각 파티션에서 수행된다. 따라서 새로운 스테이지가 생성되지 않는다.
- step3 = step2.join(step12, ["id"])
- Stage 5 : 두 DataFrame이 각각 다른 파티션에 저장되어 있으므로, 데이터를 셔플링하여 다시 파티션을 분배해야 한다. 따라서 새로운 스테이지가 생성되고, 조인의 경우 spark.sql.shuffle.partitions 속성의 기본값으로 인해 200개의 파티션과 태스크가 생성된다.
- step4 = step3.selectExpr("sum(id)")
- Stage 6 : 전체 데이터에서 합계를 계산하기 위해 데이터가 하나의 파티션으로 모아지게 되므로, 새로운 스테이지가 생성되고 1개의 태스크가 생성된다.
- step4.collect()
- 액션으로 마지막 스테이지(Stage 6)에서 드라이버 노드로 결과를 가져온다.
'Book > 스파크 완벽 가이드' 카테고리의 다른 글
[스파크 완벽 가이드] 10장 - 스파크 SQL (0) 2024.10.23 [스파크 완벽 가이드] 14장 - 분산형 공유 변수 (0) 2024.10.22 [스파크 완벽 가이드] 18장 - 모니터링 (0) 2024.10.17 [스파크 완벽 가이드] 17장 - 스파크 배포 환경 (0) 2024.10.16 [스파크 완벽 가이드] 9장 - 데이터소스 (0) 2024.10.15 다음글이전글이전 글이 없습니다.댓글