이번 포스팅을 통해서 컨슈머의 커밋과 오프셋에 대해서 알아보도록 하겠습니다.
카프카가 다른 메시지 큐 솔류션과 차별화되는 특징은 하나의 토픽에 대해 여러 용도로 사용할 수 있다는 점입니다. 일반적인 큐 솔로션은 특정 컨슈머가 메시지를 가져가면 큐에서 메시지가 삭제되어 다른 컨슈머는 가져갈 수 없는데 카프카는 컨슈머가 메시지를 가져가도 삭제하지 않습니다. 이런 특징을 이용해서 하나의 메시지를 여러 컨슈머가 다른 용도로 사용할 수 있도록 시스템을 구성할 수 있습니다.
예를 들어 서비스A팀에서 로그 메시지를 토픽으로 보내고 서비스 A팀은 컨슈머 그룹 01을 이용하여 메시지를 처리하고 잇습니다. 얼마 후 B팀이 신규 프로젝트를 진행되면서 로그 메시지들이 필요하게 되었습니다. 이런 경우 A팀이 직접 보내줘도 되지만 카프카 클러스터 내에 토픽에 컨슈머 그룹 02를 연결해 가져올 수 있습니다. 이렇게 여러 컨슈머 그룹들이 하나의 토픽에서 메시지를 가져올 수 있는 이유는 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기 때문에 하나의 토픽에 두 개의 컨슈머 그룹뿐만 아니라 더 많은 컨슈머 그룹이 연결되어도 다른 컨슈머 그룹에게 영향 없이 메시지를 가져갈 수 있기 때문입니다.
커밋과 오프셋
컨슈머가 poll()을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져옵니다. 이렇게 동작하는 이유는 컨슈머 그룹이 메시지를 가져갔는지 알 수 있기 때문입니다. 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(=오프셋)를 기록하고 있습니다. 각 파티션에 대해 현재 위치를 업데이트하는 동작을 커밋한다고 합니다.
카프카는 각 컨슈머 그룹의 파티션별로 오프셋 정보를 저장하기 위한 저장소가 별도로 필요합니다. 이전 카프카에서는 오프셋 정보를 주키퍼에 저장했으며 뉴 카프카에서는 카프카 내에 별도로 내부에서 사용하는 토픽(__consumer_offsets)을 만들고 그 토픽에 오프셋 정보를 저장하고 있습니다.
카프카 컨슈머 그룹내에 컨슈머 하나가 다운되는 경우 리벨런스된다고 이전 포스팅에서 말씀드렸습니다. 리벨런스하는 과정에서 메시지를 가져오지 않고 일시정지 후 다시 메시지를 가져올 때 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져오기 시작합니다. 만약 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 중복으로 처리되고, 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 크면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 모든 메시지는 누락됩니다. 이렇게 커밋은 매우 중요하고, 카프카에서는 여러 가지 방법을 제공해줍니다.
1. 자동 커밋
오프셋을 직접 관리하는 방법도 있지만, 각 파티션에 대한 오프셋 정보 관리, 파티션 변경에 대한 관리 등이 매우 번거로울 수 있습니다. 그래서 카프카에서는 직접 관리하지 않는 방법인 자동 커밋을 지원합니다. 이 방법은 가장 많이 사용하고 있습니다. 이제 방식에 대해서 알아보도록 하겠습니다.
자동 커밋을 사용하고 싶을 때는 컨슈머 옵션 중 enable.auto.commit=true 로 설정하면 5초마다 컨슈머는 poll()를 호출할 때 가장 마지막 오프셋을 커밋합니다. 5초는 기본값이며, auto.commit.interval.ms 옵션을 통해 조정이 가능합니다. 컨슈머는 poll을 요청할 때마다 커밋할 시간인지 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋합니다.
위 사진은 자동커밋에 대한 예제로 파티션 01로부터 컨슈머 01이 메시지를 가져오고 있고, 5초 주기로 컨슈머는 메시지의 오프셋을 자동 커밋하고 있습니다. 컨슈머는 한 번의 poll을 이용해 메시지를 2개씩 가져오고 있습니다. 처음의 poll로 1, 2 메시지를 가져오게 되었고, 5초가 되면서 오프셋 2를 커밋한 다음, 메시지 3, 4를 가져온 후 5초가 지나 오프셋 4를 커밋했습니다.
자동 커밋은 편리한 기능이지만 주의할 부분도 있습니다. 만약 커밋 해야 하는 5초가 되기 전인 마지막 커밋 이후 3초가 지나 리벨런스가 일어나면 어떻게 될까요?
위 사진은 메시지 5를 컨슈머 01에게 보내다가 컨슈머 02가 추가되면서 리벨런스된 상황입니다. 파티션 0번에 대한 마지막 커밋은 4로 되어 있기 때문에 컨슈머 02는 메시지 5, 6을 가져오게 됩니다. 하지만 메시지 5, 6는 컨슈머 01에 이미 가져왔던 메시지로 중복되게 됩니다. 만약 중복을 줄이기 위해 자동 커밋의 시간을 줄일 수 있지만 중복을 완벽하게 제거하는 것은 불가능 합니다.
이렇게 자동 커밋은 매우 편리하지만, 중복 등이 발생할 수 있기 때문에 동작에 대해 완벽하게 이해하고 사용하는 것이 중요합니다.
수동 커밋
경우에 따라 자동 커밋이 아닌 수동 커밋을 사용해야 하는 경우도 있는데, 이러한 경우는 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우에 사용합니다. 예를 들어, 컨슈머가 메시지를 가져와서 데이터베이스에 메시지를 저장한다고 가정하겠습니다. 만약 자동 커밋을 사용하는 경우라면 자동 커밋의 주기로 인해 poll 하면서 마지막 값의 오프셋으로 자동 커밋이 되었고, 일부 메시지들은 데이터베이스에는 저장하지 못한 상태로 컨슈머 장애가 발생한다면 해당 메시지들은 손실될 수도 있습니다. 이러한 경우를 방지하기 위해 컨슈머가 메시지를 가져오자마자 커밋을 하는 것이 아니라, 데이터베이스에 메시지를 저장한 후 커밋을 해야만 안전합니다. 이처럼 수동 커밋은 메시지를 가져온 것으로 간주되는 시점을 자유롭게 조정할 수 있는 장점이 있습니다.
하지만 수동 커밋의 경우에도 중복이 발생할 수 있습니다. 메시지들을 데이터베이스에 저장하는 도중에 실패하게 된다면, 마지막 커밋된 오프셋부터 메시지를 다시 가져오기 때문에 일부 메시지들은 데이터베이스에 중복으로 저장될 수 있습니다. 이렇게 카프카에서 메시지는 한번씩 전달되지만 장애 등의 이유로 중복이 발생할 수 있기 때문에 카프카는 적어도 한 번을 보장합니다.
특정 파티션 할당
지금까지 컨슈머 그룹의 컨슈머들에게 직접 파티션을 공평하게 분배하게 했습니다. 하지만 일부 경우에는 특정 파티션에 대해 세밀하게 제어하기를 원할 수 있습니다.
- key-value 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지들만 가져와야 하는 경우
- 컨슈머 프로세스가 가용성이 높은 구성인 경우, 카프카가 컨슈머의 실패를 감지하고 재조정할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 재시작되는 경우
이렇게 특정 파티션을 할당해 메시지를 가져오는 경우, 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 서로 다르게 설정해야 합니다. 동일한 컨슈머 그룹 아디를 설정하게 되면, 컨슈머마다 할당된 파티션에 대한 오프셋 정보를 서로 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되거, 오프셋을 커밋하게 됩니다. 결국 원치 않는 형태로 동작을 할 수 있습니다.
특정 오프셋부터 메시지 가져오기
카프카의 컨슈머 API를 사용하게 되면 메시지 중복 처리 등의 이유로 경우에 따라 오프셋 관리를 수동으로 하는 경우가 존재합니다. 이러한 경우에는 수동으로 어디서부터 메시지를 읽어올지를 지정하는데 이때 seek() 메소드를 사용하면 됩니다.
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
consumer.seek(partition0, 2); ## 파티션0의 2번 오프셋부터 메시지 가져오기
consumer.seek(partition1, 2); ## 파티션1의 2번 오프셋부터 메시지 가져오기
while(true){
ConsumerRecodrds<String, String> records = consumer.poll(100);
....
[REFERENCE]
해당 글의 모든 레퍼런스는 "카프카, 데이터 플랫폼의 최강자" (고승범, 공용준 지음)을 알립니다.
"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."