본문 바로가기
DevOps/Cloud

📖[Kinesis] 직접 알아보는 Kinesis Data Stream

by 카프리썬 2023. 7. 1.
728x90

직접 알아보는 sagmaker에 이어서....! 이번엔 직접 알아보는 Kinesis Data Stream...! 

https://pearlluck.tistory.com/773

 

[Sagemaker] 직접 알아보는 AWS Sagemaker, 우리팀은 잘 사용하고 있는가? (feat. DS와 DE의 갈등 원인과 해

사실 요즘 세이즈메이커로 distributed training 를 해야하는 "챌린지" 가 주어졌다. 그래서 밤낮없이 troubleshooting을 하고 있는데.. (인생은 원래 혼자인법이지..🥲) 그전에 지금까지 우리팀에서 개발

pearlluck.tistory.com

 

 

 

Kinesis(키네시스)란? 

실시간으로 데이터 스트립을 수집,처리,분석 해주는 서비스이다.

(AWS에서 제공해주는 카프카랑 비슷한 서비스라고 하면 되려나..?)

 

키네시스는 아래에 보이는것 처럼 data stream, data firehose, analytics 라고 크게 3가지 기능을 제공한다. 

(공식문서상으로는 kinesis video stream 까지 4가지라곤 하더라!)

간단하게 키네시스의 기능을 보면 아래와 같다. 

- kinesis data stream : 실시간으로 스트림 데이터를 받아 주는 곳

- kinesis data firehose (delivery stream) :  데이터 스트림을 aws 저장소(s3, redshift)에 저장하는 곳

- kinesis data analytics : SQL로 스트림 데이터 분석하는 곳, 분석결과도 다시 firehose로 보낼수 있다.

 

이중에서 일단은 kinesis data stream 을 
언제 사용하고, 어떻게 동작되서, 실제로는 어떻게 사용하고 있는지 알아보려고 한다. 

 

kinesis data stream

실시간으로 들어오는 데이터를 받고, 저장하는 역할을 한다.

 

언제 사용하는가? 앱이 보낸 실시간 데이터를 받을때 사용한다.  

실시간 데이터라고 하면 약간 막연한데, 예를 들면 커머스 앱을 개발했다고 치자.

이때 이 앱을 사용하는 사용자들에 대한 행동분석(?)이 필요하다면, 어떤 데이터가 필요할까.

사용자들이 상품을 구매하고, 장바구니에 담고, 어떤 상품을 좋아하는지,

이런 사용자들의 이벤트 데이터가 필요할 것 같다.

 

그런데 한번 상상해보면 이런 사용자들의 이벤트 데이터가 어떻게 수집될까?

한명도 아닌 수십 수백명의 사용자들이 계속해서 활동하기 때문에, 계속해서 흘러 들어온다는 개념으로 상상이 된다.

그렇다면 이렇게 마치 흘러가는 바람처럼(띵곡추천) 들어오는 데이터를 누가 받는가?!

그게 바로 데이터 스트림이다.

 

그래서 예를 들어 개발한 커머스 앱에서 data stream으로 실시간으로 데이터를 전송하면,

이 data stream을 항상 바라보고 있었던,  listening하고 있었던, 다른 시스템이 데이터를 받아서 처리한다. 

 

 

어떻게 동작하는가? producerconsumer 그리고 shard

키네시스에 데이터를 넣는 producer(생산자), 데이터를 받는 consumer(소비자)가 있다.

키네시스의 응답을 기다리면서 데이터가 들어오면 이를 받아서 처리하는 서비스를 application이라고 한다.

그리고 위에 그림에서 보이는것처럼 키네시스는 하나이상의 shard로 구성되어 있다.

샤드가 데이터 레코드를 파티션 키 기준으로 데이터를 그룹화해서 consumer로 흘려보낸다고 이해하면 될것 같다.

 

샤드(shard)data stream을 고유하게 식별할 수 있는 단위로, 흘러오는 스트림 데이터들의 뭉치로 이해할 수 있을것 같다.

하나의 샤드를 읽을때는 최대 5개의 트랜잭션, 초당 2MB 읽기 속도, 최대 1,000개의 data record 읽을 수 있다.

data record는 데이터 스트림 안에서 사용하는 메세지로, 스트림 데이터 뭉치들의 각각의 요소들로 이해할 수 있을것 같다.

한번 스트림에 들어가면 변경이 불가능하며 시퀀스 번호, 파티션키, 실제 메세지 등으로 이루어져 있다. 

시퀀스 번호(sequence number)는 data record를 고유하게 식별하는 단위고, 

파티션 키(partition key)는 data stream 내에서 샤드별로 데이터를 그룹화 하는데 사용한다. 

그리고 이 data record보존기간(retention)은 기본적으로 24시간이며,

최대 365일로 늘릴수 있지만, 추가요금이 발생한다고 한다. 

 

결론적으로, 내가 이해한 동작방식은 아래와 같이 정리할 수 있다.

개발한 앱에서 실시간으로 데이터를 키네시스로 보내는 이 앱이 producer(생산자)이며,

키네시스로 들어오는 데이터를 처리하려고 기다리고 있는 서비스들이 consumer(소비자)가 된다.

또, 키네시스에는 하나이상의 샤드(shard)로 스트림 데이터를 식별하고,

샤드(shard)는 시퀀스 번호(sequence number)로 식별할 수 여러개의 data record 메세지를

파티션 키(partition key)로 샤드별로 그룹화해서 실시간으로 읽는다. 

 

참고 : 키네시스 용어 및 개념정리

https://docs.aws.amazon.com/ko_kr/streams/latest/dev/key-concepts.html

 

 

이제 본격적으로 실제로 어떻게 사용하고 있는지 알아봐야한다..코드주의..

 

 

어떻게 사용하고 있는가? 프로비저닝 모드로, 유저 이벤트를 수집하는 data stream 생성

실제로 실시간으로 유저이벤트 데이터를 수집하는 data stream이 있다. 

대충 이런식의 stream이 있다. 기본적으로 24시간 하루 retention으로 세팅이 되어 있고, capacity mode는 provisioned이다. 

용량모드(capacity mode)는 온디맨드와 provisioned를 선택할 수 있다. 

온디맨드는 샤드를 자동으로 관리해서, 들어오는 데이터에 따라 자동으로 샤드를 확장 및 축소 할 수 있고, 그만큼 비용이 청구된다.

프로비저닝 사용자가 샤드 수를 직접 지정할 수 있고, 샤드의 수는 아래와 같은 수식으로 계산할 수 있다.

number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)

