이번에는 아파치 스파크의 DataFrame, SQL을 사용해 클러스터, 스파크 애플리케이션 그리고 구조적 API를 살펴봅시다.
스파크의 기본 아키텍처
보통 컴퓨터로는 대규모 정보를 연산할 만한 자원이나 성능을 가지지 못합니다. 연산을 할 수 있다고 해도 완료하는 데 너무 많은 시간이 걸릴 수 있습니다. 컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 만듭니다. 하지만 컴퓨터 클러스터를 구성하는 것만으로는 부족하며 클러스터에서 작업을 조율할 수 있는 프레임워크가 필요합니다. 스파크가 바로 그런 역할을 하는 프레임워크입니다. 스파크는 클러스터의 데이터 처리 작업을 관리하고 조율합니다.
스파크가 연산에 사용할 클러스터는 스파크 Standalone 클러스터 매니저, 하둡 YARN, Mesos 같은 클러스터 매니저에서 관리합니다. 사용자는 클러스터 매니저에 스파크 애플리케이션을 Submit 합니다. 이를 받은 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당하며 우리는 할당받은 자원으로 작업을 처리합니다.
1. 스파크 애플리케이션
스파크 애플리케이션은 Driver 프로세스와 다수의 Executor 프로세르로 구성됩니다. 드라이버 프로세스는 클러스터 노드 중 하나에서 실행되며 main() 함수를 실행합니다. 이는 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포 그리고 스케줄링 역할을 수행하기 때문에 필수적입니다. 드라이버 프로세스는 스파크 애플리케이션의 심장과 같은 존재로서 애플리케이션의 수명 주기 동안 관련 정보를 모두 유지합니다.
익스큐터는 드라이버 프로세스가 할당한 작업을 수행합니다. 즉, 드라이버가 할당한 코드를 실행하고 진행 상황을 다시 드라이버 노드에 보고하는 두 가지 역할을 수행합니다.
위 그림은 클러스터 매니저가 물리적 머신을 관리하고 스파크 애플리케이션에 자원을 할당하는 방법을 나타냅니다. 클러스터 매니저는 스파크 Standalone 클러스터 매니저, 하둡 YARN, Mesos 중 하나를 선택할 수 있으며 하나의 클러스터에서 여러 개의 스파크 애플리케이션을 실행할 수 있습니다. 그림을 보면 왼쪽에 드라이버 프로세스가 있고 오른쪽에 네 개의 익스큐터가 있습니다.
스파크 애플리케이션을 이해하기 위한 핵심사항은 다음과 같습니다.
- 스파크는 사용가능한 자원을 파악하기 위해 클러스터 매니저를 사용
- 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있습니다.
익스큐터는 대부분 스파크 코드를 실행하는 역할을 합니다. 하지만 드라이버는 스파크의 언어 API를 통해 다양한 언어로 실행할 수 있습니다.
2. 스파크의 다양한 언어 API
스파크의 언어 API를 이용하면 다양한 프로그래밍 언어로 스파크 코드르 실행할 수 있습니다. 스파크는 모든 언어에 맞는 몇몇 핵심 개념을 제공합니다. 이러한 핵심 개념은 클러스터 머신에서 실행되는 스파크 코드로 변환됩니다. 구조적 API만으로 작성된 코드는 언어에 상관없이 유사한 성능을 발휘합니다. 다음은 언어별 정보입니다.
- Scala : 스파크는 스칼라로 개발되어 있으므로 스칼라가 스파크의 기본언어입니다.
- Java : 스파크가 스칼라로 개발되어 있지만, 스파크 창시자들은 자바를 이용해 스파크 코드를 작성할 수 있도록 심혈을 기울였습니다.
- Python : 파이썬은 스칼라가 지원하는 거의 모든 구조를 지원합니다.
- SQL : 스파크는 ANSL SQL:2003 표준 중 일부를 지원합니다.
- R : 스파크는 일반적으로 사용하는 두개의 R 라이브러리가 있습니다. 하나는 스파크 코어에 포함된 SparkR이고 다른 하나는 R 커뮤니티 기반 패키지인 sparklyr입니다.
위 그림은 SparkSession과 스파크의 언어 API 간의 관계를 나타냅니다. 사용자는 스파크 코드를 실행하기 위해 sparksession 객체를 진입점으로 사용할 수 있습니다. 파이썬이나 R로 스파크를 사용할 때는 JVM 코드를 명시적으로 작성하지 않습니다. 스파크는 사용자를 대신해 파이썬이나 R로 작성한 코드를 익스큐터의 JVM에서 실행할 수 있는 코드로 변환합니다.
3. 스파크 API
다양한 언어로 스파크를 사용할 수 있는 이유는 기본적으로 두 가지 API를 제공하기 때문입니다. 하나는 저수준의 비구조적 API이며, 다른 하나는 고수준의 구조적 API입니다.
4. SparkSession
실제 스파크 애플리케이션을 개발하려면 사용자 명령어와 데이터를 스파크 애플리케이션에 전송하는 방법을 알아야합니다. 대화형 모드로 스파크를 시작하면 애플리케이션을 관리하는 SparkSession이 자동으로 생성됩니다. 하지만 스탠드얼론 애플리케이션으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체를 직접 생성해야 합니다.
스파크 애플리케이션은 SparkSession이라 불리는 드라이버 프로세스로 제어합니다. SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행합니다. 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응합니다. 스칼라와 파이썬 콘솔을 시작하려면 spark 변수로 SparkSession을 사용할 수 있습니다.
5. DataFrame
DataFrame은 가장 대표적인 구조적 API입니다. DataFrame은 테이블의 데이터를 로우와 칼럼으로 단순하게 표현합니다. 칼럼과 칼럼의 타입을 정의한 목록을 스키마라고 부릅니다. DataFrame은 칼럼에 이름을 붙인 스프레드시트와 비슷하다고 생각할 수 있습니다. 하지만 스프레드시트는 하나의 컴퓨터에 있지만, 스파크 DataFrame은 수천 대의 컴퓨터에 분산되어 있습니다. 여러 컴퓨터에 데이터를 분산하는 이유는 단순합니다. 단일 컴퓨터에 저장하기에는 데이터가 너무 크거나 계산에 너무 오랜 시간이 걸릴 수 있기 때문입니다.
DataFrame은 스파크에서만 사용하는 개념이 아닙니다. 파이썬과 R 모두 비슷한 개념을 가지고 있습니다. 그러나 일반적으로 분산 컴퓨터가 아닌 단일 컴퓨터에 존재합니다. 이런 상황에서는 DataFrame으로 수행할 수 있는 작업이 해당 머신이 가진 자원에 따라 제한될 수 밖에 없습니다. 스파크는 파이썬과 R언어를 모두 지원하기 때문에 파이썬의 Pandas 라이브러리의 DataFrame과 R의 DataFrame을 스파크 DataFrame으로 쉽게 변환할 수 있습니다.
5.1 파티션
스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 청크 단위로 데이터를 분할합니다. 파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합을 의미합니다. DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타냅니다. 만약 파티션이 하나라면 스파크에 수천 개의 익스큐터가 있더라도 병렬성은 1이 됩니다. 또한 수백 개의 파티션이 있더라도 익스큐터가 하나밖에 없다면 병렬성은 1이 됩니다.
DataFrame을 사용하면 파티션을 수동 혹은 개별적으로 처리할 필요가 없습니다. 물리적 파티션에 데이터 변환용 함수를 지정하면 스파크가 실제 처리 방법을 결정합니다. RDD 인터페이스를 이용하는 저수준 API 역시 제공됩니다.
6. 트랜스포메이션
스파크의 핵심 데이터 구조는 불변성을 가집니다. 즉, 한번 생성하면 변경할 수 없습니다. 처음엔 이상한 개념처럼 보일 수 있습니다. DataFrame을 변경하려면 원하는 변경 방법을 스파크에 알려줘야 합니다. 이때 사용하는 명령을 트랜스포메이션이라 부릅니다. 다음 코드는 DataFrame에서 짝수를 찾는 간단한 트랜스포메이션 예제입니다.
// Scala code
val divisBy2 = myRange.where("number % 2 = 0")
# Python code
divisBy2 = myRange.where("number % 2 = 0")
위 코드는 실행해도 결과는 출력되지 않습니다. 추상적인 트랜스포메이션이지만 지정한 상태이기 때문에 Action을 호출하지 않으면 스파크는 실제 트렌스포메이션을 수행하지 않습니다. 트랜스포메이션은 스파크에서 비즈니스 로직을 표현하는 핵심 개념입니다. 트랜스포메이션에는 두 가지 유형이 있습니다. 하나는 좁은 의존성(narrow dependency)이고 다른 하나는 넓은 의존성(wide dependency)입니다.
6.1 좁은 의존성(narrow dependency)
좁은 의존성을 가진 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미칩니다. 이전 코드 예제에서 where 구문은 좁은 의존성을 가집니다. 따라서 하나의 파티션이 하나의 출력 파티션에만 영향을 미칩니다.
6.2 넓은 의존성(wide dependency)
넓은 의존성을 가진 트랜스포메이션은 하나의 입력 파티션이 여러 출력 파티션에 영향을 미칩니다. 스파크가 클러스터에서 파티션을 교환하는 셔플(Shuffle)이라는 단어를 자주 들었을 겁니다. 좁은 의존성을 사용하면 스파크에서 파이프라이닝(pipelining)으로 자동으로 수행합니다. 즉 DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 일어납니다. 하지만 셔플은 다른 방식으로 동작합니다. 스파크는 셔플의 결과를 디스크에 저장합니다. 아래는 넓은 의존성을 보여줍니다.
6.3 지연 연산 (lazy evaluation)
지연 연산(lazy evaluation)은 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미합니다. 스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성합니다. 스파크는 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일합니다. 스파크는 이 과정을 거치며 전체 데이터 흐름을 최적화하는 엄청난 강점을 가지고 있습니다. DataFrame의 조건절 푸시다운(Predicate pushdown)이 한 예가 될 수 있습니다. 아주 부족한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있다면 필요한 레코드 하나만 읽는 것이 가장 효율적입니다. 스파크는 이 필터를 데이터 소스로 위임하는 최적화 작업을 자동으로 수행합니다.
7. 액션 (Action)
사용자는 트랜스포메이션을 사용해 논리적 실행 계획을 세울 수 있습니다. 하지만 실제 연산을 수행하려면 액션 명령을 내려야 합니다. 액션은 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령입니다. 가장 단순한 액션인 count 메서드는 DataFrame의 전체 레코드수를 반환합니다.
divisBy2.count()
count 외에도 다음 세 가지 유형의 액션이 있습니다.
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터소스에 저장하는 액션
액션을 지정하면 스파크 잡이 시작됩니다. 스파크 잡은 필터(좁은 트랜스포메이션)를 수행한 후 파티션 별로 레코드 수를 카운트(넓은 트랜스포메이션)합니다. 그리고 각 언어에 적합한 네이티브 객체에 결과를 모읍니다. 이때 스파크가 제공하는 스파크 UI로 클러스터에서 실행 중인 스파크 잡을 모니터링할 수 있습니다.
8. 스파크 UI
스파크 UI는 스파크 잡의 진행 상황을 모니터링할 때 사용합니다. 스파크 UI는 드라이버 노드의 4040 포트로 접속할 수 있습니다. 로컬 모드에서 실행했다면 스파크 UI의 주소는 http:/localhost:4040입니다. 스파크 UI에서 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보를 확인할 수 있습니다. 스파크 UI는 스파크 잡을 튜닝하고 디버깅할 때 매우 유용합니다.
REFERENCE
해당 포스팅의 레퍼런스는 "스파크 완벽 가이드" 저자 빌 체임버스, 마테이 자하리아 입니다.