본문 바로가기
🌿 Data with AI/Study

[kafka 기초] Spring boot웹 Producer->Kafka구현(Intellij 환경)

by 카프리썬 2021. 3. 28.
728x90

걍 로컬환경에서 Kafka 구축해서 테스트해봤는데 잘 안됐었다.

그리고 java로 컨슈머랑 프로듀셔도 만들어서 직접 데이터 전송되는걸 확인해보려고 했다...

하지만 java를 그냥 쓰기엔 흐지부지되어버렸었다.

그래서 이왕 이렇게 된 겸 intellij도 설치해봤다.

 

[JAVA]Intellij IDE 설치 | JAVA 설치 | JDK 다운로드

이클립스는 너ㅓㅓ무 질렸다. 내가 그동안 자바에 끌리지 않는 이유도 다 안예쁜 이클립스 탓.... 그래서 새로운 툴을 써보고 싶어서(?) 다시 자바에 눈을 돌렸다.. 헐키 학생인증하면 ulitmate 라이

pearlluck.tistory.com

아 그리고 intellij는 커뮤니티 말고 utlimate버전으로(학생인증받아서)

그리고 이렇게 된겸 간단한게 spring boot 써보려고 한다!!

 

 

 

1. intellij 환경설정

1) 프로젝트 생성

java IDE는 너무 오랜만이다...맨날 이클립스만 써봤는데 intellij도 처음이라 새롭다..

인텔리제이에서 프로젝트생성 = 이클립스의 workspace

인텔리제이에서 모듈생성 = 이클립스의 project

2) 모듈생성

즉, 프로젝트 생성(kafakProject)-모듈추가(Group : KafkaSpringTest)-SDK 지정(java sdk)-spring boot환경지정

모둘생성완료!

3) 의존성 선택 

Spring boot 2.4.4 버전의 의존성이다. 나같은 경우 참고한 spring boot 버전이 2.0.2라서 비슷한거 찾아옴

-DevTools

-Spring Web

-Spring for Apache Kafka

-Spring for Apache Kafka Stream

4) 준비완료

이제 프로트(kafkaproject)안의  모듈(Test01)안에 카프카 관련 라이브러리가 다 설치되면 준비완료

apache.kafka 랑 apache.zookeeper가 얼핏 보인다

 

2. 카프카 서버 실행(Local : Windows)

1) 로컬에서 zookeeper server 실행 

C:\Kafka\kafka_>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2) 로컬에서 kafka server 실행 (다른cmd창)

C:\Kafka\kafka_>bin\windows\kafka-server-start.bat config\server.properties

C:\Kafka\kafka_>bin\windows\kafka-server-start.bat config\server.properties

3) topic 생성(window powershell)

create topic mytopic나오면 정상적으로 생성된것

그리고 테스트용으로 로컬에 카프카브로커를 하나 설치한거라 replication factor는 1

 

 

3. API 코드작성

원하는 목표는 웹에서 파라미터로 호출한 값들이 kafka로 실시간으로 전달하는 것이다.

상상하자면 api로 호출한 값이 kafka 컨슈머에서 받는거! 

 

1) Kafka.properties : 카프카 기본설정 속성

일단 카프카서버가 로컬에 있으니까 bootstrap.server=localhost로 지정하는듯!

2) 프로젝트구조 

Kafka API를 만드는 것이기때문에 컨슈머, 프로듀셔 클래스가 아니다. 굳이 의미하자면 '프로듀셔'를 만드는 것일것이다

그리고 프로듀셔로서 데이터를 만들어 보낼 수 있게 하는 controller.

3) 클래스 생성 - KafakConfiguration.java

package kafkaspringtest.demo.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
//@PropertySource("classpath:kafka.properties")
@EnableKafka
public class KafkaConfiguration {

    @Autowired
    private Environment env;
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // server host 및 port 지정
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        // key serialize 지정
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value serialize 지정
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }


    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Bean을 통하여 의존성 주입
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

3) 클래스 생성 - HomeController.java

package kafkaspringtest.demo.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@RestController
public class HomeController {

    private static final DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // KafkaConfiguration에서 작성한 Bean 주입.
    @Autowired
    KafkaTemplate kafkaTemplate;

    /**
     * /get?message=value 형태로 접근할 수 있도록 api 작성
     * @param message
     * @return
     */
    @RequestMapping(value="/get")
    public String getData(@RequestParam(value = "message", required = true, defaultValue = "") String message ){
        // 현재 시간
        LocalDateTime date = LocalDateTime.now();
        String dateStr = date.format(fmt);

        // mytopic에 현재 시간 + message를 produce 한다.
        kafkaTemplate.send("mytopic", dateStr + "   " + message);
        return "kafkaTemplate.send >>  " + message ;
    }
}

 

4. 완료(03.29)

즉, localhost:8080/get?message= 형식의 웹에서 message의 파라미터값을 데이터로 생산한다. 

그래서 kafka에서 받아서, kafka내장 컨슈머가 실시간으로 받는 것을 확인할 수 있다. 

spring을 아예 몰라서 약간의 뻘짓을 했다. 좀 공부해야겠다.

 

 

5. 후기

일단 첫번쨰 테스트는 local서버에 kafka 브로커를 한개만 이용한 상태다. replication도 안된 상태 

목표1. aws이용

그래서 Kafka 브로커를 3대써서 클러스터로 구성한 replication상태로 변경해보고 싶다.

그럼 server에 localhost가 아니라 public ip를 넣어야하겠지?

목표2. docker 이용

docker로 브로커3대를 올리는 것이다.  

목표3. jar/war파일로 배포하기

목표4. elasticSearch 이용

kafka lag 데이터를 저장하는 목적, 여기에 모니터링툴로 그라파나까지?

blog.voidmainvoid.net/279

 

아파치 카프카 Lag 모니터링 대시보드 만들기

kafka-lag-dashboard Kafka lag을 모니터링하는 확실한 방법 Kafka Consumer의 처리시간이 지연되면 topic 내부의 partition lag이 증가합니다. lag 모니터링을 통해 어느 partition이 lag이 ..

blog.voidmainvoid.net

 

참고자료 : junil-hwang.com/blog/intellij-kafka-spring-api/

 

반응형

$(document).ready(function() { var $toc = $("#toc"); $toc.toc({content: ".tt_article_useless_p_margin", headings: "h2,h3,h4"}); });