주로 온디맨드는 예측이 불가능한 가변적인 트래픽, 프로비저닝은 예측이 가능한 고정적인 트래픽을 처리할때 사용한다.

 

참고 : 키네시스 데이터 스트림 용량모드

https://docs.aws.amazon.com/ko_kr/streams/latest/dev/how-do-i-size-a-stream.html 

 

우리는 작은 서비스이기에,, 스트림 데이터가 작아 샤드를 하나만 사용하기로 직접 지정했다.

조금 더 규모가 성장하면 샤드수에 대한 고민이 더 나아가 용량모드에 대한 고민까지 필요할것 같다. 

 

 

어떻게 사용하고 있는가? 실시간으로 수집한 유저 이벤트 데이터를 주기적으로 처리하는 application 개발

이 키네시스 데이터 스트림에 유저이벤트 데이터를 보내는 producer(생산자)는 우리 서비스 앱이다.

그리고 consumer(소비자)는 내부적으로 개발한 spring application이다. 

 

이 어플리케이션은 kinesis client로 키네시스의 실시간 데이터를 받고,

shedulerLock로 주기적으로 redis에 저장하는 스케쥴링 작업을 수행한다.

 

이 어플리케이션의 로직을 간략하게 살펴보면 아래와 같다. (사실 아직도 요부분은 정확하게 잘 모르겠다ㅠㅠ)

