걍 로컬환경에서 Kafka 구축해서 테스트해봤는데 잘 안됐었다.
그리고 java로 컨슈머랑 프로듀셔도 만들어서 직접 데이터 전송되는걸 확인해보려고 했다...
하지만 java를 그냥 쓰기엔 흐지부지되어버렸었다.
그래서 이왕 이렇게 된 겸 intellij도 설치해봤다.
아 그리고 intellij는 커뮤니티 말고 utlimate버전으로(학생인증받아서)
그리고 이렇게 된겸 간단한게 spring boot 써보려고 한다!!
1. intellij 환경설정
1) 프로젝트 생성
java IDE는 너무 오랜만이다...맨날 이클립스만 써봤는데 intellij도 처음이라 새롭다..
인텔리제이에서 프로젝트생성 = 이클립스의 workspace
인텔리제이에서 모듈생성 = 이클립스의 project
2) 모듈생성
즉, 프로젝트 생성(kafakProject)-모듈추가(Group : KafkaSpringTest)-SDK 지정(java sdk)-spring boot환경지정
3) 의존성 선택
Spring boot 2.4.4 버전의 의존성이다. 나같은 경우 참고한 spring boot 버전이 2.0.2라서 비슷한거 찾아옴
-DevTools
-Spring Web
-Spring for Apache Kafka
-Spring for Apache Kafka Stream
4) 준비완료
이제 프로트(kafkaproject)안의 모듈(Test01)안에 카프카 관련 라이브러리가 다 설치되면 준비완료
2. 카프카 서버 실행(Local : Windows)
1) 로컬에서 zookeeper server 실행
C:\Kafka\kafka_>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2) 로컬에서 kafka server 실행 (다른cmd창)
C:\Kafka\kafka_>bin\windows\kafka-server-start.bat config\server.properties
3) topic 생성(window powershell)
create topic mytopic나오면 정상적으로 생성된것
그리고 테스트용으로 로컬에 카프카브로커를 하나 설치한거라 replication factor는 1
3. API 코드작성
원하는 목표는 웹에서 파라미터로 호출한 값들이 kafka로 실시간으로 전달하는 것이다.
상상하자면 api로 호출한 값이 kafka 컨슈머에서 받는거!
1) Kafka.properties : 카프카 기본설정 속성
일단 카프카서버가 로컬에 있으니까 bootstrap.server=localhost로 지정하는듯!
2) 프로젝트구조
Kafka API를 만드는 것이기때문에 컨슈머, 프로듀셔 클래스가 아니다. 굳이 의미하자면 '프로듀셔'를 만드는 것일것이다
그리고 프로듀셔로서 데이터를 만들어 보낼 수 있게 하는 controller.
3) 클래스 생성 - KafakConfiguration.java
package kafkaspringtest.demo.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
//@PropertySource("classpath:kafka.properties")
@EnableKafka
public class KafkaConfiguration {
@Autowired
private Environment env;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// server host 및 port 지정
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// key serialize 지정
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value serialize 지정
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
// Bean을 통하여 의존성 주입
return new KafkaTemplate<String, String>(producerFactory());
}
}
3) 클래스 생성 - HomeController.java
package kafkaspringtest.demo.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@RestController
public class HomeController {
private static final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// KafkaConfiguration에서 작성한 Bean 주입.
@Autowired
KafkaTemplate kafkaTemplate;
/**
* /get?message=value 형태로 접근할 수 있도록 api 작성
* @param message
* @return
*/
@RequestMapping(value="/get")
public String getData(@RequestParam(value = "message", required = true, defaultValue = "") String message ){
// 현재 시간
LocalDateTime date = LocalDateTime.now();
String dateStr = date.format(fmt);
// mytopic에 현재 시간 + message를 produce 한다.
kafkaTemplate.send("mytopic", dateStr + " " + message);
return "kafkaTemplate.send >> " + message ;
}
}
4. 완료(03.29)
즉, localhost:8080/get?message= 형식의 웹에서 message의 파라미터값을 데이터로 생산한다.
그래서 kafka에서 받아서, kafka내장 컨슈머가 실시간으로 받는 것을 확인할 수 있다.
spring을 아예 몰라서 약간의 뻘짓을 했다. 좀 공부해야겠다.
5. 후기
일단 첫번쨰 테스트는 local서버에 kafka 브로커를 한개만 이용한 상태다. replication도 안된 상태
목표1. aws이용
그래서 Kafka 브로커를 3대써서 클러스터로 구성한 replication상태로 변경해보고 싶다.
그럼 server에 localhost가 아니라 public ip를 넣어야하겠지?
목표2. docker 이용
docker로 브로커3대를 올리는 것이다.
목표3. jar/war파일로 배포하기
목표4. elasticSearch 이용
kafka lag 데이터를 저장하는 목적, 여기에 모니터링툴로 그라파나까지?
참고자료 : junil-hwang.com/blog/intellij-kafka-spring-api/
'🌿 Data Engineering > Study' 카테고리의 다른 글
[Spark] Apache Spark란? | 구조 및 동작방식 (0) | 2021.04.22 |
---|---|
[Spark] Apache Spark란? | 빅데이터 처리단계-분산처리/하둡/맵리듀스 (0) | 2021.04.22 |
[Spark] Apache Spark란? | 빅데이터 처리단계 -수집과 저장/ETL/HBase (0) | 2021.04.21 |
[Spark] Apache Spark란? | Spark정의를 알아보기까지 빅데이터흐름 (0) | 2021.04.21 |
[kafka 기초] 카프카 실행하기(단일브로커) (0) | 2021.03.26 |
[kafka 기초] AWS에 카프카 클러스터(kafka,zookeeper) 구축하기 (0) | 2021.03.26 |