이번엔 실제 트위터 실시간 데이터를 kafka로 보내려고 한다!! 두둥! 기대된다.
이걸 하기 위해선 트위터 key가 필요하다 (무려4개나..)
그래서 트위터api 사용 신청을 사전에 해야한다!
나는 이미 빅데이터를 지탱하는 기술 책의 실습을 진행하기 위해서 신청받았었다! (엄청오래걸리고 승인이 까다롭다ㅠ)
Producer 구현
나는 트위터의 실시간데이터 중에서 '김연경'키워드를 가지고 필터링을 할 예정이다.
이걸 실행하면 필터링한 데이터를 producer가 가져와서 consumer에서 보이겟지!?
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import KafkaProducer
import json
# 트위터API Key
access_token = ""
access_token_secret = ""
api_key = ""
api_secret = ""
# 사용할 topic
topic_name="twitter_topic"
class StdOutListener(StreamListener):
def on_data(self, data):
raw_data = json.loads(data)
producer.send(topic_name, raw_data["text"].encode('utf-8'))
return True
def on_error(self, status):
print (status)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
outlisten = StdOutListener()
auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, outlisten)
stream.filter(track=["김연경"])
하나하나씩 코드리뷰를 해보면 아래와 같다.
1.사용패키지 설치
pip3 install kafak-python
pip3 install python-twitter
pip3 install tweepy
+ 사용할 Topic 생성
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic twitter_topic
2. 트위터API사용하기 위한 Auth핸들러
Auth핸들러를 가지고 auth가 생성이 되면 트위터api를 사용할 준비가 되었다!
from tweepy import OAuthHandler
access_token = ""
access_token_secret = ""
api_key = ""
api_secret = ""
auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
#print(auth) <tweepy.auth.OAuthHandler object at 0x7f33066aa630>
3. KafkaProducer생성, StdOutListener생성, Stream전송
KafkaProducer를 가지고 kafka(localhost:9092)로 보낼 객체를 하나 만든다.
이 producer를 가지고 StreamListener Class의 on_data()함수에서 카프카의 topic으로 메세지를 보낼 수 있게 된다.
그리고 StdOutListener 클래스가 필요하다.
이 클래스에선 데이터를 받아서 어떻게 할건지 구현하는데 kafka로 메세지를 보내는 로직을 구현하면 된다.
마지막으로
승인받은auth와 StdOutListener클래스의 객체를 넣어 tweepy에서 제공해주는Stream을 사용해서 스트림한다.
아 그리고 모든 트위터 데이터를 가져오면 너무 많으니까 나는 '김연경'이라는 키워드로 필터링했다.
from tweepy import Stream
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
stdout = StdOutListener()
auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, stdout)
stream.filter(track=["김연경"])
4. StdOutListener 클래스 구현
컨슈머로 데이터를 보내는 method가 있는 StreamListener Class를 만든다.
이 클래스는 on_data()함수와 on_error()함수를 가지고 트위터 StreamListener를 생성한다.
on_error()함수는 스트림데이터를 제대로 받아오지 못했을때 status를 출력해준다.
예를 들어, Twitter’s rate limit policy 문제가 발생하면 status.code가 420가 되면서 리스너와 연결이 끊긴다고 한다.
on_data()함수에서는 스트림데이터를 제대로 받아왔을때 그 데이터를 핸들링한다.
스트림에서 받아온 데이터를 json으로 가지고 오고, 그 중에서 text만 topic으로 보낸다.
from tweepy.streaming import StreamListener
import json
# 사용할 topic
topic_name="twitter_topic"
class StdOutListener(StreamListener):
def on_data(self, data):
raw_data = json.loads(data)
producer.send(topic_name, raw_data["text"].encode('utf-8'))
return True
def on_error(self, status):
print (status)
사실 트위터에서 josn.load로 받아온 raw data의 형식은 이렇다. (트위터 developer API에서 제공해주는 sample)
그래서 이중에서 text에 해당하는 값을 topci으로 보내는 것이다.
{
"created_at": "Wed Oct 10 20:19:24 +0000 2018",
"id": 1050118621198921728,
"id_str": "1050118621198921728",
"text": "To make room for more expression, we will now count all emojis as equal—including those with gender and skin t… https://t.co/MkGjXf9aXm",
"user": {},
"entities": {}
}
이제 producer.py을 실행해본다.
python3 producer.py
그리고 consumer에서 받는다. 받는형식을 따로 지정하지 않았더니 twitter['text']가 그대로 들어오는 것 같다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter_topic --from-beginning
Consumer 결과
추가로 재미삼아 '백신'관련 데이터도 가져와봤다.
음 근데 왜 계속 반복적인 메세지가 나오지? 아무래도 이 컨슈머에서 받아온 데이터를 좀 잘 정리해야할 것 같다.
그리고 topic과 offset에 대해서도 자세히 봐야할듯 싶다.
컨슈머를 시작하게 되면 이전 데이터부터 읽어오게 된다.
참고자료 코드리뷰
https://elkhayati.me/kafka-python-twitter/
https://ljvmiranda921.github.io/notebook/2017/02/24/twitter-streaming-using-python/#credentials
추가로 해보면 좋을 내용
이렇게 카프카를 사용해서 실시간으로 받아온 데이터를 카산드라에 저장하고, Spark Straming을 처리하는것이다.
'🌿 Data Engineering > Study' 카테고리의 다른 글
디지털 기술이 사람들의 행동에 미치는 10가지 심리학적 현상 (1) | 2024.07.14 |
---|---|
[Elasticsearch] 동작원리, shard와 replica를 몇개로 설정해야하는가? (0) | 2021.12.23 |
[Elasticsearch] 내부구조, cluster/index/replica/shard 개념정리 (0) | 2021.12.23 |
[Kafka] Docker로 Kafka 구축하기 | Python으로 Producer,Consumer 구현 (0) | 2021.08.11 |
[Kafka] Docker로 Kafka 구축하기 | Producer에서 Consumer로 메세지 전송 (0) | 2021.08.10 |
[ELK] Flask 웹로그 분석해보기2-Flask 로그남기기 (0) | 2021.07.09 |