이번 실습에서는 1주차에 간단히 JPA를 사용한 Spring boot 프로젝트에 Kafka를 연결하고 Producer 로 Kafka 서버의 Topic에 메세지를 보내고 Comsumer 설정으로 특정 Topic을 구독하고 있다가 Topic에 메세지가 들어오면 받아서 콘솔에 출력 하도록 해보자!
Index
- Producer & Consumer 설정을 위한 class 작성
- application.properties 에 property 작성
- Controller 작성 & 테스트 수행
Producer & Consumer 설정을 위한 class 작성
KafkaConfiguration.java (목적 : kafkaTemplate 클래스를 Bean 으로 등록하기 위한 class)
package com.example.study.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaConfiguration {
private Environment env;
@Autowired
KafkaConfiguration(Environment env) {
this.env = env;
}
public Map<String,Object> producerConfig() {
Map<String,Object> props = new HashMap<>();
//server host 지정
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
,env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// retries 횟수
props.put(ProducerConfig.RETRIES_CONFIG
,env.getProperty(ProducerConfig.RETRIES_CONFIG));
//batcch size 지정
props.put(ProducerConfig.BATCH_SIZE_CONFIG
,env.getProperty(ProducerConfig.BATCH_SIZE_CONFIG));
// linger.ms
props.put(ProducerConfig.LINGER_MS_CONFIG
,env.getProperty(ProducerConfig.LINGER_MS_CONFIG));
//buufer memory size 지정
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG
,env.getProperty(ProducerConfig.BUFFER_MEMORY_CONFIG));
//key serialize 지정
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
,StringSerializer.class);
//value serialize 지정
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
,StringSerializer.class);
return props;
}
public ProducerFactory<String,String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
|
cs |
해당 클래스의 용도는 IOC컨테이너에 org.springframework.kafka.core 페키지에 있는 KafkaTemplate 을 Bean 으로 등록하기 위한 설정 파일이라고 보면되겠다. 생성자를 제외하고 3개의 메서드를 만들어서 KafkaTemplate 을 Bean으로 등록 하는데 사용한다.
producerConfig() : 해당 메서드는 이름 그대로 설정 정보를 Map<String,Object> 형식으로 작성해서 return 하는 일을한다.
producerFactory() : 해당 메서드는 ProducerFactory를 return하는데 이때 위에서 만들어 놓은 producerConfig()의 설정 정보를 이용해서 ProducerFactory생성자를 호출한다.
kafkaTemplate() : 최종적으로 우리가 Controller에서 사용 할 KafkaTemplate을 IOC 컨테이너에 등록하는 일을 하게되고 이때 위에서 만들어 놓은 producerFactory() 를 호출해서 KafkaTemplate생성자 Parameter로 넘겨준다.
여기서 만들어 놓은 kafkaTemplate() 덕분에 우리가 앞으로 사용 할 HomeController에서 @Autowired 를 이용해서 kafkaTemplate Bean을 주입할 수 있다.
정리
KafkaTemplate : 클래스
ProducerFactory : 인터페이스
DefaultKafkaProducerFactory : KafkaResourceFactory를 상속받고 ProducerFactory, ApplicationContextAware, BeanNameAware, ApplicationListener, DisposableBean를 implements 한 클래스
1. KafkaTemplate클래스의 생성자를 호출하기 위해서 ProducerFactory인터페이스를 구현한 객체 DefaultKafkaProducerFactory가 필요하다.
2. DefaultKafkaProducerFactory의 생성자를 호출하기 위해서 Map<String,Object>의 설정 정보가 필요하기 때문에 producerConfig() 메서드를 만들어서 설정 정보를 return 하고 있다.
KafkaConsumerConfig.java (목적 : ConcurrentKafkaListenerContainerFactory클래스를 생성하고 ConsumerFactory인터페이스를 내부 멤버변수에 Set하고 Bean 으로 등록하기 위한 class)
package com.example.study.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private Environment env;
@Autowired
KafkaConsumerConfig(Environment env) {
this.env = env;
}
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,env.getProperty("bootstrap.servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG,"foo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
|
cs |
ConsumerFactory() : 해당 메서드는 ConsumerFactory를 return하는데 이때 DefaultKafkaConsumerFactory 생성자로
설정정보를 만들어서 넣어준다.
kafkaListenerContainerFactory() : ConcurrentKafkaListenerContainerFactory 인스턴스 생성 후 setConsumerFactory(ConsumerFactory())를 이용해서 위에서 만들어 놓은 ConsumerFactory() 를 setting해서
IOC 컨테이너에 Bean으로 등록해 놓는다.
정리
ConcurrentKafkaListenerContainerFactory : 클래스 (멤버로 ConsumerFactory 인터페이스를 가지고 있다.)
ConsumerFactory : 인터페이스 ( Consumer관련 메서드를 가지고 있음 )
DefaultKafkaConsumerFactory : KafkaResourceFactory를 상속받고 ConsumerFactory, BeanNameAware 를 implements 하고있다.
application.properties 에 property 작성
본 실습에서는 properties 파일을 따로 생성하지 않고 기본으로 존재하는 application.properties에 작성해서 사용한다.
(혹시라도 다른 설정 파일을 사용 한다면 @PropertySource 어노테이션을 사용해서 Config클래스 파일 상단에 작성하고 파일명을 명시해 주면된다.)
bootstrap.servers=localhost:9092
retries=0
batch.size=4096
linger.ms=1
buffer.memory=40960
|
cs |
Controller 작성
HomeController (목적 : Http 요청으로 들어온 msg 를 kafkaTemplate 을 이용해서 Topic에 보내고 Listener를 이용해서 구독하고 있는 Topic에 전달된 메세지를 출력하기 위한 테스트 클래스)
package com.example.study.kafka;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class HomeController {
private static final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final String topicName = "jjjwodls";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/kafka.do")
public void kafka(String msg) {
LocalDateTime date = LocalDateTime.now();
String dateStr = date.format(fmt);
kafkaTemplate.send(topicName, "8080 send " + dateStr + " : " + msg);
}
@KafkaListener(topics = topicName,groupId = "foo")
public void listen(String message) {
System.out.println("Received Msg jjjwodls " + message);
}
/**
* groupId 를 다르게 하여 내용을 보여주기 위해 일부로 다르게 설정하였음.
* @param headers header에 담긴 내용을 보여준다.
* @param payload 넘어온 문자열에 대해 보여줌.
*/
@KafkaListener(topics = topicName,groupId = "bar")
public void listen2(@Headers MessageHeaders headers,@Payload String payload ) {
System.out.println("=======================================");
System.out.println("Consume Headers : " + headers.toString());
System.out.println("=======================================");
System.out.println("PayLoad : " + payload);
System.out.println("=======================================");
}
}
|
cs |
kafkaTemplate : 우리가 만들어 놓은 Bean이 @Autowired를 통해서 DI된다.
kafka() : @RequestMapping의 선언된 "/kafka.do" 로 들어온 msg를 kafkaTemplate.send를 이용해서 Topic에 전달한다.
listen() : 구독하고 있는 Topic에 메세지가 들어오면 읽어서 출력한다.
listen2() : 구독하고 있는 Topic에 메세지가 들어오면 읽어서 출력한다.
listen() 과 listen2()는 서로 groupid가 다르기 때문에 같은 Topic을 구독하고 메세지가 들어오면 가져와서 출력한다.
테스트 수행
1. command로 Zookeeper와 Kafka를 실행한다. (저번 포스팅에서 그대로 수행한다.)
>> https://debaeloper.tistory.com/24
Kafka 설치 & Zookeeper 실행 ( window 10 )
들어가기 전 Kafka를 대부분 docker에 설치해서 사용을 한다고 하는데 이번 포스팅과 실습은 windows10 에 직접 설치하고 command를 이용해서 Producer & Consumer 테스트를 할 예정이다. 본 포스팅은 서버스
debaeloper.tistory.com
2. Spring boot 서버구동
구동을 하면 우리가 구독하고 있는 정보를 확인할 수있고
3. http 요청으로 Kafka서버의 Topic에 메세지 전달 및 Listener 출력확인하기
위 그림처럼 출력되면 테스트 완료!!
Kafka 책을 하나사서 제대로 공부하고 블로그 글도 작성해봐야 겠다. !!
'서버개발스터디 > 2주차 (Kafka)' 카테고리의 다른 글
#02 Kafka 설치 & Zookeeper 실행 ( window 10 ) (0) | 2021.05.19 |
---|---|
#01 Kafka / Zookeeper 알아보기 (0) | 2021.05.17 |