Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- 새로운 방
- 스택
- 1037
- 코드그라운드
- JAVA #필수값
- Queue
- 정올
- 태그를 입력해 주세요.
- 암스트롱 수
- sql #오라클 #oracle #sequence #foreach #insert #mybatis
- Floyd 알고리즘
- 페이지전환
- 페이지 전환
- 2613
- 오류교정
- 새로운방
- 큐
- kafka #ackmode #manual #acknowledge
- kafka connect #debizium #transform
- Floyd
- 알고리즘
- kafka #consumer #autoStartup
- 1108
- hexagonal architecture #layer architecture #아키텍쳐 #헥사고날
- 1045
- 최단거리
- 토마토(고)
- vue
- 김씨만행복한세상
- maven #메이븐 #빌드 #build #lifecycle
Archives
- Today
- Total
별집사의 IT세상
[Kafka] Kafka Config Ackmode 본문
반응형
BATCH | Commit the offsets of all records returned by the previous poll after they all have been processed by the listener. |
COUNT | Commit pending offsets after ackCount has been exceeded. |
COUNT_TIME | Commit pending offsets after ackCount has been exceeded or after ackTime has elapsed. |
MANUAL | Listener is responsible for acking - use a AcknowledgingMessageListener; acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener. |
MANUAL_IMMEDIATE | Listener is responsible for acking - use a AcknowledgingMessageListener; the commit will be performed immediately if the Acknowledgment is acknowledged on the calling consumer thread; otherwise, the acks will be queued and offsets will be committed when all the records returned by the previous poll have been processed by the listener; results will be indeterminate if you sometimes acknowledge on the calling thread and sometimes not. |
RECODE | Commit the offset after each record is processed by the listener. |
TIME | Commit pending offsets after ackTime has elapsed. |
AckMode 설정 없이 ENABLE_AUTO_COMMIT_CONFIG = true로 하니
데이터 중복 등의 토픽 메세지 실패 시에 커밋이 안되고 무한루프 되는 상황이 발생.
AckMode를 MANUAL로 돌리고 setRecoveryCallback설정을 통해 실패 로직 처리를 하고나니 이번에는 성공시에 커밋이 안되는 상황이 발생..
구글링 해보니 MANUAL, MANUAL_IMMEDIATE는 AcknowledgingMessageListener 혹은 BatchAcknowledgingMessageListener를 사용해야한다고 하여 현재 프로젝트에 맞추려고 노력해보았지만 잘 안되었다..
혹시나하고 리스너의 마지막 단계에 acknowledgment.acknowledge()를 넣어주니 성공하였다.
@Override
@KafkaListener(id = "아이디",
topics = "토픽명",
groupId = "sampleGroup",
containerFactory = "kafkaListenerContainerFactory",
autoStartup = "false")
public void registerEmp(List<Map<String, Object>> reqList, Acknowledgment acknowledgment) throws Exception {
for( int i=0 ; i< reqList.size(); i++) {
Map<String, Object> reqMap = new HashMap<String, Object>();
reqMap = reqList.get(i);
//업무 로직 작성
acknowledgment.acknowledge();
}
}
테스트 중 추가적인 문제가 없는지는 확인해봐야 알 것 같다.
반응형
'IT > Kafka' 카테고리의 다른 글
[Kafka] kafka connect transform issue 정리 (1) | 2022.09.07 |
---|---|
[JAVA] kafka sub 리스너 자동 실행 방법 (0) | 2022.09.05 |
Comments