본문 바로가기

서버개발스터디/2주차 (Kafka)

#03 Spring boot & Kafka 연동하기

이번 실습에서는 1주차에 간단히 JPA를 사용한 Spring boot 프로젝트에 Kafka를 연결하고 Producer 로 Kafka 서버의 Topic에 메세지를 보내고 Comsumer 설정으로 특정 Topic을 구독하고 있다가 Topic에 메세지가 들어오면 받아서 콘솔에 출력 하도록 해보자! 

Index

  • Producer & Consumer 설정을 위한 class 작성
  • application.properties 에 property 작성
  • Controller 작성 & 테스트 수행

Producer & Consumer 설정을 위한 class 작성

KafkaConfiguration.java (목적 : kafkaTemplate 클래스를 Bean 으로 등록하기 위한 class)

package com.example.study.kafka;
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
 
@Configuration
public class KafkaConfiguration {
    
    private Environment env;
    
    @Autowired
    KafkaConfiguration(Environment env) {
        this.env = env;
    }
    
    public Map<String,Object> producerConfig() {
        
        Map<String,Object> props = new HashMap<>();
        
        //server host 지정
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
                 ,env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        
        // retries 횟수
        props.put(ProducerConfig.RETRIES_CONFIG
                 ,env.getProperty(ProducerConfig.RETRIES_CONFIG));
        
        //batcch size 지정
        props.put(ProducerConfig.BATCH_SIZE_CONFIG
                 ,env.getProperty(ProducerConfig.BATCH_SIZE_CONFIG));
        
        // linger.ms 
        props.put(ProducerConfig.LINGER_MS_CONFIG
                 ,env.getProperty(ProducerConfig.LINGER_MS_CONFIG));
        
        //buufer memory size 지정
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG
                 ,env.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG));
        
        //key serialize 지정
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
                 ,StringSerializer.class);
        
        //value serialize 지정
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
                 ,StringSerializer.class);
        
        return props;
    }
    
    public ProducerFactory<String,String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }
    
    @Bean
    public KafkaTemplate<StringString> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}
 
cs

 

해당 클래스의 용도는 IOC컨테이너에 org.springframework.kafka.core 페키지에 있는 KafkaTemplate 을 Bean 으로 등록하기 위한 설정 파일이라고 보면되겠다. 생성자를 제외하고 3개의 메서드를 만들어서 KafkaTemplate 을 Bean으로 등록 하는데 사용한다.

 

producerConfig() :  해당 메서드는 이름 그대로 설정 정보를 Map<String,Object> 형식으로 작성해서 return 하는 일을한다.

 

producerFactory() : 해당 메서드는 ProducerFactory를 return하는데 이때 위에서 만들어 놓은 producerConfig()의 설정 정보를 이용해서 ProducerFactory생성자를 호출한다.

 

kafkaTemplate() : 최종적으로 우리가 Controller에서 사용 할 KafkaTemplate을 IOC 컨테이너에 등록하는 일을 하게되고 이때 위에서 만들어 놓은 producerFactory() 를 호출해서 KafkaTemplate생성자 Parameter로 넘겨준다.

여기서 만들어 놓은 kafkaTemplate() 덕분에 우리가 앞으로 사용 할 HomeController에서 @Autowired 를 이용해서 kafkaTemplate Bean을 주입할 수 있다.

 

 

정리 

KafkaTemplate : 클래스

ProducerFactory : 인터페이스 

DefaultKafkaProducerFactory : KafkaResourceFactory를 상속받고 ProducerFactory, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean를 implements 한 클래스

 

1. KafkaTemplate클래스의 생성자를 호출하기 위해서 ProducerFactory인터페이스를 구현한 객체 DefaultKafkaProducerFactory가 필요하다.

2. DefaultKafkaProducerFactory의 생성자를 호출하기 위해서 Map<String,Object>의 설정 정보가 필요하기 때문에 producerConfig() 메서드를 만들어서 설정 정보를 return 하고 있다.

 

 

KafkaConsumerConfig.java (목적 : ConcurrentKafkaListenerContainerFactory클래스를  생성하고 ConsumerFactory인터페이스를 내부 멤버변수에 Set하고 Bean 으로 등록하기 위한 class)

