본문 바로가기
사이드 프로젝트/음악추천 챗봇 서비스

음악추천챗봇10. S3데이터 PySpark처리 및 시각화 (Zeppelin)

by 카프리썬_ 2021. 8. 9.
728x90
728x90

앞에서 구축한 EMR클러스터를 실행한다.

 

1. Zeppelin에서 s3에 있는 데이터가져오기

load이후에 들어갈 값은 s3의 URI이다.

 

top_tracks 데이터 가져오기

 

%pyspark
from datetime import datetime

toptrack_raw = sqlContext.read.format("parquet").load("s3://musicdatalake/top-tracks/dt=2021-08-02/top_tracks.parquet")
print(type(toptrack_raw))
toptrack_raw.printSchema()

toptrack_df = toptrack_raw.toDF('track_id','artist_id','track_name','popularity','external_url','album_name','image_url')
toptrack_df.show()

audio_features 데이터 가져오기

%pyspark

from datetime import datetime

audio_raw = sqlContext.read.format("parquet").load("s3://musicdatalake/audio-features/dt=2021-08-02/audio_features.parquet")
print(type(audio_raw))
audio_raw.printSchema()

audio_raw_df = audio_raw.toDF('danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness','liveness','valence','tempo','type','id','uri','track_href','analysis_url','duration_ms','time_signature')
audio_raw_df.show()

 

+ 아테나가 편한것 같기도..

근데 생각해보니까, 아테나를 쓰면 이렇게 데이터프레임을 안만들어도 바로 만들어주는데...

 

2. Zeppelin에서 RDS에 있는 데이터가져오기

이전에 사실 아테나를 미리 수행을 해서, RDS에 유사도값을 저장해두었다.

그 값을 pyspark로 해서 가지고 올 수도 잇었다. 

 

참고로, zeppelin에서 파이썬의 외부라이브러리를 사용하기 위해서 마스터노드에 접속한다.

그래서 pip으로 별도로 패키지를 설치해야한다.

sudo pip install pandas

sudo pip install pymysql

아티스트간의 유사도 데이터 가져오기

%pyspark

import pandas as pd
import pymysql

rds_host ='database-1.cj2sbwq1t1o1.ap-northeast-2.rds.amazonaws.com' #RDS: endpoint
rds_user ='admin' 
rds_pwd = 'qwer1234'
rds_db = 'musicdb'

try:
    conn = pymysql.connect(host=rds_host, user=rds_user, password=rds_pwd, db=rds_db)
    cursor = conn.cursor()

except:
    logging.error('Database Connect Error')

# rds(아티스트정보)의 데이터가져오기
cursor.execute("select * from artists limit 30")
col_names = [d[0] for d in cursor.description]
artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
# Pandas로 데이터프레임으로 변형
artists=pd.DataFrame(artists)
artists.head()


# rds(유사도계산)의 데이터가져오기
cursor.execute("select * from related_artists limit 30")
col_names = [d[0] for d in cursor.description]
related_artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
# Pandas로 데이터프레임으로 변형
related_artists=pd.DataFrame(related_artists)
related_artists.head()

 

3. SparkSQL 사용해보기

지금까지 가지고 온 데이터들의 형식은 아래와 같다.

%pyspark


type(artists) #pandas.core.frame.DataFrame
type(related_artists) #pandas.core.frame.DataFrame
type(audio_raw_df) #pyspark.sql.dataframe.DataFrame
type(toptrack_df) #pyspark.sql.dataframe.DataFrame

toptrack_df_pands = toptrack_df.toPandas()
type(toptrack_df_pands) #pandas.core.frame.DataFrame

 

RDS에 저장된 데이터 

-artists : 아티스트정보 

-releated_artists : 아티스트간의 유사도 정보

-audio_raw_df : 음악메타데이터 정보

-toptrack_df : 앨범 및 음악정보 

Spark에서는 SparkSQL을 지원해주기 때문에 pyspark.sql.dataframe.DataFrame 형식인 경우, sql을 쓸 수 있다.

그리고 toPandas()로 쉽게 pandas형식으로도 변형할 수 있다. 

>>SparkSQL사용방법

 

Select

%pyspark

toptrack_df.createOrReplaceTempView("toptrack")
spark.sql("select artist_id,track_id, track_name, popularity from toptrack limit 5 ").show()

audio_raw_df.createOrReplaceTempView("audio")
spark.sql("select id,danceability,energy,speechiness,tempo,acousticness from audio limit 5").show()

Join

 

 

조인해보기

사실 조인을 하는것도 이 안에서 할 수 가 있다.

그래서 나는 유사도 데이터에 있는 아티스트id를 아티스트 데이터와 조인을 해서 바꿔볼까 싶다.

 

4. 데이터 시각화

 

select / filter 구문

이제 만들어진 데이터프레임을 가지고 쿼리를 날려볼 수 있다.

 

pyspark 집계함수 사용

더보기
%pyspark

#spark의 집계함수

import pyspark.sql.functions as F


audio_raw_df2 = audio_raw_df.select(audio_raw_df['energy'],audio_raw_df['tempo'])
audio_raw_df2_new = audio_raw_df2.agg(F.avg(audio_raw_df2['energy']).alias("avg_energy"),
                                    F.max(audio_raw_df2['tempo']).alias("max_tempo"),
                                    F.min(audio_raw_df2['tempo']).alias("min_tempo"))
audio_raw_df2_new.show()

 

 

엥, 갑자기 Connection이 끊켰다.

java.net.ConnectException: Connection refused (Connection refused)

 

Pymysql사용해보기

더보기
%pyspark

import pandas as pd
import pymysql

rds_host ='database-1.cj2sbwq1t1o1.ap-northeast-2.rds.amazonaws.com' #RDS: endpoint
rds_user ='admin' 
rds_pwd = 'qwer1234'
rds_db = 'musicdb'

try:
    conn = pymysql.connect(host=rds_host, user=rds_user, password=rds_pwd, db=rds_db)
    cursor = conn.cursor()

except:
    logging.error('Database Connect Error')

# rds(유사도계산)의 데이터가져오기
cursor.execute("select * from related_artists limit 30")
col_names = [d[0] for d in cursor.description]
related_artists = [dict(zip(col_names,row)) for row in cursor.fetchall()]
related_artists=pd.DataFrame(related_artists)
related_artists.head()

 

connectin이 끊켜서 그런가 rds와 연결해서 보고싶었는데 이것도 이런 오류가 발생햇다.

메모리가 부족한 탓일지도 모르겠다. 실제로 점점 제플린으로 스팤을 돌리는데 시간이 오래걸리면서 버벅거렸다.

그래서 모니터링 지표를 보니까 메모리가 이렇게 치솟았던것이다. 그래서 원인은 아마 메모리가 부족한게 아닐까 싶다..

 

 

 

https://stophyun.tistory.com/224?category=877899 

 

[Spotify Data Analysis/스포티파이 데이터 분석] Zeppelin을 활용한 스파크(Spark), AWS S3, SQL (7)

제플린(Zeppelin)을 활용해서 Spark의 대한 기능들을 살펴보도록 하겠습니다. 기본적인 적들은 아래와 같은 구문을 통해서 활용할 수 있습니다. 스파크는 rdd라는 개념을 사용합니다. AWS S3에 있는 par

stophyun.tistory.com

 

그리고 AWS안에 있는 RDS랑 연결도 해서 유사도DB에 있는 데이터도 가져와보자!

 

728x90
반응형