---
html:
toc: true
offline: true
export_on_save:
html: true
---
# 교통속도 스트림 데이터 전처리하기 (TrafficPreprocessing)
---
교통센서로부터 시계열 속도 데이터를 수집하여 전처리를 수행하고, 파일로 저장하는 예제를 설명합니다.
## 입력 데이터 준비하기
본 예제에서는 교통센서로부터 시계열 속도 데이터(LINK_ID, 속도, 날짜)를 Kafka로 입력 받는 것을 가정합니다. 5분 단위로 측정된 센서 데이터가 시간 순으로 들어온다고 가정합니다.
### 시계열 속도 스트림데이터 생성하기
본 예제를 위해 준비된 파일로부터 시계열 속도 데이터를 한줄씩 읽어 Kafka로 보내주는 파이썬 프로그램을 제공합니다.
입력파일은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 존재하는 ``201601_kangnam.csv`` 파일입니다. 5분 단위로 LINK_ID 와 속도가 시간순으로 정렬되어 있습니다.
- 입력파일 형태 및 내용
PRCS_DATE | LINK_ID | PRCS_SPD
--|---|--
2016-01-01 00:00:00 | 1210006200 | 61
2016-01-01 00:00:00 | 1210006300 | 66.8
2016-01-01 00:00:00 | 1210011200 | 49.8
... | ... | ...
파이썬 프로그램은 Host PC의 /home/csle/ksb-csle/examples 폴더에 존재하는 kangnam_producer.py 입니다. 파이썬 프로그램이 있는 폴더로 이동하여 아래 명령어를 실행합니다.
```
cd /home/csle/ksb-csle/examples
python3 kangnam_producer.py ./input/201601_kangnam.csv localhost:9092 traffic 0.01
첫번째 arg: 보내려는 csv 파일 경로
두번째 arg: 카프카 서버 접속 주소
세번째 arg: 토픽명
네번째 arg: 각 row 를 전송하는 주기 (초 단위)
```
아래 그림과 같이 파일로 부터 데이터를 한줄씩 읽어 Kafka로 보내는 것을 확인할 수 있습니다.
![스트림 데이터 생성](./images/2.6_kafkaInput_sorted.png)
### 기타 입력파일 업로드
전처리에 필요한 파일들을 HDFS repository에 웹툴킷을 이용하여 dataset/input/traffic 위치에 업로드 합니다. 아래 파일들은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 있습니다.
- traffic_kangnam_clos2.txt
![파일 업로드](./images/2.6_fileUpload.png)
## 워크플로우 생성하기
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 하나의 엔진을 생성합니다.
- 워크플로우 속성
속성 | 값 | 비고
--|---|--
name | TrafficPreprocessing | 워크플로우 이름
description | 강남 교통 전처리 예제 | 워크플로우를 설명하는 글
isBatch | false | 스트림 처리를 하는 워크플로우 이므로, false 로 지정
verbose | false | 디버깅을 위해 로그정보를 보고자할 경우, true 로 지정
### 엔진 생성하기
스트림형태로 입력되는 속도 데이터를 입력 받아 가공한 뒤 배치형태의 파일로 내보내므로 StreamToBatch엔진을 선택합니다.
- 엔진 속성
순번 | 엔진 Type | NickName | RunType | 설명
--|---|---|---|--
1 | StreamToBatch | PreprocessingEngine | 즉시실행 | 딥러닝 모델 학습을 위한 전처리
#### Reader
KafkaReader를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
bootStrapServers | localhost:9092 | Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호)
groupId | traffic | Kafka topic 그룹 아이디
topic | traffic | Kafka 큐의 이름
#### Writer
FileWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
filePath | file:///home/csle/ksb-csle/examples/output/traffic_processing.csv | 출력 파일 경로
fileType | CSV | 출력 파일 타입
delimiter | , | 구분자
header | false | 헤더 포함 유무
saveMode | APPEND | 파일 저장 방식
- 출력 파일 경로 지정
HDFS 의 상대경로를 지정할 경우, 상대경로 앞에 **사용자 기본 파일경로** 가 워크플로우 실행 시 동적으로 생성되어 적용됩니다. **사용자 기본 파일경로** 는 ``hdfs://{host}:{port}/user/{user_id}/applications/{workflow_id}/output/`` 입니다.
스트림 처리를 통해 주기적으로 파일을 생성하는 경우 HDFS 를 지원하지 않습니다. 따라서 본 예제에서는 로컬파일 시스템 경로를 사용합니다. 로컬파일 시스템 경로를 지정할 경우, ``file:///`` 키워드를 앞에 붙여서 사용합니다.
#### Controller
SparkStreamController를 선택하고 아래표와 같은 속성을 지정합니다.
SparkStreamController는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.
field |value | 설명
--|---|--
operationPeriod | 10 | Reader로 부터 데이터를 읽어올 주기, 초 단위
windowSize | 1530 | 큐에서 사용할 윈도우 크기
170 x 9
slidingSize | 1360 | 윈도우를 sliding 하는 크기
170 x 8
본 예제에서는 10초 주기마다 1530 개의 데이터를 잘라서 operator에게 전달합니다. 입력데이터는 총 170개의 LINK_ID 를 가집니다. 따라서 5분 간격으로 170개의 데이터가 들어오는 셈입니다. 그런데 스트림 데이터는 계속해서 들어오고 있으므로 첫 시작 데이터가 어디일지 가늠하기 어렵습니다. 따라서 필요한 8 주기 보다 한 주기를 더 가져와서 처리하고 (170 x 9 = 1530 개), 8 주기 씩 슬라이딩 합니다 (170 x 8 = 1360 개). 아래에서 설명할 GroupByFilterOperator 및 OrderByFilterOperator 를 이용하여 170 x 8 = 1360 개 데이터를 필터링해서 사용합니다.
#### Runner
SimpleSparkRunner 를 선택합니다.
field |value | 설명
--|---|--
inJason | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs | | 아래의 표 참고
sparkArgs 설정은 다음과 같이 합니다.
field |value | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수
excuterCores | 2 | 익스큐터 코어 개수
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores | 2 | Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전
#### Operator
총 6 개의 Operator를 사용하여 입력데이터를 가공합니다.
1. **ColumnSelectOperator**
field |value | 설명
--|---|--
selectedColumnName | PRCS_DATE
LINK_ID
PRCS_SPD | 선택할 칼럼 이름
2. **GroupByFilterOperator**
field |value | 설명
--|---|--
keyColName | PRCS_DATE | 총계처리를 할 key 칼럼 이름
groupby | COUNT | 총계처리 방법 선택
condition | EQUAL | 총계처리 후 Filter 방법 선택
value | 170 | Filter 의 값 입력
1530 개 데이터에서 LINK_ID 가 170 개가 되지 않는 시간의 데이터를 버리기 위해서, PRCS_DATE 칼럼을 COUNT 하여 170 이 되는 조건의 데이터만 남깁니다. 그러면 170 x 8 = 1360 개의 데이터만 남습니다. 그런데 우연히 데이터를 시작시점부터 잘 받았을 경우, 170 x 9 = 1530 개의 데이터가 남습니다.
3. **OrderByFilterOperator**
field |value | 설명
--|---|--
keyColName | PRCS_DATE | 정렬을 할 key 칼럼 이름
method | ASC | 정렬 방법 선택 (ASC: 오름차순, DESC: 내림차순)
value | 1360 | 선택할 Row 의 개수
PRCS_DATE 칼럼을 정렬하여 8 주기의 데이터 (170 x 8 = 1360 개) 만 남깁니다.
4. **MinMaxScalingOperator**
field |value | 설명
--|---|--
selectedColumnId | 2 | 선택할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
max | 0.5 | 데이터가 scaling 될 때 최대값
min | -0.5 | 데이터가 scaling 될 때 최소값
withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부
maxRealValue | 100 | 데이터에서 정상 범위로 사용할 최소값
minRealValue | 0 | 데이터에서 정상 범위로 사용할 최대값
5. **PivotOerator**
field |value | 설명
--|---|--
selectedColumnId | 1 | Pivot 할 칼럼 ID, LINK_ID 선택 (0부터 시작)
groupByColumn | 0 | 총계처리를 할 key 칼럼 ID, PRCS_DATE 선택 (0부터 시작)
valueColumn | 2 | 총계처리를 할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
method | AVG | 총계처리 방법 선택
시간 및 170 개의 LINK_ID 값을 칼럼으로 가지고, 시간에 따른 평균속도를 행으로 가지도록 DataFrame 을 변경합니다.
![Pivot 결과](./images/2.6_pivotResult.png)
6. **ColumnSelectWithFileOperator**
field |value | 설명
--|---|--
columnIdPath | dataset/input/traffic/
traffic_kangnam_cols2.txt | 선택할 칼럼 이름이 정의된 텍스트파일 경로 및 이름 입력
텐서플로우 기반 속도 예측 모델의 입력 순서에 맞도록 칼럼을 선택합니다. 본 예제에서는 칼럼의 개수가 많으므로 ColumnSelectOperator 를 사용하여 선택할 칼럼 이름을 일일이 입력하는 것이 매우 번거로우므로 ColumnSelectWithFileOperator 를 사용합니다. 칼럼 이름을 쉼표(,) 로 구분하여 입력한 텍스트파일로 부터 칼럼 이름을 읽어들여 칼럼을 선택합니다.
![워크플로우 완성 화면](./images/2.6.1_workflow.png)
ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.
## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 스트림 데이터를 처리하므로 Batch 체크 박스는 해제하고 워크플로우를 제출합니다.
### 워크플로우 모니터링 하기
KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에서는 실행한 워크플로우들의 목록 및 동작 상태를 확인하고 제어할 수 있습니다. 위에서 실행한 워크플로우 이름을 클릭하면 워크플로우의 각 엔진들의 동작 상태 (status) 를 확인할 수 있습니다.
![워크플로우 동작 상태 확인](./images/2.6.1_monitoring.png)
WorkFlow History 탭을 선택하면, 프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.
![워크플로우 히스토리](./images/2.6.1_monitoring_history.png)
### 결과 파일 확인
FileWriter 에서 지정한 위치(home/csle/ksb-csle/examples/output)에 결과 파일이 생성된 것을 확인할 수 있습니다.
![결과 파일 확인](./images/2.6.1_resultFile.png)
"APPEND" 모드로 파일을 저장하는 경우, 기존의 데이터에 새로 가공한 데이터가 추가되어 저장됩니다. 그러나 시간 순으로 정렬이 되어 추가되지 않으므로 데이터를 활용할 때 주의가 필요합니다.
## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 TrafficPreprocessing 워크플로우를 종료(◼)할 수 있습니다.
## 워크플로우 저장하기
워크플로우 편집 화면에서 작성한 워크플로우를 "Save Workflow" 메뉴를 이용하여 저장합니다.