일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 백준 1043 거짓말 파이썬
- 역사 파이썬
- SQL SERVER MIGRATION
- 프로그래머스 등굣길
- 다중 컬럼 NOT IN
- 프로그래머스 순위 파이썬
- 반도체 설계 파이썬
- 프로그래머스 베스트앨범
- 백준 1167 트리의 지름 파이썬
- SQL SERVER 장비교체
- 프로그래머스 가장 긴 팰린드롬
- 백준 2146 다리 만들기
- 백준 1613 역사
- 가장 긴 팰린드롬 파이썬
- 백준 1238 파티 파이썬
- 램프 파이썬
- 순위 파이썬
- 다리 만들기 파이썬
- 프로그래머스 순위
- 트리의 지름 파이썬
- 게임 개발 파이썬
- 가장 긴 바이토닉 부분 수열 파이썬
- SWEA
- 백준 11054.가장 긴 바이토닉 부분 수열
- 백준 1516 게임 개발
- 백준 1034 램프 파이썬
- 등굣길 파이썬
- 프로그래머스 여행경로
- 백준 2352 반도체 설계 파이썬
- 베스트앨범 파이썬
Archives
- Today
- Total
공부, 기록
카프카 다중 컨슈머 설정 (multi kafka consumer config) 본문
개발 중 하나의 프로젝트에서 다양한 토픽을 통하여 컨슈머 처리를 해야하는 상황.
토픽끼리 받아오는 데이터의 deserialize 형식이 다른 경우라서 각기 다른 consumer factory가 필요하였다.
컨슈머는 @KafkaListener를 사용하여 처리 하였고 factory를 다르게 사용하는건 다음과 같이 처리 가능하였다.
1. consumer config 설정
//기본 kafka consuer factory 설정 (avro 파일 형식)
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(concurrency);
factory.setConsumerFactory(consumerFactory());
// enable batch listening
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //수동 commit
props.put("schema.registry.url", schemaRegistryUrl);
props.put("specific.avro.reader", "true");
return props;
}
//String 형식의 topic을 처리하는 consumer 설정
public ConsumerFactory<String, String> validConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(validConsumerConfigs());
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> validKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(concurrency);
factory.setConsumerFactory(validConsumerFactory());
// enable batch listening
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
return factory;
}
@Bean
public Map<String, Object> validConsumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //수동 commit
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put("schema.registry.url", schemaRegistryUrl);
return props;
}
2. consumer에서 설정 (필요한 컨슈머에 containerFactory 를 설정)
@KafkaListener(topics = "토픽명", groupId = "그룹명")
public void reprocessConsume(List<ConsumerRecord<String, avro파일.Envelope>> messeges) {
for (ConsumerRecord<String, avro파일.Envelope> messege : messeges) {
log.info(messege);
}
}
@KafkaListener(topics = "토픽명2", groupId = "그룹명2", containerFactory = "validKafkaListenerContainerFactory")
public void reprocessConsume(@Payload List<String> messages,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys,
Acknowledgment acknowledgment) {
if (messages.size() > 0) {
for (String message : messages) {
log.info(message);
}
}
}
'공부 > 소소한 개발' 카테고리의 다른 글
Servlet, Filter, Interceptor, AOP (0) | 2021.09.12 |
---|---|
Spring Web MVC (0) | 2021.09.12 |
GET POST PUT DELETE OPTION PATCH (0) | 2021.09.04 |
JPA - 02 - 영속성 컨텍스트 (0) | 2021.08.21 |
JPA - 01 - 기본 개념 (0) | 2021.08.07 |