본문 바로가기
데이터분석

[DACOS] CH3 실습. 수집 파일럿 실행

by sum_mit45 2023. 1. 11.
728x90
반응형
출처: 실무로 배우는 빅데이터 기술(김강원)

 

2단계 - 수집 환경 구성
3단계 - 플럼 수집 기능 구현
4단계 - 카프카 기능 구현
5단계 - 수집 기능 테스트 

 

2단계 - 수집 환경 구성

플럼 설치 및 환경설정

server02.hadoop.com  에 Flume 설치 완료

 

Heap Memory 50에서 100으로 변경
CM 홈 → [Flume] → [시작] → 플럼 에이전트 구동 완료

 

카프카 설치

server02.hadoop.com  에 Kafka 설치 완료

 

메시지의 보관 기간(Data Retention Time) 7일에서 10분으로 수정

 

CM 홈 → [Kafka] → [재시작] → 카프카 에이전트 구동 완료

 

 

3단계 - 플럼 수집 기능 구현

스마트카의 상태 정보를 수집하는 SmartCarInfo Agent

운전자의 운행 정보를 수집하는 DriverCarInfo Agent

 

플럼의 에이전트를 만들려면 플럼이 인식할 수 있는 특정 디렉터리에 이름이 {Agent 고유이름}.conf 형식인 파일을 생성하면 된다. CM에서 제공하는 플럼 구성 정보 설정을 통해 에이전트 생성 가능.

 

SmartCar 에이전트 생성

CM 홈 → [Flume] → [구성] → [Agent 이름], [구성 파일]

