kafka consumer 예제

# bin/kafka-consumer-group.sh–new-consumer–describe –group 소비자-자습서 그룹–부트스트랩-서버 localhost:9092 오프셋 커밋 실패는 실제로 중복 읽기를 발생 하지 않기 때문에 다음 커밋이 성공 하는 경우 성가신. 그러나 재조정이 발생하기 전이나 소비자가 종료되기 전에 마지막 커밋이 실패하면 오프셋이 마지막 커밋으로 재설정되고 중복이 표시될 수 있습니다. 따라서 일반적인 패턴은 폴링 루프의 비동기 커밋을 재조정 또는 종료시 동기화 커밋과 결합하는 것입니다. 가까이 커밋하는 것은 간단하지만 재균형에 연결하는 방법이 필요합니다. 이를 위해 앞에서 소개한 subscribe() 메서드에는 재조정 동작에 연결하는 두 가지 방법이 있는 ConsumerRebalanceListener를 허용하는 변형이 있습니다. 둘째, auto.offset.reset을 사용하여 커밋된 위치가 없거나(그룹이 처음 초기화될 때) 또는 오프셋이 범위를 벗어난 경우 소비자의 동작을 정의합니다. 위치를 „가장 빠른” 오프셋 또는 „최신” 오프셋(기본값)으로 재설정하도록 선택할 수 있습니다. 초기 오프셋을 직접 설정하고 범위를 벗어난 오류를 수동으로 처리하려는 경우 „none”을 선택할 수도 있습니다. 각 그룹의 코디네이터는 커밋된 오프셋을 저장하는 데 사용되는 내부 오프셋 항목 __consumer_offsets의 리더에서 선택됩니다. 기본적으로 그룹의 ID는 이 항목의 파티션 중 하나에 해시되고 해당 파티션의 리더가 코디네이터로 선택됩니다. 이러한 방식으로 소비자 그룹의 관리는 클러스터의 모든 브로커에 대해 균등하게 균등하게 분할되어 브로커 수를 늘려 그룹 수를 확장할 수 있습니다. 이 예제에서는 commitSync 호출 주위에 try/catch 블록이 추가됩니다.

CommitFailedException 그룹 재조정 으로 커밋을 완료할 수 없을 때 throw 됩니다. 이것은 Java 클라이언트를 사용할 때 주의해야 할 중요한 사항입니다. 모든 네트워크 IO(하트비트 포함) 및 메시지 처리가 포그라운드에서 수행되므로 메시지 일괄 처리가 처리되는 동안 세션 시간 시간이 만료될 수 있습니다. 이 문제를 처리하려면 두 가지 선택 사항이 있습니다. Kafka는 소비자 그룹의 상태를 보기위한 관리 유틸리티를 포함한다. 소비자 그룹을 처음 만들 때 초기 오프셋은 auto.offset.reset 구성 설정에 의해 정의된 정책에 따라 설정됩니다. 소비자가 처리를 시작하면 응용 프로그램의 요구에 따라 정기적으로 오프셋을 커밋합니다. 이후의 모든 재조정 후 위치는 그룹의 해당 파티션에 대해 마지막으로 커밋된 오프셋으로 설정됩니다. 성공적으로 처리된 메시지에 대한 오프셋을 커밋하기 전에 소비자가 충돌하면 다른 소비자가 작업을 반복하게 됩니다. 오프셋을 커밋하는 빈도가 많을수록 충돌시 중복이 줄어듭니다.

지금까지의 예제에서는 자동 커밋 정책이 활성화되어 있다고 가정했습니다. 설정 enable.auto.commit가 true로 설정되면(기본값인 경우) 소비자는 „auto.commit.interval.ms”로 구성된 간격에 따라 오프셋 커밋을 주기적으로 트리거합니다. 커밋 간격을 줄이면 충돌이 발생할 때 소비자가 수행해야 하는 재처리 양을 제한할 수 있습니다. 소비자의 커밋 API를 사용하려면 먼저 소비자 구성에서 false에 enable.auto.commit을 설정하여 자동 커밋을 사용하지 않도록 설정해야 합니다.