본문 바로가기
Data/Study

[Kafka] Docker로 Kafka 구축하기 | Python으로 Producer,Consumer 구현

by 카프리썬 2021. 8. 11.
728x90

Kafka 테스트해보기2-Python으로 메세지 생성

이번엔 이렇게 python으로 Producer와 Comsumer를 구현해 볼 것이다.

그래서 연속적으로 데이터를 보내는걸 테스트해보려고 한다.

 

Producer 구현

from kafka import KafkaProducer
from json import dumps
import time

topic_name = "topic_test"
producer = KafkaProducer(
        acks=0,
        compression_type='gzip',
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: dumps(x).encode('utf-8')
        )

start = time.time()
print("[begin] producer가 메세지전송 시작")

for i in range(10000):
    data = {'str' : 'result'+str(i)}
    print("메세지 전송중 ..."+data['str'])
    producer.send(topic_name, value=data)
    producer.flush()
print("[end] 걸린시간 :", time.time() - start)

Consumer 구현

from kafka import KafkaConsumer
from json import loads
import time

# topic, broker list
topic_name = "topic_test"
consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='my-group',
            value_deserializer=lambda x: loads(x.decode('utf-8')),
            consumer_timeout_ms=1000
            )
# consumer list를 가져온다
start = time.time()
print("[begin] Topic: %s 으로 consumer가 메시지 받아옴" % (topic_name))
for message in consumer:
    print("Partition: %d, Offset: %d, Value: %s" % ( message.partition, message.offset,message.value ))
print("[end] 걸린시간 :", time.time() - start)

 

이제 이 코드를 컨테이너 안으로 옮긴다

docker cp 파일명 컨테이너명:위치

docker cp producer.py local-kafka:root/kafka_2.12-0.10.2.0
docekr cp consumer.py local-kafka:root/kafka_2.12-0.10.2.0

그리고 컨테이너에 접속해서 보면 이렇게 복사된걸 볼 수 있다.

접속 : docker exec -i -t local-kafka bash

 

그리고 이 python을 실행하면 되는데 지금 컨테이너엔 python이 없다!

그래서 설치하려고 봤는데 apt-get도 yum도 안먹힌다..도대체 뭐냐!

근데 아예 OS부터 알지 못했다.

cat /etc/*-release

그래서 os를 찾아봤더니 ? 엥? Alpine Linux(알파인 리눅스)?

구글링해본 결과 docker의 기반이 되는 가장 초경량의 linux라고 한다.  

패키지관리툴을 apk 이다.. (으 그러니까 yum이랑 apt-get이 안되지ㅠㅠ)

그래서 apk 패키지 레파지토리를 업데이트 할 수 있고, 추가할수도 있었다

apk --no-cache updat

apk --no-cache add python3

이제 파이썬을 쓸 수 있다! 휴..

아.. 그전에 pip으로 사용할 파이썬 라이브러리도 추가해줘야한다...

pip3 install kafka

(참고로, python3을 설치해서 pip도 pip3로 명령해야함)

자, 이제 드디어 파이썬코드를 실행할 수 있는 준비가 되었다!

 

Producer 실행

python3 producer.py

1부터 1000까지 데이터를 전송하는거였는데 나도모르게 너무 많아서 중간에 중지시켰다..

그래서 순식간에 793메세지까지 전송댐 (그리고 너무 많이 계속 받아오니까 점점 버벅거려짐...!!!) 

 

Consumer 실행

python3 consumer.py

한번에 테스트에 성공한게 아니라서 곘고해서 오프셋이 쌓이고 있었다.

 

향후에 이 코드를 수정해서 트위터 api를 통해서 실시간 데이터를 가져와서 보내 볼 수 있을 것 같다..!

 

 

 

참고한내용

https://needjarvis.tistory.com/607

 

[카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer)

카프카(Kafka)에서는 다양한 언어로 데이터를 주고 받는 기능을 제공하는데 본 포스팅은 파이썬(Python)으로 구현하는 프로듀서(producer)/컨슈머(consumer) 즉 데이터를 보내고 받는 방법을 설명한다.

needjarvis.tistory.com

https://m.blog.naver.com/wideeyed/221973877361

 

[Kafka] Docker에 Kafka 설치 & 파이썬 테스트

Docker(도커)에 Zookeeper(주키퍼), Kafka(카프카) 컨테이너를 실행하고 파이썬으로 카프카에 메시지를 ...

blog.naver.com

 

- bash: python: 명령을 찾을 수 없음

https://www.hebergementwebs.com/%ED%8A%9C%ED%86%A0%EB%A6%AC%EC%96%BC/-bash-python-command-not-found-error-and-solution

 

-bash: python: 명령을 찾을 수 없음, 오류 및 해결 방법

-bash: python: 명령을 찾을 수 없음, 오류 및 해결 방법 저는 새로운 사용자이고 Python 프로그램을 실행하려고 합니다. 클라우드 기반 VM/VPS가 있고 서버 터미널에 python mycode. py를 입력하면 다음 오류

www.hebergementwebs.com

 

 

알파인리눅스?

 https://dreamholic.tistory.com/92

 

Docker 에서 표준적으로 쓰이는 Apline Linux 는 뭔가요?

Docker 에서 표준적으로 쓰이는 Apline Linux 는 뭔가요? docker 는 경량의 컨테이너에 기반해 서비스를 운영하는 마이크로서비스 아키텍쳐를 가능하게 하는 기본 중의 기본이지요. 그런데 docker 를 조

dreamholic.tistory.com

 

반응형