Kafka 테스트해보기2-Python으로 메세지 생성
이번엔 이렇게 python으로 Producer와 Comsumer를 구현해 볼 것이다.
그래서 연속적으로 데이터를 보내는걸 테스트해보려고 한다.
Producer 구현
from kafka import KafkaProducer
from json import dumps
import time
topic_name = "topic_test"
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
print("[begin] producer가 메세지전송 시작")
for i in range(10000):
data = {'str' : 'result'+str(i)}
print("메세지 전송중 ..."+data['str'])
producer.send(topic_name, value=data)
producer.flush()
print("[end] 걸린시간 :", time.time() - start)
Consumer 구현
from kafka import KafkaConsumer
from json import loads
import time
# topic, broker list
topic_name = "topic_test"
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=1000
)
# consumer list를 가져온다
start = time.time()
print("[begin] Topic: %s 으로 consumer가 메시지 받아옴" % (topic_name))
for message in consumer:
print("Partition: %d, Offset: %d, Value: %s" % ( message.partition, message.offset,message.value ))
print("[end] 걸린시간 :", time.time() - start)
이제 이 코드를 컨테이너 안으로 옮긴다
docker cp 파일명 컨테이너명:위치
docker cp producer.py local-kafka:root/kafka_2.12-0.10.2.0
docekr cp consumer.py local-kafka:root/kafka_2.12-0.10.2.0
그리고 컨테이너에 접속해서 보면 이렇게 복사된걸 볼 수 있다.
접속 : docker exec -i -t local-kafka bash
그리고 이 python을 실행하면 되는데 지금 컨테이너엔 python이 없다!
그래서 설치하려고 봤는데 apt-get도 yum도 안먹힌다..도대체 뭐냐!
근데 아예 OS부터 알지 못했다.
cat /etc/*-release
그래서 os를 찾아봤더니 ? 엥? Alpine Linux(알파인 리눅스)?
구글링해본 결과 docker의 기반이 되는 가장 초경량의 linux라고 한다.
패키지관리툴을 apk 이다.. (으 그러니까 yum이랑 apt-get이 안되지ㅠㅠ)
그래서 apk 패키지 레파지토리를 업데이트 할 수 있고, 추가할수도 있었다
apk --no-cache updat
apk --no-cache add python3
이제 파이썬을 쓸 수 있다! 휴..
아.. 그전에 pip으로 사용할 파이썬 라이브러리도 추가해줘야한다...
pip3 install kafka
(참고로, python3을 설치해서 pip도 pip3로 명령해야함)
자, 이제 드디어 파이썬코드를 실행할 수 있는 준비가 되었다!
Producer 실행
python3 producer.py
1부터 1000까지 데이터를 전송하는거였는데 나도모르게 너무 많아서 중간에 중지시켰다..
그래서 순식간에 793메세지까지 전송댐 (그리고 너무 많이 계속 받아오니까 점점 버벅거려짐...!!!)
Consumer 실행
python3 consumer.py
한번에 테스트에 성공한게 아니라서 곘고해서 오프셋이 쌓이고 있었다.
향후에 이 코드를 수정해서 트위터 api를 통해서 실시간 데이터를 가져와서 보내 볼 수 있을 것 같다..!
참고한내용
https://needjarvis.tistory.com/607
https://m.blog.naver.com/wideeyed/221973877361
- bash: python: 명령을 찾을 수 없음
알파인리눅스?
https://dreamholic.tistory.com/92
'🌿 Data Engineering > Study' 카테고리의 다른 글
[Elasticsearch] 동작원리, shard와 replica를 몇개로 설정해야하는가? (0) | 2021.12.23 |
---|---|
[Elasticsearch] 내부구조, cluster/index/replica/shard 개념정리 (0) | 2021.12.23 |
[Kafka] Docker로 Kafka구축하기 | 트위터API사용해서 실시간데이터 전송하기 (0) | 2021.08.11 |
[Kafka] Docker로 Kafka 구축하기 | Producer에서 Consumer로 메세지 전송 (0) | 2021.08.10 |
[ELK] Flask 웹로그 분석해보기2-Flask 로그남기기 (0) | 2021.07.09 |
[ELK] Flask 웹로그 분석해보기1-Docker로 ELK Stack 설치 (0) | 2021.07.06 |