.conf 파일 → github(https://github.com/wikibook/bigdata2nd/blob/master/CH03/예제-3.1/SmartCar_Agent.conf)

 

 

#플럼의 에이전트에서 사용할 source, channel, sink의 각 리소스 변수 정의
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

#에이전트의 source 설정. 
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir 
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

#에이전트의 channel 설정
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

#에이전트의 sink 설정
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

#source와 channel sink를 연결: File -> Channel -> Sink로 이어지는 에이전트 리소스를 하나로 연결
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel #SpoolSource의 채널값 설정
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel #LoggerSink의 채널값 설정

Source.type spooldir : 지정한 특정 디렉터리를 모니터링하고 있다가 새로운 파일이 생성되면 이벤트를 감지해서 "batchSize"의 설정값만큼 읽어서 Channel에 데이터 전송

Channel.type 은 크게 memory와 type이 있음

  • memory channel은 Source로부터 받은 데이터를 메모리상에 중간 적재하므로 성능 높고, 안정성 낮음
  • file channel은 Source에서 전송한 데이터를 받아 로컬 파일 시스템 경로인 ‘dataDirs’에 임시로 저장했다가 Sink에게 데이터를 제공하므로 성능 낮고, 안정성 높음

LoggerSink.type logger : 수집한 데이터를 테스트 및 디버깅 목적으로 플럼의 표준 출력 로그 파일인 /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log에 출력

 

SmartCar 에이전트에 Interceptor 추가

`CM 홈 → [Flume] → [구성] → [구성 파일]

 

#수집 데이터를 필터링하기 위해 filterInterceptor 변수 선언후 할당
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

#정상적인 로그 데이터 발생시 14자리의 날짜 형식을 가짐.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\\\d{14} #14자리 날짜 형식으로 시작하는 데이터에 대한 정규 표현식
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false #true로 하면 반대로 제외 대상만 수집

Interceptor : Source와 Channel의 중간에서 데이터를 가공하는 역할. 플럼의 Source에서 유입되는 데이터 중 일부 데이터를 수정하거나 필요한 데이터만 필터링하는 등 중간에 데이터를 추가/가공/정제하는 데 사용. Event의 Hedaer에 특정값을 추가하거나 Event의 Body에 데이터를 가공하는 기능으로 활용.

 

Event : 플럼에서 데이터 전송 단위. 구조는 Header와 메시지 본문인 Body로 구성.

 

filterInterceptor.type regex_filter : 정규 표현식을 이용해 수집 데이터를 필터링할 때 사용. 스마트카의 로그 파일의 내용 중간에 로그 포맷의 형식을 알리는 메타정보가 포함돼 있는데, 이 메타 정보를 수집 데이터에서 제외하고 필요한 데이터만 수집해야 하기 때문에 사용.

 

DriverCarInfo 에이전트 생성

CM 홈 → [Flume] → [구성] → [구성 파일]

 

#DriverCarInfo 리소스 변수 추가
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

#DriverCarInfo 에이전트를 위한 Source, Interceptor, Channel, Sink 정보 추가
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

#플럼에서 수집과 동시에 카프카로 전송. 
#카프카 브로커 서버가 실행중인 server02.hadoop.com:9092에 연결해서 SmartCar-Topic에 데이터를 100개의 배치 크기로 전송
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

#DriverCarInfo의 Source와 Sink의 Channel을 앞서 정의한 DriverCarInfo_Channel로 설정해서 Source-Channel-Sink 구조 완성
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

Source.type exec : 플럼 외부에서 수행한 명령의 결과를 플럼의 event로 가져와 수집할 수 있는 기능 제공. 스마트카 운전자의 운행 정보가 로그 시뮬레이터를 통해 /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log 에 생성되는데, 리눅스의 tail 명령을 플럼의 exec를 실행해서 운전자의 실시간 운행 정보 수집.

 

filterInterceptor.type regex_filter

 

Channel.type : Memory

 

4단계 - 카프카 기능 구현

카프카의 명령어를 이용해 카프카의 브로커 안에 앞으로 사용하게 될 토픽을 생성하고, 카프카의 Producer 명령어를 통해 토픽에 데이터 전송. 토픽에 들어간 데이터를 다시 카프카의 Consumer 명령어로 수신

 

카프카 Topic 생성

토픽 생성하는 명령어

kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic

  • --zookeeper 옵션: 카프카가 Zookeeper에 의존적이라는 것을 알 수 있음. 토픽의 메타 정보들이 Zookeeper의 Z노드라는 곳에 만들어지고 관리됨.
  • --replication-factor 옵션 : 카프카를 다중 Broker로 만들고 전송한 데이터를 --replication-factor ****개수만큼 복제하게 되는데, 현재 프로젝트에서는 단일 카프카 브로커이므로 복제개수는 1개로 설정
  • --partitions 옵션 : 해당 Topic에 데이터들이 partitions의 개수만큼 분리 저장. 다중 Broker에서 쓰기/읽기 성능 향상을 위해 사용하는 옵션. 현재는 1로 설정.
  • --topic 옵션 : 파일럿에서 사요알 토픽명 정의. 이는 플럼 구성파일에서 설정한 DriverCarInfo_KafkaSink에서 설정한 토픽 이름과 같아야 함.

특정 토픽 삭제하는 명령어

kafak-topics --delete --zookeeper server02.hadoop.com:2181 --topic 토픽명

 

“Created topic SmartCar-Topic” : 토픽이 정상적으로 생성된 것.

 

카프카 Producer 사용

kafka-console-producer --broker-list server02.hadoop.com:9092 -topic SmartCar-Topic

 

카프카 Consumer 사용

kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0 --from-beginning

Producer에서 입력했던 “Hello BigData”가 출력되는 모습

 

5단계 - 수집 기능 테스트

SmartCar 로그 시뮬레이터 작동

1. 로그 시뮬레이터를 백그라운드 방식으로 실행

2016년 1월 1일에 3대의 스마트카에 대한 상태 정보와 운전자의 운행정보가 생성되기 시작.

 

2. 파일이 생성됐는지 확인

 

3. 정상적으로 시뮬레이터가 작동하고 있는지 확인 - SmartCarStatusInfo

 

4. 정상적으로 시뮬레이터가 작동하고 있는지 확인 - SmartCarDriverInfo

5. 파일을 플럼 SmartCarInfo 에이전트의 SpoolDir 경로로 옮김

6. filezilla에서 파일을 옮긴거 확인

 

플럼 에이전트 작동

CM 홈 → [Flume] → [재시작]

 

카프카 Consumer 작동

kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0

앞서 실행한 시뮬레이터에 의해 스마트카의 운행 로그 데이터가 실시간으로 카프카에 유입됨

 

수집 기능 점검

1. 스마트카의 상태값 데이터 출력

tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

플럼의 데이터 전송 단위인 Event가 Header와 Body 구조로 분리된 것 확인 가능

 

2. 스마트카의 운전자 정보가 실시간으로 수집되고 있는지 확인하는 코드

kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0

 

3. 백그라운드로 실행했던 스마트카 로그 시뮬레이터 종료(CarLogMain, DriverLogMain의 pid 찾아 강제 종료)

 

728x90
반응형

'데이터분석' 카테고리의 다른 글

[DACOS] CH2 실습. 빅데이터 파일럿 프로젝트  (1) 2023.01.10