package com.example.study.kafka;
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
 
    private Environment env;
    
    @Autowired
    KafkaConsumerConfig(Environment env) {
        this.env = env;
    }
    
    public ConsumerFactory<StringString> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,env.getProperty("bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"foo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<StringString> kafkaListenerContainerFactory() {
 
        ConcurrentKafkaListenerContainerFactory<StringString> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    
}
 
cs

 

ConsumerFactory() : 해당 메서드는 ConsumerFactory를 return하는데 이때 DefaultKafkaConsumerFactory 생성자로

설정정보를 만들어서 넣어준다. 

 

kafkaListenerContainerFactory() : ConcurrentKafkaListenerContainerFactory 인스턴스 생성 후 setConsumerFactory(ConsumerFactory())를 이용해서 위에서 만들어 놓은 ConsumerFactory() 를 setting해서 

IOC 컨테이너에 Bean으로 등록해 놓는다.

 

정리 

ConcurrentKafkaListenerContainerFactory : 클래스 (멤버로 ConsumerFactory 인터페이스를 가지고 있다.)

ConsumerFactory : 인터페이스 ( Consumer관련 메서드를 가지고 있음 )

DefaultKafkaConsumerFactory : KafkaResourceFactory를 상속받고 ConsumerFactory, BeanNameAware 를 implements 하고있다.

 

application.properties 에 property 작성

본 실습에서는 properties 파일을 따로 생성하지 않고 기본으로 존재하는 application.properties에 작성해서 사용한다.

(혹시라도 다른 설정 파일을 사용 한다면 @PropertySource 어노테이션을 사용해서 Config클래스 파일 상단에 작성하고 파일명을 명시해 주면된다.)

 

 
bootstrap.servers=localhost:9092
retries=0
batch.size=4096
linger.ms=1
buffer.memory=40960
cs

Controller 작성 

 

HomeController (목적 :  Http 요청으로 들어온 msg 를 kafkaTemplate 을 이용해서 Topic에 보내고 Listener를 이용해서 구독하고 있는 Topic에 전달된 메세지를 출력하기 위한 테스트 클래스)

package com.example.study.kafka;
 
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
 
@Controller
public class HomeController {
    
    
    private static final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
    private static final String topicName = "jjjwodls";
    
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    
    @RequestMapping("/kafka.do")
    public void kafka(String msg) {
        LocalDateTime date = LocalDateTime.now();
        String dateStr = date.format(fmt);
        kafkaTemplate.send(topicName, "8080 send " + dateStr + " : " + msg);
    }
    
    @KafkaListener(topics = topicName,groupId = "foo")
    public void listen(String message) {
        System.out.println("Received Msg jjjwodls " + message);
    }
    
    /**
     * groupId 를 다르게 하여 내용을 보여주기 위해 일부로 다르게 설정하였음.
     * @param headers header에 담긴 내용을 보여준다.
     * @param payload 넘어온 문자열에 대해 보여줌.
     */
    @KafkaListener(topics = topicName,groupId = "bar")
    public void listen2(@Headers MessageHeaders headers,@Payload String payload ) {
        System.out.println("=======================================");
        System.out.println("Consume Headers : " + headers.toString());
        System.out.println("=======================================");
        System.out.println("PayLoad : " + payload);
        System.out.println("=======================================");
    }
    
}
 
cs

kafkaTemplate : 우리가 만들어 놓은 Bean이 @Autowired를 통해서 DI된다.

 

kafka() : @RequestMapping의 선언된 "/kafka.do" 로 들어온 msg를 kafkaTemplate.send를 이용해서 Topic에 전달한다.

listen() : 구독하고 있는 Topic에 메세지가 들어오면 읽어서 출력한다.

listen2() : 구독하고 있는 Topic에 메세지가 들어오면 읽어서 출력한다.

listen() 과 listen2()는 서로 groupid가 다르기 때문에 같은 Topic을 구독하고 메세지가 들어오면 가져와서 출력한다.

 

 

테스트 수행

1. command로 Zookeeper와 Kafka를 실행한다. (저번 포스팅에서 그대로 수행한다.)

   >>  https://debaeloper.tistory.com/24

 

Kafka 설치 & Zookeeper 실행 ( window 10 )

들어가기 전 Kafka를 대부분 docker에 설치해서 사용을 한다고 하는데 이번 포스팅과 실습은 windows10 에 직접 설치하고 command를 이용해서 Producer & Consumer 테스트를 할 예정이다. 본 포스팅은 서버스

debaeloper.tistory.com

 

2. Spring boot 서버구동

구동을 하면 우리가 구독하고 있는 정보를 확인할 수있고 

 

스프링 부트 서버구동 Listener 확인

 

3. http  요청으로 Kafka서버의 Topic에 메세지 전달 및 Listener 출력확인하기

 

listen과 listen2 의 출력결과 

 

 

위 그림처럼 출력되면 테스트 완료!! 

 

Kafka 책을 하나사서 제대로 공부하고 블로그 글도 작성해봐야 겠다. !!