Depp Dive 시리즈는 공부를 통해 알게된 지식을 기록, 공유하기 위한 포스팅입니다.

잘못된 정보에 대한 지적과 조언 감사합니다 :D

 

1. Spark란?

Apache Spark는 대규모의 클러스터 환경에서 데이터를 병렬적으로 빠르게 처리하기 위한 엔진이며, 오픈 소스입니다.

기존 데이터 처리는 Hadoop의 MapReduce(이하 MR) 연산과 같이 처리되었는데, MR은 맵 과정과 리듀스 과정의 중간 단계에서 나오는 결과값을 Disk에 저장, 참조하여 이에 Disk I/O가 발생해 성능에 큰 영향을 끼쳤습니다.

 

Spark는 인-메모리 형식으로 데이터를 처리하며, 클러스터링 된 여러 대의 서버의 메모리,core,CPU를 활용하여 작업에 필요한 리소스를 할당, 이 리소스 내에서 데이터를 처리하기에 빠른 연산이 가능합니다. 이에 실시간 처리가 더욱 중요해지는 요즘 많은 기업들이 Spark를 채택하고 있습니다.

 

2. 어떻게 사용하는가?

Spark는 사용자가 작성한 코드를 기반으로 동작됩니다. 작성된 코드는 트리거 지점에 Spark Context 혹은 Spark Session을 초기화 하도록 작성되는데, 이를 통해 Spark 클러스터가 연동된 클러스터 매니저가 해당 작업에 필요한 리소스를 요청받고, 할당하여 후의 사용자가 정의한 코드들을 수행합니다. 언어는 Scala, Python, Java를 지원합니다.

 

3. Spark의 구조

Spark는 크게, Driver, 클러스터 매니저로 구분되며, Spark Application은 하나의 Driver, 여러개의 Executor들로 이루어진 Spark가 실행되는 프로그램입니다. Spark의 동작 구조를 알아보며 더욱 깊게 이해해보겠습니다.

 

4. Spark Application의 주요 용어

spark-submit: 사용자가 작성한 코드를 Spark에 제출하기 위한 명령어입니다.

 

driver: main() 함수를 실행 시키고, Spark Context 혹은 Spark Session을 생성하며, 사용자의 입력에 응답하고, Application 정보를 유지, 관리하는 프로세스입니다.

 

Spark Context 혹은 Spark Session: 클러스터 매니저와 연결되며, 작업에 필요한 리소스를 요청하고, 전체적인 작업인 job을 executor에서 실행될 분할된 작업 단위인 task로 나누어 executor들에게 전달합니다.

 

클러스터 매니저: Spark Application에 필요한 리소스를 요청 받고, 워커 노드들 내부에 executor를 생성하는 역할을 수행합니다. yarn, mesos, kubernetes를 지원하며, spark 자체적으로 매니징하는 spark standalone 모드를 지원합니다. yarn일 경우 워커노드는 노드매니저입니다.

 

executor: 실제 task 단위의 작업을 처리하는 프로세스이며, 결과를 driver에게 전달합니다. executor는 워커 노드 내부에 생성됩니다.

 

5. Spark Application의 동작 순서

1. spark-submit을 통해 사용자는 작업을 제출합니다.

2. driver가 생성되며, main()함수를 실행시키고 Spark Context 혹은 Spark Session을 생성합니다.

3. driver 프로그램이 작업에 필요한 executor들을 클러스터 매니저에게 요청합니다.

4. Driver 내의 Spark Context 혹은 Spark Session이 job을 task단위로 쪼개어 executor들에게 할당합니다.

5. executor가 실제 task를 수행하며, 결과를 driver에게 전달합니다.

 

6. 클러스터 매니저 채택

spark-submit 또는 spark-shell, pyspark 명령어 실행 시, 클러스터 매니저를 지정하는 --master 옵션 값을 확인 할 수 있습니다.

 

클러스터 매니저를 사용하지 않는, 즉, 클러스터를 사용하지 않고 단일 서버에서만 작업을 실행하기 위한 local모드,

