반응형 🌿 Data Engineering58 [Kafka] Docker로 Kafka구축하기 | 트위터API사용해서 실시간데이터 전송하기 이번엔 실제 트위터 실시간 데이터를 kafka로 보내려고 한다!! 두둥! 기대된다. 이걸 하기 위해선 트위터 key가 필요하다 (무려4개나..) 그래서 트위터api 사용 신청을 사전에 해야한다! 나는 이미 빅데이터를 지탱하는 기술 책의 실습을 진행하기 위해서 신청받았었다! (엄청오래걸리고 승인이 까다롭다ㅠ) Producer 구현 나는 트위터의 실시간데이터 중에서 '김연경'키워드를 가지고 필터링을 할 예정이다. 이걸 실행하면 필터링한 데이터를 producer가 가져와서 consumer에서 보이겟지!? from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream from kafka imp.. 2021. 8. 11. [Kafka] Docker로 Kafka 구축하기 | Python으로 Producer,Consumer 구현 Kafka 테스트해보기2-Python으로 메세지 생성 이번엔 이렇게 python으로 Producer와 Comsumer를 구현해 볼 것이다. 그래서 연속적으로 데이터를 보내는걸 테스트해보려고 한다. Producer 구현 from kafka import KafkaProducer from json import dumps import time topic_name = "topic_test" producer = KafkaProducer( acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8') ) start = time.time() print("[begi.. 2021. 8. 11. [Kafka] Docker로 Kafka 구축하기 | Producer에서 Consumer로 메세지 전송 목표 카프카를 이전에 간단한 이론정도로 학습해봤지만, 뭔가 제대로 와닿은 느낌이 없었다. 그냥 대략적으로 훑어본 수준이고, 프로젝트를 하긴 했지만 뭔가 시착오가 있어서 제대로된 결과물이 만족스럽지 않아서 다시 도전 해보려고한다! 특히 그때는 ec2에다가 직접 카프카를 설치했지만, 이번엔 도커로 해보자! 그래서 나의 목표는 아래와 같이 3가지이다. - docker로 카프카 구축해보기 : topic만들고, 메세지를 직접 입력해보면서 프로듀서와 컨슈머 확인해보기 - Python으로 실시간 데이터 전송하기 : 연속적으로 들어오는 임의의 데이터를 python으로 만들고 메세지 전송확인하기 - 트위터API를 사용해서 실제데이터 전송하기 : 이제 python코드를 직접 수정해서 실제 데이터를 전송해보는 것이다. Do.. 2021. 8. 10. [Pandas 데이터전처리] 3-2.Dataframe 조작함수2 apply,cut,set_index DataFrame으로 데이터를 조작하기 위한 여러가지 함수의 사용방법 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 아래의 주피터내용은 여기 깃허브에서 확인할 수 있습니다. 6. 동일한 연산반복 : apply() 동일한 연산을 모든열에 혹은 모든 행에 반복 적용하고자 할때 사용 apply(반복적용할 함수, axis=0/1) : 열마다(0), 행마다(1), 생략시 기본값0 집계함수는 행/단위 백터화연산을 수행하기 때문에 굳이 apply()를 쓸 필요가 없다. lambda 함수나 사용자 정의 함수를 각 열 또는 행에 일괄 적용시키기 위해 사용 7. 데이터변형(관측데이터->범주형데이터) : cut(), 카테고리객체 값의 크기를 기준으로하여 카테고리 값.. 2021. 7. 15. [Pandas 데이터전처리] 3-1.Dataframe 조작함수1. value_counts(), 정렬 DataFrame으로 데이터를 조작하기 위한 여러가지 함수의 사용방법 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 아래의 주피터내용은 여기 깃허브에서 확인할 수 있습니다. 1. 데이터개수세기 : count() NaN값은 세지 않는다. 2. 카테고리값 세기 : value_counts() 각각의 값이 나온 횟수를 센다. 파라미터 normalize=True 를 사용하면 각 값 및 범주형 데이터의 비율을 계산 - 시리즈에서 카테고리값 세기 - 범주형 데이터에서 카테고리값 세기 - 데이터프레임에서 카테고리값 세기 3. 데이터정렬 - 시리즈에서 데이터 정렬 sort_index() : 인덱스를 기준으로 정렬 sort_value() : 데이터 값을 기준으로 .. 2021. 7. 15. [Pandas 데이터전처리] 2-2.Pandas 데이터구조(Dataframe 인덱싱) pandas의 기본적인 데이터구조인 Series와 DataFrame 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 아래의 주피터내용은 여기 깃허브에서 확인할 수 있습니다. 데이터프레임 인덱싱방법 1. 열기준 인덱싱 2. 인덱서를 사용하지 않는 행기준 인덱싱 3. 인덱서를 사용하는 행기준 인덱싱 1. 열기준인덱싱 1) 하나의 열추출 하나의 열을 가지고 오는 방법 : df[컬럼명] -> 그 결과는 시리즈 타입을 가지고 있다. 2) 2개이상의 열추출 데이터프레임으로 리턴받고자 하면 : df[[컬럼명]] -> [컬럼명]을 리스트형태로 사용한다. 컬럼명이 문자열일 경우, 수치인덱스를 사용할 수 없다. 위치인덱싱(index번호로 찾는것) 을 사용할 수 없어.. 2021. 7. 15. [Pandas 데이터전처리] 2-2.Pandas 데이터구조 (DataFrame) pandas의 기본적인 데이터구조인 Series와 DataFrame 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 아래의 주피터내용은 여기 깃허브에서 확인할 수 있습니다. pandas를 사용하기에 앞서 numpy와 pandas 패키지를 모두 import해야한다. import numpy as np import pandas as pd DataFrame (데이터 프레임) 엑셀의 스프레드시트와 같은 개념으로, 2차원 행렬 데이터에 인덱스를 붙인 구조 (행과 열로 만들어진 2차원 배열) 데이터프레임의 각 열은 시리즈로 구성되어 있다. 1. 데이터프레임 생성 : pd.DataFrame() 데이터프레임을 생성하는 방법은 아래와 같이 4가지 경우 1) 리스트로.. 2021. 7. 15. [Pandas 데이터전처리]] 2-1.Pandas 데이터구조(Series) pandas의 기본적인 데이터구조인 Series와 DataFrame 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 아래의 주피터내용은 여기 깃허브에서 확인할 수 있습니다. pandas를 사용하기에 앞서 numpy와 pandas 패키지를 모두 import해야한다. import numpy as np import pandas as pd Series(시리즈) 1차원 배열의 값에 대응되는 인덱스를 부여할 수 있는 구조 1. 시리즈 정의 : pd.Series() python의 list나 numpy가 array인자로 입력된다. 시리즈의 결과는 왼쪽에 index값, 오른쪽에 value가 동시에 확인된다. 왼쪽결과는 일반적인 series를 정의한 경우이고, 오른.. 2021. 7. 15. [Pandas 데이터전처리] 1. Numpy,Pandas 라이브러리 알아보기 데이터의 가공과 처리에 유용한 python의 라이브러리 데이터엔지니어링/데이터사이언티스트 교육을 통해 배운내용을 복습하고, 추가로 공부한 내용을 정리하였습니다. 파이썬 라이브러리를 활용한 데이터분석의 책 내용과 비슷한 것 같다. Numpy(넘파이) 과학연산을 위한 라이브러리, 리스트, 배열, 매트릭스 연산 등을 빠르게 만들어주는 라이브러리 일반적으로 대규모데이터의 리스트는 중첩된 자료를 처리하는게 느리고 복잡하지만 numpy 라이브러리를 써서 처리 속도를 30~40배 빠르게 할 수 있다! import numpy as np 넘파이 라이브러리를 통해 배열에 접근하는 함수는 다음과 같다. numpy로 1차원 배열접근 numpy로 2차원 배열접근 numpy를 사용해서 array에 접근하면 모든원소에 대해서 조건.. 2021. 7. 15. [ELK] Flask 웹로그 분석해보기2-Flask 로그남기기 이렇게 docker로 간단하게 elk를 구성했다. 이제 내가 만든 웹서버를 켜서 로그를 보내려고한다. 그러니까 내가 만든 웹서버안에 로그를 남겨두면 된다. 아 그전에 EC2의 스펙을 늘렸다. 이전에 왠지모르게 ELK설치 후 계속 인스턴스가 버벅거렸다. 알고보니 docker로 ELK세팅을 할 경우 메모리가 최소2기가 이상 필요했다. 메모리가 2기가인 t2.small도 버벅거렸다. 그래서 결국 t2.medium 타입으로 인스턴스 스펙을 올렸다. >> elk구성시 물리적인 스펙에 관련해서 여기를 참고했다. Flask 추가설정 일단 테스트페이지에 대한 로그를 남겨두도록 Flask에서 추가로 설정했다. ( 이분 블로그참고) -app.py 수정 : 로그를 가져올 메인py(my_test.py) 를 라우트등록 -log.. 2021. 7. 9. 이전 1 2 3 4 5 6 다음 728x90