출처: 실무로 배우는 빅데이터 기술(김강원)
2단계 - 수집 환경 구성
3단계 - 플럼 수집 기능 구현
4단계 - 카프카 기능 구현
5단계 - 수집 기능 테스트
2단계 - 수집 환경 구성
플럼 설치 및 환경설정
카프카 설치
3단계 - 플럼 수집 기능 구현
스마트카의 상태 정보를 수집하는 SmartCarInfo Agent
운전자의 운행 정보를 수집하는 DriverCarInfo Agent
플럼의 에이전트를 만들려면 플럼이 인식할 수 있는 특정 디렉터리에 이름이 {Agent 고유이름}.conf 형식인 파일을 생성하면 된다. CM에서 제공하는 플럼 구성 정보 설정을 통해 에이전트 생성 가능.
SmartCar 에이전트 생성
.conf 파일 → github(https://github.com/wikibook/bigdata2nd/blob/master/CH03/예제-3.1/SmartCar_Agent.conf)
#플럼의 에이전트에서 사용할 source, channel, sink의 각 리소스 변수 정의
SmartCar_Agent.sources = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink
#에이전트의 source 설정.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000
#에이전트의 channel 설정
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
#에이전트의 sink 설정
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
#source와 channel sink를 연결: File -> Channel -> Sink로 이어지는 에이전트 리소스를 하나로 연결
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel #SpoolSource의 채널값 설정
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel #LoggerSink의 채널값 설정
Source.type spooldir : 지정한 특정 디렉터리를 모니터링하고 있다가 새로운 파일이 생성되면 이벤트를 감지해서 "batchSize"의 설정값만큼 읽어서 Channel에 데이터 전송
Channel.type 은 크게 memory와 type이 있음
- memory channel은 Source로부터 받은 데이터를 메모리상에 중간 적재하므로 성능 높고, 안정성 낮음
- file channel은 Source에서 전송한 데이터를 받아 로컬 파일 시스템 경로인 ‘dataDirs’에 임시로 저장했다가 Sink에게 데이터를 제공하므로 성능 낮고, 안정성 높음
LoggerSink.type logger : 수집한 데이터를 테스트 및 디버깅 목적으로 플럼의 표준 출력 로그 파일인 /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log에 출력
SmartCar 에이전트에 Interceptor 추가
#수집 데이터를 필터링하기 위해 filterInterceptor 변수 선언후 할당
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor
#정상적인 로그 데이터 발생시 14자리의 날짜 형식을 가짐.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\\\d{14} #14자리 날짜 형식으로 시작하는 데이터에 대한 정규 표현식
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false #true로 하면 반대로 제외 대상만 수집
Interceptor : Source와 Channel의 중간에서 데이터를 가공하는 역할. 플럼의 Source에서 유입되는 데이터 중 일부 데이터를 수정하거나 필요한 데이터만 필터링하는 등 중간에 데이터를 추가/가공/정제하는 데 사용. Event의 Hedaer에 특정값을 추가하거나 Event의 Body에 데이터를 가공하는 기능으로 활용.
Event : 플럼에서 데이터 전송 단위. 구조는 Header와 메시지 본문인 Body로 구성.
filterInterceptor.type regex_filter : 정규 표현식을 이용해 수집 데이터를 필터링할 때 사용. 스마트카의 로그 파일의 내용 중간에 로그 포맷의 형식을 알리는 메타정보가 포함돼 있는데, 이 메타 정보를 수집 데이터에서 제외하고 필요한 데이터만 수집해야 하기 때문에 사용.
DriverCarInfo 에이전트 생성
#DriverCarInfo 리소스 변수 추가
SmartCar_Agent.sources = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink
#DriverCarInfo 에이전트를 위한 Source, Interceptor, Channel, Sink 정보 추가
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false
#플럼에서 수집과 동시에 카프카로 전송.
#카프카 브로커 서버가 실행중인 server02.hadoop.com:9092에 연결해서 SmartCar-Topic에 데이터를 100개의 배치 크기로 전송
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000
SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000
#DriverCarInfo의 Source와 Sink의 Channel을 앞서 정의한 DriverCarInfo_Channel로 설정해서 Source-Channel-Sink 구조 완성
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
Source.type exec : 플럼 외부에서 수행한 명령의 결과를 플럼의 event로 가져와 수집할 수 있는 기능 제공. 스마트카 운전자의 운행 정보가 로그 시뮬레이터를 통해 /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log 에 생성되는데, 리눅스의 tail 명령을 플럼의 exec를 실행해서 운전자의 실시간 운행 정보 수집.
filterInterceptor.type regex_filter
Channel.type : Memory
4단계 - 카프카 기능 구현
카프카의 명령어를 이용해 카프카의 브로커 안에 앞으로 사용하게 될 토픽을 생성하고, 카프카의 Producer 명령어를 통해 토픽에 데이터 전송. 토픽에 들어간 데이터를 다시 카프카의 Consumer 명령어로 수신
카프카 Topic 생성
토픽 생성하는 명령어
kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic
- --zookeeper 옵션: 카프카가 Zookeeper에 의존적이라는 것을 알 수 있음. 토픽의 메타 정보들이 Zookeeper의 Z노드라는 곳에 만들어지고 관리됨.
- --replication-factor 옵션 : 카프카를 다중 Broker로 만들고 전송한 데이터를 --replication-factor ****개수만큼 복제하게 되는데, 현재 프로젝트에서는 단일 카프카 브로커이므로 복제개수는 1개로 설정
- --partitions 옵션 : 해당 Topic에 데이터들이 partitions의 개수만큼 분리 저장. 다중 Broker에서 쓰기/읽기 성능 향상을 위해 사용하는 옵션. 현재는 1로 설정.
- --topic 옵션 : 파일럿에서 사요알 토픽명 정의. 이는 플럼 구성파일에서 설정한 DriverCarInfo_KafkaSink에서 설정한 토픽 이름과 같아야 함.
특정 토픽 삭제하는 명령어
kafak-topics --delete --zookeeper server02.hadoop.com:2181 --topic 토픽명
카프카 Producer 사용
kafka-console-producer --broker-list server02.hadoop.com:9092 -topic SmartCar-Topic
카프카 Consumer 사용
kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0 --from-beginning
5단계 - 수집 기능 테스트
SmartCar 로그 시뮬레이터 작동
1. 로그 시뮬레이터를 백그라운드 방식으로 실행
2. 파일이 생성됐는지 확인
3. 정상적으로 시뮬레이터가 작동하고 있는지 확인 - SmartCarStatusInfo
4. 정상적으로 시뮬레이터가 작동하고 있는지 확인 - SmartCarDriverInfo
5. 파일을 플럼 SmartCarInfo 에이전트의 SpoolDir 경로로 옮김
6. filezilla에서 파일을 옮긴거 확인
플럼 에이전트 작동
카프카 Consumer 작동
kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0
앞서 실행한 시뮬레이터에 의해 스마트카의 운행 로그 데이터가 실시간으로 카프카에 유입됨
수집 기능 점검
1. 스마트카의 상태값 데이터 출력
tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
2. 스마트카의 운전자 정보가 실시간으로 수집되고 있는지 확인하는 코드
kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0
3. 백그라운드로 실행했던 스마트카 로그 시뮬레이터 종료(CarLogMain, DriverLogMain의 pid 찾아 강제 종료)
'데이터분석' 카테고리의 다른 글
[DACOS] CH2 실습. 빅데이터 파일럿 프로젝트 (1) | 2023.01.10 |
---|