본문 바로가기
🌿 Data Engineering/Study

[Kafka] Docker로 Kafka구축하기 | 트위터API사용해서 실시간데이터 전송하기

by 카프리썬_ 2021. 8. 11.
728x90
728x90

이번엔 실제 트위터 실시간 데이터를 kafka로 보내려고 한다!! 두둥! 기대된다.

이걸 하기 위해선 트위터 key가 필요하다 (무려4개나..)

그래서 트위터api 사용 신청을 사전에 해야한다! 

나는 이미 빅데이터를 지탱하는 기술 책의 실습을 진행하기 위해서 신청받았었다! (엄청오래걸리고 승인이 까다롭다ㅠ)

 

Producer 구현

나는 트위터의 실시간데이터 중에서 '김연경'키워드를 가지고 필터링을 할 예정이다.

이걸 실행하면 필터링한 데이터를 producer가 가져와서 consumer에서 보이겟지!?

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import KafkaProducer
import json

# 트위터API Key
access_token = ""
access_token_secret =  ""
api_key =  ""
api_secret =  ""

# 사용할 topic
topic_name="twitter_topic"

class StdOutListener(StreamListener):
    def on_data(self, data):
        raw_data = json.loads(data)
        producer.send(topic_name, raw_data["text"].encode('utf-8'))
        return True
    def on_error(self, status):
        print (status)

producer = KafkaProducer(bootstrap_servers='localhost:9092')
outlisten = StdOutListener()
auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, outlisten)
stream.filter(track=["김연경"])

 

 

하나하나씩 코드리뷰를 해보면 아래와 같다.

 

1.사용패키지 설치

pip3 install kafak-python
pip3 install python-twitter
pip3 install tweepy 

 

+ 사용할 Topic 생성

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic twitter_topic

 

2. 트위터API사용하기 위한 Auth핸들러 

Auth핸들러를 가지고 auth가 생성이 되면 트위터api를 사용할 준비가 되었다! 

from tweepy import OAuthHandler

access_token = ""             
access_token_secret =  ""
api_key =  ""
api_secret =  ""

auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)

#print(auth) <tweepy.auth.OAuthHandler object at 0x7f33066aa630>

 

3. KafkaProducer생성, StdOutListener생성, Stream전송

KafkaProducer를 가지고 kafka(localhost:9092)로 보낼 객체를 하나 만든다. 

이 producer를 가지고 StreamListener Class의 on_data()함수에서 카프카의 topic으로 메세지를 보낼 수 있게 된다.

 

그리고 StdOutListener 클래스가 필요하다. 

이 클래스에선 데이터를 받아서 어떻게 할건지 구현하는데 kafka로 메세지를 보내는 로직을 구현하면 된다.

 

마지막으로

승인받은auth와 StdOutListener클래스의 객체를 넣어 tweepy에서 제공해주는Stream을 사용해서 스트림한다.  

아 그리고 모든 트위터 데이터를 가져오면 너무 많으니까 나는 '김연경'이라는 키워드로 필터링했다. 

from tweepy import Stream
from kafka import KafkaProducer


producer = KafkaProducer(bootstrap_servers='localhost:9092')
stdout = StdOutListener()

auth = OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, stdout)
stream.filter(track=["김연경"])

 

4. StdOutListener 클래스 구현

컨슈머로 데이터를 보내는 method가 있는 StreamListener Class를 만든다. 

이 클래스는 on_data()함수와 on_error()함수를 가지고 트위터 StreamListener를 생성한다.

 

on_error()함수는 스트림데이터를 제대로 받아오지 못했을때 status를 출력해준다. 

예를 들어, Twitter’s rate limit policy 문제가 발생하면 status.code가 420가 되면서 리스너와 연결이 끊긴다고 한다. 

 

on_data()함수에서는 스트림데이터를 제대로 받아왔을때 그 데이터를 핸들링한다. 

스트림에서 받아온 데이터를 json으로 가지고 오고, 그 중에서 text만 topic으로 보낸다. 

from tweepy.streaming import StreamListener
import json

# 사용할 topic
topic_name="twitter_topic"

class StdOutListener(StreamListener):
    def on_data(self, data):
        raw_data = json.loads(data)
        producer.send(topic_name, raw_data["text"].encode('utf-8'))
        return True
    def on_error(self, status):
        print (status)

사실 트위터에서 josn.load로 받아온 raw data의 형식은 이렇다.  (트위터 developer API에서 제공해주는 sample)

그래서 이중에서 text에 해당하는 값을 topci으로 보내는 것이다. 

{
 "created_at": "Wed Oct 10 20:19:24 +0000 2018",
 "id": 1050118621198921728,
 "id_str": "1050118621198921728",
 "text": "To make room for more expression, we will now count all emojis as equal—including those with gender‍‍‍ ‍‍and skin t… https://t.co/MkGjXf9aXm",
 "user": {},  
 "entities": {}
}

 

이제 producer.py을 실행해본다.

python3 producer.py

그리고 consumer에서 받는다. 받는형식을 따로 지정하지 않았더니 twitter['text']가 그대로 들어오는 것 같다. 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter_topic --from-beginning

 

Consumer 결과

 

 

추가로 재미삼아 '백신'관련 데이터도 가져와봤다.

음 근데 왜 계속 반복적인 메세지가 나오지? 아무래도 이 컨슈머에서 받아온 데이터를 좀 잘 정리해야할 것 같다.

그리고 topic과 offset에 대해서도 자세히 봐야할듯 싶다. 

컨슈머를 시작하게 되면 이전 데이터부터 읽어오게 된다. 

 

 

참고자료 코드리뷰 

https://elkhayati.me/kafka-python-twitter/

 

Streaming Covid19 tweets using Kafka and Twitter API

People use Twitter for all kinds of business puposes. And unlike Facebook and other social media platforms, Twitter provides some sort of data freely using the…

elkhayati.me

https://ljvmiranda921.github.io/notebook/2017/02/24/twitter-streaming-using-python/#credentials

 

How to stream Twitter using Python

Streaming tweets can be a fun exercise in data mining. With almost a million tweets being published everyday, there is an enormous wealth of data that can be...

ljvmiranda921.github.io

 

추가로 해보면 좋을 내용 

이렇게 카프카를 사용해서 실시간으로 받아온 데이터를 카산드라에 저장하고, Spark Straming을 처리하는것이다.

 

https://velog.io/@shinychan95/Twitter-API%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%98%EC%97%AC-%EC%8B%A4%EC%8B%9C%EA%B0%84-tweet%EC%9D%84-kafka%EB%A1%9C-%EB%B3%B4%EB%82%B4%EA%B8%B0#producer-%EC%83%9D%EC%84%B1-%EC%8B%9C-%EC%B6%94%EA%B0%80-%EC%84%A4%EC%A0%95-%EB%B0%8F-data-sending-%EA%B4%80%EB%A0%A8-handling

 

Twitter API를 활용하여 실시간 tweet을 Kafka로 보내기

2019년 겨울 프로젝트 정리 TweetDeck Clone Project = 데이터 파이프라인 및 tweet 실시간 피드 개발 (위 사진에서 왼쪽에 해당하며) Twitter API로부터 실시간 tweet들을 받아서 tweet과 retweet으로 분리하여 k

velog.io

 

728x90
반응형