클러스터 내 여러 노드들에서 작업을 실행하고, 매니저를 채택하는 클러스터 모드(yarn, kubernetes, standalone 등)가 있습니다.

 

1. 로컬 모드

단일 노드에서 작업이 수행되며, 클러스터를 사용하지 않습니다. 주로 간단한 테스트가 필요할 때 사용됩니다.

ex) spark-submit --master local -> (=local[1], core와 executor가 각각 1개씩만 사용)

      spark-submit --master local[*] -> (1개의 executor와 단일 노드가 가지고 있는 모든 core를 사용)

      spark-submit --master local[N] -> (1개의 executor와 N개의 core를 사용)

 

2. 클러스터 모드

로컬모드와 달리 클러스터 노드들을 활용하며, 클러스터 매니저가 존재합니다.

yarn, kubernetes, standalone모드 등이 있으며, 주로 사용했던 yarn, standalone 모드를 알아보겠습니다.

 

standalone 모드:

spark에서 자체적으로 제공하는 클러스터 매니저를 사용합니다.

(* 다른 클러스터 프레임워크들은 standalone이라고 한다면 한 대의 서버가 실행된다의 개념이 많은데, spark에서 standalone모드는 클러스터 매니저로 yarn, kubernetes와 같이 외부 매니저가 아닌 자체 내장된 매니저를 채택한 상황을 의미합니다.)

이때, spark master와 worker 프로세스가 실행되어야하며, master 노드가 worker노드들의 리소스를 관리, 작업에 필요한 리소스를 할당합니다. 리소스 현황, 작업 현황 등을 spark master UI에서 확인가능합니다.

standalone모드에서는 각 노드에 하나의 executor만 실행될 수 있습니다.

ex) spark-submit --master spark://[master IP]:[master PORT]

 

yarn 모드:

클러스터 매니저로 yarn을 채택한 상황입니다. 

이 때, exectuor들은 yarn의 node manager 내부에 생성됩니다.

yarn의 resource manager가 클러스터의 리소스를 관리, spark 작업에 필요한 리소스를 할당 및 스케쥴링합니다.

ex) spark-submit --master yarn

 

7. 배포 모드 채택

클러스터의 여러 노드들을 활용하여 작업을 처리하는 클러스터 모드 환경에서 적용되는 방식이며, 단일 노드에서 작업이 실행되는 local모드에서는 적용되지 않는 방식입니다. 가장 중요한 차이점은 driver의 생성 위치입니다.

 

1. client 모드

client 모드는 driver가 spark 작업이 제출된 서버에서 생성됩니다. 즉, 클러스터 외부에서 drvier가 실행됩니다.

이에, spark 작업을 제출한 서버에서 콘솔을 닫거나, spark 작업을 취소시키면 모든 작업이 중단됩니다.

client가 spark 작업에 개입할 수 있다는 것 입니다. deploy-mode를 client, 혹은 옵션 값을 주지 않으면, default로 적용되는 모드입니다. 

ex) spark-submit --master yarn deploy-mode client or 빈 값

 

2. cluster 모드

cluster모드는 driver가 spark 작업이 제출된 서버가 아닌, 클러스터 내부 노드들 중 어딘가에 생성됩니다.

또한, spark 작업이 제출되면, 작업에 대한 리소스, 권한 등이 클러스터 매니저가 위임합니다. 이에 작업을 제출한 client는 제출된 작업에 대해 더이상 개입할 수 없으며, client 모드에서 콘솔을 닫거나 작업을 취소시키면 spark 작업이 중단되었지만, cluster 모드에서는 모든 작업이 클러스터 매니저에 의해 조정되기에 client가 개입할 수 없습니다.

 

3. 두 모드 활용 사례

client모드는 대게 개발 환경에서 많이 사용됩니다. spark 작업에 대한 결과 값을 빠르게 출력 받고, 테스트할때 대화형(spark-shell, pyspark 등)으로 자주 사용됩니다. cluster 모드는 운영 환경에서 자주 사용되며, 배치, 실시간 처리가 계속 이루어지는 환경에서 필요한 리소스들을 클러스터 매니저가 조정하는 그림일 수 있겠습니다.

 