KinesisClientConfig에서 kinesis client를 통해 kinesis properties와 user Event 스케쥴링도 등록한다는 bean을 주입한다. 

UserEventRecordProcessorFactory 에서는data stream의 shard를 어떻게 처리할지 구현한다. 여기에선 redis 서비스를 주입했다. 

UserEventRecordProcessor는 shard의 안에 있는 data record를 어떻게 처리할지 구현한다. 

보면 List<KinesisClientRecord>로 records를 파라미터로 받아서,

UserEventRecord.of()를 통해 UserEventRecord 기반의 모델로 만들어준다

UserEventRecord는 아래와 같이 구성되어 있고,

실질적으로 키네시스에 실시간으로 들어오는 record의 각 데이터라고 보면 된다. 

실제로 유저들의 이벤트 관련 실시간 데이터이기 때문에 userID가 있고, eventType과 serverTime을 알수 있다.

아무튼간에 shard 기준으로, record 기준으로 들어온 스트림 데이터를 어떻게 처리할지 생각보다 디테일하게 구현해야한다. 

 

그리고 이제 이러한 작업을 주기적으로 동작하는 스케쥴링을 걸어야한다. 

shedulerLock은 마치 airflow 스케쥴러와 유사한데, 하나의 서버에서 하나의 스케쥴링 작업을 처리할수있다. 

예를 들어 airflow 같은 경우, 주로 큰 배치작업을 스케쥴링하는데 사용해서 여러개의 인스턴스를 띄워서 스케쥴링을 동작하는 반면

shedulerLock은 스케쥴링 작업에 마치 락을 거는것처럼 오직 하나의 인스턴스에서 하나의 스케쥴링 작업만 할 수 있다

 

참고 : scheduler lock

https://sg-choi.tistory.com/602 

 

shedulerLock을 수행하기 위해서는 아래 처럼 dependecy를 주입해야하고, 

스케쥴이 동작할때, redis에 접근할수 있도록  lockProvider bean을 주입해야한다. 

lockProvider가 하나의 리소스에만 접근할 수 있도록 제한해주는 bean인가..? 

아무튼 그리고 최종적으로 주기적으로 실행될 cron을 명시해서 스케줄을 등록하면 된다. 

@SchedulerLock 어노테이션에서

lockAtLeastForString으로 락을 유지해야할 기간을 지정해서, 메소드간의 시간차이를 지정하는 것이고,

lockAtMostForString으로 락을 유지해야할 최소 기간을 지정해서, 인스턴스(노드)가 죽었을때 락을 얼마나 유지해야할지 정하는 것이다.

 

그래서 작업이 오류나거나 시간이 오래걸릴경우,

다른 스케쥴러가 최대로 lockAtMostForString 시간 만큼 기다려 줄수 있고

해당 작업이 다시 시작되면 최소로 lockAtLeastForString 시간 만큼 기다려줄 수 있다.  

참고 : lockAtLeastForString, lockAtMostForString

https://eunbc-2020.tistory.com/200

 

 

마무리,, kinesis data stream

생각보다 글이 길어졌는데, 이렇게 해서 kinesis data stream을 

언제 사용하고, 어떻게 동작되서, 실제로는 현재 어떻게 사용하고 있는지까지 알아보았다.

 

요약해보자면..

실시간 스트림 데이터를 받을때 kinesis data stream을 사용하고

producer가 데이터를 보내고, consumer가 데이터를 받으면서 샤드와 데이터 레코드 단위로 키네시스가 동작한다. 

마지막으로 유저이벤트 데이터를 수집하고 주기적으로 처리하도록 내부적으로 spring application을 개발하여 사용하고 있다! 

 

반응형