
Multiple Kafka consumer configuration (multi kafka consumer config)

무는빼주세요 2021. 9. 4. 15:04

While developing, I ran into a situation where a single project had to run consumers against several different topics.

The topics' payloads deserialize into different formats, so each needed its own consumer factory.

The consumers are implemented with @KafkaListener, and pointing each listener at a different factory can be handled as follows.

 

1. Consumer config setup
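The config below injects values such as kafkaBootstrapServers, concurrency, and fetchMaxWait from application properties. A minimal sketch of how those fields might be bound — the property names here are illustrative, not taken from the original project:

```java
// Hypothetical @Value bindings for the fields used by the config below.
// Property names and defaults are illustrative assumptions.
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaBootstrapServers;

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String autoOffsetResetConfig;

@Value("${consumer.concurrency:3}")
private int concurrency;

@Value("${consumer.fetch-max-wait-ms:500}")
private int fetchMaxWait;

@Value("${consumer.fetch-min-bytes:1}")
private int fetchMinBytes;

@Value("${consumer.max-poll-records:500}")
private int maxPollRecords;

@Value("${schema.registry.url}")
private String schemaRegistryUrl;
```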

// Default Kafka consumer factory (Avro-format topics)
// Note: the value type is Object, not String, since KafkaAvroDeserializer
// produces generated Avro classes.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(concurrency);
    factory.setConsumerFactory(consumerFactory());
    // enable batch listening
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());

    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // manual commit
    props.put("schema.registry.url", schemaRegistryUrl);
    props.put("specific.avro.reader", "true");
    return props;
}

// Consumer factory for topics whose values are plain Strings
@Bean
public ConsumerFactory<String, String> validConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(validConsumerConfigs());
}

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> validKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(concurrency);
        factory.setConsumerFactory(validConsumerFactory());
        // enable batch listening
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        return factory;
    }

@Bean
public Map<String, Object> validConsumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // manual commit
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    // schema.registry.url is not needed here: StringDeserializer never contacts the registry
    return props;
}
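The two config maps above share most of their entries. One way to avoid the duplication would be to build a shared base map and layer the Avro-specific entries on top. A minimal sketch using the literal config keys (the string values behind the ConsumerConfig constants); the method and class names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {

    // Settings common to both the Avro and the String consumer factories.
    static Map<String, Object> baseProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("auto.offset.reset", "latest");
        props.put("enable.auto.commit", false); // manual commit
        props.put("max.poll.records", 500);
        return props;
    }

    // Avro-specific entries layered on top of the shared base.
    static Map<String, Object> avroProps(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = baseProps(bootstrapServers);
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put("specific.avro.reader", true);
        return props;
    }
}
```

Each factory's consumerConfigs method would then only list what is unique to it.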

 

2. Listener setup (set containerFactory on the consumers that need it)

@KafkaListener(topics = "topic-name", groupId = "group-id")
public void reprocessConsume(List<ConsumerRecord<String, avro파일.Envelope>> messages,
                             Acknowledgment acknowledgment) {
    for (ConsumerRecord<String, avro파일.Envelope> message : messages) {
        log.info("{}", message);
    }
    acknowledgment.acknowledge(); // required: ack mode is MANUAL_IMMEDIATE
}
   

@KafkaListener(topics = "topic-name-2", groupId = "group-id-2", containerFactory = "validKafkaListenerContainerFactory")
public void validConsume(@Payload List<String> messages,
                         @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys,
                         Acknowledgment acknowledgment) {
    for (String message : messages) {
        log.info(message);
    }
    acknowledgment.acknowledge(); // required: ack mode is MANUAL_IMMEDIATE
}
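One caveat: in spring-kafka 2.8 and later, SeekToCurrentBatchErrorHandler and setBatchErrorHandler are deprecated in favor of a single CommonErrorHandler. A sketch of the valid-topic factory on the newer API — this assumes spring-kafka 2.8+, and the FixedBackOff values are illustrative:

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> validKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(concurrency);
    factory.setConsumerFactory(validConsumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // Replaces setBatchErrorHandler(new SeekToCurrentBatchErrorHandler()):
    // retry a failed batch 3 times, 1 second apart, then give up.
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 3L)));
    return factory;
}
```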