또한, 가장 중요한 차이는 driver의 실행 위치라고 언급드렸습니다. 만약 spark 작업이 제출되고, 코드 내에 PostgreSQL에 데이터를 Insert하는 코드가 존재한다고 가정하겠습니다. 사용자는 코드 상에 PostgreSQL에 접근하기 위한 postgresql.jar와 같은 JDBC 파일의 위치를 작성하였을 것 입니다. client모드에서는 작업이 제출된 서버 = spark driver가 생성될 노드이므로, 작업이 제출될 서버에만 해당 JDBC파일, 즉, 종속성을 가진 파일들이 존재하면 될 것입니다.

 

하지만 위와 같은 환경에서, cluster 모드로 작업을 제출하였다면, driver는 클러스터의 노드들 중 어딘가에 생성되고, 이것은 client가 조정할 수 없는 영역입니다. 어디서 driver가 생성될지는 클러스터 매니저가 결정하기에 알 수 없다는 것 입니다. driver가 실행된 노드에 JDBC 종속성 파일을 위치하여야 코드가 동작하지만, 생성 위치를 알 수 없습니다.

 

이는, spark 작업이 실행되는 클러스터의 모든 노드들의 spark 종속성 파일을 담아놓은 경로(ex. /opt/spark/jars)에 해당 JDBC 파일을 올려두는 방법이 있을 수 있습니다. 물론 이러한 종속성 파일이 많아진다면, 그 때마다 모든 노드들에게 파일을 업로드 해야 할 것 입니다. 또 다른 방법으로는 pyspark 코드를 예로 필요한 종속성 파일을 zip파일로 묶어  spark-submit 명령어 실행 시 해당 zip 파일을 인자로 전달 가능하고, driver가 생성될 노드, task가 실제 수행될 노드들의 내부에 해당 종속성 파일이 존재하지 않아도, 인자로 받은 zip파일 내부의 파일들을 참조하여 정상적으로 수행이 가능합니다.

 

두 방법 다 가능하며, 개발자의 성향에 따라 채택 가능합니다.

 

 

8. Spark Context, Session 차이

spark 1버전에서는 Context, 2버전 부터 Sesssion이 도입 되었습니다.

두 가지 모두, spark 작업이 실행되기 위한 엔트리 포인트라고 볼 수 있으며, 객체의 초기화, 클러스터 매니저와의 통신으로

작업에 필요한 리소스를 요청, job을 task 단위로 나누어 executor에 할당하는 역할입니다.

 

차이점은 Context는 RDD기반, Session은 DataFrame 기반이라는 것 입니다.

Context는 Spark 작업을 생성, 실행하며, Hive Context, SQL Context와 통합하여 구조, 반구조화된 데이터를 처리할 수 ㅇ있습니다. Session은 내부적으로는 결국 Context 객체를 만듭니다. 이에, Context의 모든 기능을 포함하고 있고, Dataframe, Dataset API 기능이 탑재되어있어, 고수준의 API를 활용할 수 있습니다. Parqeut, ORC 등 다양한 데이터 포맷에 접근 할 수 있는 통합 인터페이스를 제공하고, MLlib, Spark Streaming, GraphX와 같은 다양한 라이브러리 역시 활용 가능합니다.

 

9. RDD와 DataFrame, Dataset의 차이

RDD는 빠른 병렬 연산과 맵리듀스 작업을 위한 자료구조입니다. 불변성을 가지고, lineage를 통한 fault tolerent를 보장합니다. map, join과 같이 rdd에서 새로운 rdd를 만드는 transformation 함수와 rdd가 아닌 타입의 데이터 결과를 반환하는 count, collect와 같은 action 함수로 이루어집니다. rdd 사용 시, 자바 직렬화를 사용하기에 성능상의 이슈가 있을 수 있고, 최적화를 직접 해주어야 합니다.

 

DataFrame 역시 불변성을 가지고, 스키마를 가집니다. 이에 sql과 같이 친숙한 쿼리가 가능하며, 자동으로 옵티마이징 기능이 포함되어있어 성능 상 우수 합니다. 

 

