앞에서 구축한 EMR클러스터를 실행한다.
1. Zeppelin에서 s3에 있는 데이터가져오기
load이후에 들어갈 값은 s3의 URI이다.
top_tracks 데이터 가져오기
%pyspark
from datetime import datetime
toptrack_raw = sqlContext.read.format("parquet").load("s3://musicdatalake/top-tracks/dt=2021-08-02/top_tracks.parquet")
print(type(toptrack_raw))
toptrack_raw.printSchema()
toptrack_df = toptrack_raw.toDF('track_id','artist_id','track_name','popularity','external_url','album_name','image_url')
toptrack_df.show()
audio_features 데이터 가져오기
%pyspark
from datetime import datetime
audio_raw = sqlContext.read.format("parquet").load("s3://musicdatalake/audio-features/dt=2021-08-02/audio_features.parquet")
print(type(audio_raw))
audio_raw.printSchema()
audio_raw_df = audio_raw.toDF('danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness','liveness','valence','tempo','type','id','uri','track_href','analysis_url','duration_ms','time_signature')
audio_raw_df.show()
+ 아테나가 편한것 같기도..
근데 생각해보니까, 아테나를 쓰면 이렇게 데이터프레임을 안만들어도 바로 만들어주는데...
2. Zeppelin에서 RDS에 있는 데이터가져오기
이전에 사실 아테나를 미리 수행을 해서, RDS에 유사도값을 저장해두었다.
그 값을 pyspark로 해서 가지고 올 수도 잇었다.
참고로, zeppelin에서 파이썬의 외부라이브러리를 사용하기 위해서 마스터노드에 접속한다.
그래서 pip으로 별도로 패키지를 설치해야한다.
sudo pip install pandas
sudo pip install pymysql
아티스트간의 유사도 데이터 가져오기
%pyspark
import pandas as pd
import pymysql
rds_host ='database-1.cj2sbwq1t1o1.ap-northeast-2.rds.amazonaws.com' #RDS: endpoint
rds_user ='admin'
rds_pwd = 'qwer1234'
rds_db = 'musicdb'
try:
conn = pymysql.connect(host=rds_host, user=rds_user, password=rds_pwd, db=rds_db)
cursor = conn.cursor()
except:
logging.error('Database Connect Error')
# rds(아티스트정보)의 데이터가져오기
cursor.execute("select * from artists limit 30")
col_names = [d[0] for d in cursor.description]
artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
# Pandas로 데이터프레임으로 변형
artists=pd.DataFrame(artists)
artists.head()
# rds(유사도계산)의 데이터가져오기
cursor.execute("select * from related_artists limit 30")
col_names = [d[0] for d in cursor.description]
related_artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
# Pandas로 데이터프레임으로 변형
related_artists=pd.DataFrame(related_artists)
related_artists.head()
3. SparkSQL 사용해보기
지금까지 가지고 온 데이터들의 형식은 아래와 같다.
%pyspark
type(artists) #pandas.core.frame.DataFrame
type(related_artists) #pandas.core.frame.DataFrame
type(audio_raw_df) #pyspark.sql.dataframe.DataFrame
type(toptrack_df) #pyspark.sql.dataframe.DataFrame
toptrack_df_pands = toptrack_df.toPandas()
type(toptrack_df_pands) #pandas.core.frame.DataFrame
RDS에 저장된 데이터
-artists : 아티스트정보
-releated_artists : 아티스트간의 유사도 정보
-audio_raw_df : 음악메타데이터 정보
-toptrack_df : 앨범 및 음악정보
Spark에서는 SparkSQL을 지원해주기 때문에 pyspark.sql.dataframe.DataFrame 형식인 경우, sql을 쓸 수 있다.
그리고 toPandas()로 쉽게 pandas형식으로도 변형할 수 있다.
Select
%pyspark
toptrack_df.createOrReplaceTempView("toptrack")
spark.sql("select artist_id,track_id, track_name, popularity from toptrack limit 5 ").show()
audio_raw_df.createOrReplaceTempView("audio")
spark.sql("select id,danceability,energy,speechiness,tempo,acousticness from audio limit 5").show()
Join
조인해보기
사실 조인을 하는것도 이 안에서 할 수 가 있다.
그래서 나는 유사도 데이터에 있는 아티스트id를 아티스트 데이터와 조인을 해서 바꿔볼까 싶다.
4. 데이터 시각화
select / filter 구문
이제 만들어진 데이터프레임을 가지고 쿼리를 날려볼 수 있다.
pyspark 집계함수 사용
%pyspark
#spark의 집계함수
import pyspark.sql.functions as F
audio_raw_df2 = audio_raw_df.select(audio_raw_df['energy'],audio_raw_df['tempo'])
audio_raw_df2_new = audio_raw_df2.agg(F.avg(audio_raw_df2['energy']).alias("avg_energy"),
F.max(audio_raw_df2['tempo']).alias("max_tempo"),
F.min(audio_raw_df2['tempo']).alias("min_tempo"))
audio_raw_df2_new.show()
엥, 갑자기 Connection이 끊켰다.
java.net.ConnectException: Connection refused (Connection refused)
Pymysql사용해보기
%pyspark
import pandas as pd
import pymysql
rds_host ='database-1.cj2sbwq1t1o1.ap-northeast-2.rds.amazonaws.com' #RDS: endpoint
rds_user ='admin'
rds_pwd = 'qwer1234'
rds_db = 'musicdb'
try:
conn = pymysql.connect(host=rds_host, user=rds_user, password=rds_pwd, db=rds_db)
cursor = conn.cursor()
except:
logging.error('Database Connect Error')
# rds(유사도계산)의 데이터가져오기
cursor.execute("select * from related_artists limit 30")
col_names = [d[0] for d in cursor.description]
related_artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
related_artists=pd.DataFrame(related_artists)
related_artists.head()
connectin이 끊켜서 그런가 rds와 연결해서 보고싶었는데 이것도 이런 오류가 발생햇다.
메모리가 부족한 탓일지도 모르겠다. 실제로 점점 제플린으로 스팤을 돌리는데 시간이 오래걸리면서 버벅거렸다.
그래서 모니터링 지표를 보니까 메모리가 이렇게 치솟았던것이다. 그래서 원인은 아마 메모리가 부족한게 아닐까 싶다..
https://stophyun.tistory.com/224?category=877899
그리고 AWS안에 있는 RDS랑 연결도 해서 유사도DB에 있는 데이터도 가져와보자!
'사이드 프로젝트 > 음악추천 챗봇 서비스' 카테고리의 다른 글
음악추천챗봇9. AWS EMR 클러스터구축(Hadoop+Spark+Zeppelin) (0) | 2021.08.07 |
---|---|
음악추천챗봇8. 아티스트 유사도 테스트 수행 및 예외처리 (2) | 2021.07.02 |
음악추천챗봇7. crontab 스케쥴링 및 최종 메세지 확인 (0) | 2021.06.29 |
음악추천챗봇6. Athena 쿼리수행 및 음악 유사도 저장 | 데이터마트 (0) | 2021.06.26 |
음악추천챗봇 5.S3에 parquet 형태로 데이터저장 | 데이터레이크 구성 (0) | 2021.06.23 |
음악추천챗봇 4.2 DynamoDB 데이터저장 및 1차 테스트 완료 (0) | 2021.06.19 |