Dataset은 DataFrame과 같이 스키마를 가지고, 불변성을 가지며, type-safey를 통해 spark.sql의 문법 에러를 컴파일 단계에서 알 수 있으며, 2.0버전부터 DataFrame과 통합되었습니다.

 

10. spark partition

partition은 RDD나 Dataset을 구성하고 있는 최소 단위의 객체입니다. 각각의 파티션은 클러스터 내 각기 다른 노드들에서 분산 처리 되며, Spark의 최소 연산단위는 Task이고, 하나의 Task는 하나의 Core가 수행합니다. 이는 1Core = 1 Task = 1 Partition 임을 뜻합니다. 이에 partition의 수가 많다면 Core의 수가 많아야하고, partition의 크기가 크다면 메모리의 크기가 커야함을 의미합니다.

 

partition의 종류는 3가지로 분류됩니다.

1. Input Partition: 처음 데이터를 읽을 때 파티션 수를 지정하는 방식입니다.

ex) spark.conf.set("spark.sql.files.maxPartitionBytes", 값)

만약, HDFS의 데이터를 읽고 처리하는 Spark 작업 일 시, 해당 설정 값이 128MB이고, HDFS상 마지막 경로의 파일의 크기가 128MB보다 크다면, Spark에서는 128MB 만큼 나누어 파일을 읽습니다.

 

2. output partition: 파티션의 수를 재조정하고 저장할 때의 방식입니다. 해당 파티션 수가 마지막에 저장되는 파일의 수를 지정하게 됩니다.

ex) coalesce()는 파티션을 줄일 때 사용되는 함수. 강제 옵션을 통해 늘릴 수도 있지만, 셔플이 발생하게 됩니다.

      repartition은 파티션을 늘릴 때 사용되며, 셔플이 일어나게 됩니다.

셔플(Shuffle)이란, filter, groupBy, join과 같은 연산 시, spark의 특성 상, 여러 파티션이 여러 노드들에 저장되어있는 환경에서, 위와 같은 연산 시, 동일한 key를 가진 데이터가 동일한 파티션에 있어야합니다. 이를 위해 파티션의 위치를 재조정하는 것을 셔플이라 하며, spark 성능의 큰 영향을 차지합니다. 

 

3. shuffle partition: 셔플 과정에서 사용될 파티션의 수를 지정하는 방식

ex) spakr.conf.set("spark.sql.shuffle.partitions", 값)

groupBy, join과 같은 연산 시, 데이터가 셔플링될 파티션 수를 지정합니다.

 

다음은 파티셔닝의 종류입니다.

파티션은 가장 최소단위의 객첵, 단위이며, 파티셔닝이란 이러한 파티션을 나누기 위한 작업 메커니즘입니다.

1. 범위별 파티셔닝, Range Partitioning

특정 범위 내 키들을 범주화하고, 범위에 기반하여 데이터를 파티셔닝합니다.

 

2. Hash Partitioing

기본적으로 spark가 사용하는 파티셔닝 기법이며, 각 키에 해시코드를 할당, 이를 기반으로 파티션이 생성되어 모든 파티션들의 균등한 분산을 보장합니다. 

 

11. Spark Streaming과 Structured Streaming의 차이

두 스트리밍 모두 실시간 처리를 위한 방식입니다.

 

먼저 Spark Streaming은 RDD 기반입니다. 실시간으로 들어오는 데이터를 Micro Batch를 통해 처리하며, DStream으로 변환하는 과정을 거칩니다.

 

Spark Structured Streaming은 들어오는 데이터를 배치처리가 아닌, Data Stream에 계속 추가하며 저지연 시간처리가 가능합니다. 또한, Dataset, DataFrame 기반이기에 고수준의 API를 사용할 수 있고, 이벤트 시간 처리를 통해 지연된 데이터의 정확한 처리가 가능합니다.

 

12. foreach와 foreachbatch 차이

foreach: 사용자 정의 로직을 row 단위에 적용 가능합니다.

foreachbatch: 사용자 정의 로직을 micro batch 단위에 적용 가능합니다.

 

감사합니다 :D

반응형

+ Recent posts