2.6.7.TrafficEndToEnd.md 20.2 KB
Newer Older
HooYoungAhn's avatar
HooYoungAhn committed
---
html:
  toc: true
  offline: true
export_on_save:
  html: true
---
# 실시간 시계열 교통속도 센서스트림 처리/학습/예측 및 주기적 모델 갱신하기 (TrafficEndToEnd)
---
교통센서로부터 시계열 속도 데이터를 수집한 후 전처리를 수행하고, 텐서플로우 기반 딥러닝 모델에 의해 예측된 속도를 클라이언트로 전달하는 한편, 전처리한 속도 데이터를 계속해서 파일에 저장한 후 텐서플로우 기반 딥러닝 모델을 주기적으로 학습하는 예제를 설명합니다.

<br>

![교통 시나리오](./images/2.6.7_TrafficSenario.png)

<br>

본 예제는 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.6.TrafficStreamingPredict.md">실시간 시계열 교통속도 센서스트림 처리 및 텐서플로우 모델 서빙하기</a> 매뉴얼에 따라 워크플로우를 작성하였다고 가정하고 이어서 설명을 합니다.  
HooYoungAhn's avatar
HooYoungAhn committed


## 입력 데이터 준비하기
본 예제에서는 교통센서로부터 시계열 속도 데이터(LINK_ID, 속도, 날짜)를 Kafka로 입력 받는 것을 가정합니다. 초 단위로 측정된 센서 데이터가 무작위로 들어온다고 가정합니다. 또한 텐서플로우 기반의 15분 뒤 속도를 예측하는 모델이 파이썬 코드로 구현되어 있다고 가정합니다.

### 시계열 속도 스트림데이터 생성하기
본 예제를 위해 준비된 파일로부터 시계열 속도 데이터를 한줄씩 읽어 Kafka로 보내주는 파이썬 프로그램을 제공합니다.

입력파일은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 존재하는 ``201601_kangnam_orgarnized_new.csv`` 파일입니다. 초 단위로 측정한 LINK_ID 와 속도가 무작위로 섞여있습니다.

- 입력파일 형태 및 내용

PRCS_DATE  | LINK_ID  | PRCS_SPD
--|---|--
2016-01-01 00:00:27  | 1220034700  | 32.8
2016-01-01 00:00:12  | 1220019900  | 15.9
2016-01-01 00:00:08  | 1220025800  | 21.1   
...  | ...  | ...  


파이썬 프로그램은 Host PC의 /home/csle/ksb-csle/examples 폴더에 존재하는 kangnam_producer.py 입니다. 파이썬 프로그램이 있는 폴더로 이동하여 아래 명령어를 실행합니다.

```
cd /home/csle/ksb-csle/examples
python3 kangnam_producer.py ./input/201601_kangnam_orgarnized_new.csv localhost:9092 traffic 0.01

첫번째 arg: 보내려는 csv 파일 경로
두번째 arg: 카프카 서버 접속 주소
세번째 arg: 토픽명
네번째 arg: 각 row 를 전송하는 주기 (초 단위)
```

아래 그림과 같이 파일로 부터 데이터를 한줄씩 읽어 Kafka로 보내는 것을 확인할 수 있습니다.

![스트림 데이터 생성](./images/2.6_kafkaInput.png)

### 파이썬 코드 업로드
Host PC의 /home/csle/ksb-csle/components/src/main/python/kangnam 폴더를 HDFS repository에 웹툴킷을 이용하여 업로드 합니다.
dataset/tensorflowTrainSource 위치에 폴더를 업로드 합니다.

![코드 업로드](./images/2.6.2_codeUpload.png)

![코드 업로드](./images/2.6.2_codeUpload2.png)

`tensorflow_train.py` 파일은 텐서플로우 기반의 15분 뒤 속도를 예측하는 모델을 학습하는 코드가 구현되어 있는 파일입니다. (본 예제에서는 LSTM 알고리즘을 이용하여 15분 뒤 속도를 예측하는 모델을 사용합니다.) 프레임워크와 연동하여 학습이 되도록 파이썬 코드에 input 과 output 경로 등을 argument 로 받는 부분이 코딩되어야 합니다. 자세한 내용은 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.7.2.KSB_TfPyCoding_Guide.md">KSB 프레임워크과 연동을 위한 tensorflow 학습코드 작성하기</a> 를 참고합니다.
HooYoungAhn's avatar
HooYoungAhn committed

### 학습된 모델 업로드
<a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.2.TrafficTraining.md">교통속도예측 텐서플로우 모델 학습하기</a> 매뉴얼에서 학습한 후 export 된 모델이 ``hdfs://csle1:9000/user/ksbuser_etri_re_kr/model/kangnam`` 위치에 저장되어 있을 경우, 아래 과정을 생략합니다.
HooYoungAhn's avatar
HooYoungAhn committed
그렇지 않을 경우, Host PC의 /home/csle/ksb-csle/examples/models/kangnam/model 폴더에 있는 텐서플로우 서빙용으로 export 한 모델을 HDFS repository에 웹툴킷을 이용하여 업로드합니다.
model/kangnam 위치에 model 폴더를 업로드 합니다.

![모델 업로드](./images/2.6.6_modelUpload.png)

모델이 아래 폴더 구조로 업로드 되어야 합니다.

![모델 업로드](./images/2.6.6_modelUpload2.png)


### 기타 입력파일 업로드
그 밖에 필요한 파일들을 HDFS repository에 웹툴킷을 이용하여 dataset/input/traffic 위치에 업로드 합니다. 아래 파일들은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 있습니다.
- traffic_kangnam_cols.txt
- traffic_kangnam_clos2.txt
- trafficStreamingSplitSample.json

![파일 업로드](./images/2.6_fileUpload.png)


## 워크플로우 생성하기
HooYoungAhn's avatar
HooYoungAhn committed
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 다섯 개의 엔진을 생성합니다.

- 워크플로우 속성

속성  | 값  | 비고
--|---|--
name  | TrafficEndToEnd | 워크플로우 이름   
description  | 강남 교통 예측 및 학습 시나리오  | 워크플로우를 설명하는 글
isBatch  | false | 스트림 처리를 하는 워크플로우 이므로, false 로 지정
verbose  | false | 디버깅을 위해 로그정보를 보고자할 경우, true 로 지정

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
1  | StreamJoin  | StreamProcessingEngine  | 즉시실행 | 1분 단위로 속도 aggregation
2  | StreamJoin  | StreamProcessing2Engine  | 즉시실행 | 5분 단위로 속도 aggregation
3  | StreamToStream  | PredictEngine  | 즉시실행| 속도 전처리 후 딥러닝 모델 기반 예측
4  | StreamToBatch  | PreprocessingEngine  | 즉시실행  | 딥러닝 모델 학습을 위한 전처리
5  | Batch  | TrainEngine  | 반복실행  | 딥러닝 모델 학습

앞의 세 개 엔진은 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.6.TrafficStreamingPredict.md">실시간 시계열 교통속도 센서스트림 처리 및 텐서플로우 모델 서빙하기</a> 매뉴얼을 참고하여 생성합니다. 워크플로우 편집화면 상단의 **Load Workflow** 메뉴를 이용하여 *TrafficStreamingPredict* 워크플로우를 불러옵니다.
HooYoungAhn's avatar
HooYoungAhn committed

![워크플로우 불러오기](./images/2.6.7_workflowLoad.png)

또는 **Workflow instances** 화면에서 *TrafficStreamingPredict* 워크플로우를 선택한 후 편집화면으로 드래그하여 워크플로우를 재사용할 수도 있습니다.

![워크플로우 재사용하기](./images/2.6.7_workflowInstances.png)



### 두 번째 엔진 수정하기
KafkaPipeWriter를 하나 더 추가합니다. StreamJoin 엔진은 여러 개의 Reader 와 Writer 를 가질 수 있습니다.


#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
HooYoungAhn's avatar
HooYoungAhn committed
field  |value   | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger  | 5 seconds  |  5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers  | localhost:9092  |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181  |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output2_copy  |  Kafka 큐의 이름
checkpointLocation  | file:///home/csle/ksb-csle/examples/output/<br>kangnam/checkpoint/kafka2_copy  |  checkpoint 파일을 저장할 폴더 지정
failOnDataLoss  | true  |  



### 네 번째 엔진 생성하기
스트림형태로 입력되는 속도 데이터를 입력 받아 가공한 뒤 배치형태의 파일로 내보내므로 StreamToBatch엔진을 선택합니다.

#### Reader
KafkaReader를 선택하고 아래표와 같은 속성을 지정합니다.

field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092  | Kafka 접속 주소(IP, 포트번호)  
zooKeeperConnect  | csle1:2181 |  zookeeper의 접속주소(IP, 포트번호)  
groupId  | traffic  | Kafka topic 그룹 아이디   
topic  | traffic_output2_copy | Kafka 큐의 이름


#### Writer
FileWriter를 선택하고 아래표와 같은 속성을 지정합니다.

field  |value   | 설명
--|---|--
filePath  | file:///home/csle/ksb-csle/examples/output/traffic_processing.csv  | 출력 파일 경로  
fileType  | CSV  |  출력 파일 타입
delimiter  | ,  |  구분자
header  | false  |  헤더 포함 유무
saveMode  | APPEND  |  파일 저장 방식

- 출력 파일 경로 지정
HDFS 의 상대경로를 지정할 경우, 상대경로 앞에 **사용자 기본 파일경로** 가 워크플로우 실행 시 동적으로 생성되어 적용됩니다. **사용자 기본 파일경로**``hdfs://{host}:{port}/user/{user_id}/applications/{workflow_id}/output/`` 입니다.
스트림 처리를 통해 주기적으로 파일을 생성하는 경우 HDFS 를 지원하지 않습니다. 따라서 본 예제에서는 로컬파일 시스템 경로를 사용합니다. 로컬파일 시스템 경로를 지정할 경우, ``file:///`` 키워드를 앞에 붙여서 사용합니다.

#### Controller
SparkStreamController를 선택하고 아래표와 같은 속성을 지정합니다.
SparkStreamController는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.

field  |value   | 설명
--|---|--
operationPeriod  |  10 | Reader로 부터 데이터를 읽어올 주기, 초 단위
windowSize  | 1530  | 큐에서 사용할 윈도우 크기 <br> 170 x 9  
slidingSize  |  1360 | 윈도우를 sliding 하는 크기 <br> 170 x 8  

본 예제에서는 10초 주기마다 1530 개의 데이터를 잘라서 operator에게 전달합니다. 입력데이터는 총 170개의 LINK_ID 를 가집니다. 따라서 5분 간격으로 170개의 데이터가 들어오는 셈입니다. 그런데 스트림 데이터는 계속해서 들어오고 있으므로 첫 시작 데이터가 어디일지 가늠하기 어렵습니다. 따라서 필요한 8 주기 보다 한 주기를 더 가져와서 처리하고 (170 x 9 = 1530 개), 8 주기 씩 슬라이딩 합니다 (170 x 8 = 1360 개). 아래에서 설명할 GroupByFilterOperator 및 OrderByFilterOperator 를 이용하여 170 x 8 = 1360 개 데이터를 필터링해서 사용합니다.

#### Runner
SimpleSparkRunner 를 선택합니다.
HooYoungAhn's avatar
HooYoungAhn committed
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
HooYoungAhn's avatar
HooYoungAhn committed
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
세 번째 엔진에서는 6개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **ColumnSelectOperator**

field  |value   | 설명
--|---|--
selectedColumnName | PRCS_DATE <br> LINK_ID <br> PRCS_SPD |  선택할 칼럼 이름

2. **GroupByFilterOperator**

field  |value   | 설명
--|---|--
keyColName  | PRCS_DATE  |  총계처리를 할 key 칼럼 이름
groupby  | COUNT  |  총계처리 방법 선택
condition  | EQUAL  |  총계처리 후 Filter 방법 선택
value  | 170  | Filter 의 값 입력

1530 개 데이터에서 LINK_ID 가 170 개가 되지 않는 시간의 데이터를 버리기 위해서, PRCS_DATE 칼럼을 COUNT 하여 170 이 되는 조건의 데이터만 남깁니다. 그러면 170 x 8 = 1360 개의 데이터만 남습니다. 그런데 우연히 데이터를 시작시점부터 잘 받았을 경우, 170 x 9 = 1530 개의 데이터가 남습니다.

3. **OrderByFilterOperator**

field  |value   | 설명
--|---|--
keyColName  | PRCS_DATE  |  정렬을 할 key 칼럼 이름
method  | ASC  |  정렬 방법 선택 (ASC: 오름차순, DESC: 내림차순)
value  | 1360  | 선택할 Row 의 개수

PRCS_DATE 칼럼을 정렬하여 8 주기의 데이터 (170 x 8 = 1360 개) 만 남깁니다.

4. **MinMaxScalingOperator**

field  |value   | 설명
--|---|--
selectedColumnId |  2 |  선택할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
max  | 0.5  |  데이터가 scaling 될 때 최대값
min  |  -0.5 |  데이터가 scaling 될 때 최소값
withMinMaxRange  |  true | 위에서 설정한 max, min 값 사용 여부
maxRealValue  | 100  |  데이터에서 정상 범위로 사용할 최소값
minRealValue  |  0 |   데이터에서 정상 범위로 사용할 최대값


5. **PivotOerator**

field  |value   | 설명
--|---|--
selectedColumnId  | 1  | Pivot 할 칼럼 ID, LINK_ID 선택 (0부터 시작)
groupByColumn  | 0  |  총계처리를 할 key 칼럼 ID, PRCS_DATE 선택 (0부터 시작)
valueColumn  | 2  |  총계처리를 할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
method  | AVG  |  총계처리 방법 선택

시간 및 170 개의 LINK_ID 값을 칼럼으로 가지고, 시간에 따른 평균속도를 행으로 가지도록 DataFrame 을 변경합니다.   

![Pivot 결과](./images/2.6_pivotResult.png)

6. **ColumnSelectWithFileOperator**

field  |value   | 설명
--|---|--
columnIdPath  | dataset/input/traffic/<br>traffic_kangnam_cols2.txt  |  선택할 칼럼 이름이 정의된 텍스트파일 경로 및 이름 입력

텐서플로우 기반 속도 예측 모델의 입력 순서에 맞도록 칼럼을 선택합니다. 본 예제에서는 칼럼의 개수가 많으므로 ColumnSelectOperator 를 사용하여 선택할 칼럼 이름을 일일이 입력하는 것이 매우 번거로우므로 ColumnSelectWithFileOperator 를 사용합니다. 칼럼 이름을 쉼표(,) 로 구분하여 입력한 텍스트파일로 부터 칼럼 이름을 읽어들여 칼럼을 선택합니다.  


### 다섯 번째 엔진 생성하기
텐서플로우를 이용하여 배치 형태로 딥러닝 모델을 생성하기 위해 Batch 엔진을 선택합니다. 다섯 번째 엔진은 딥러닝 모델을 주기적으로 생성하기 위해 **RUNTYPE****반복 실행** 으로 설정합니다. **주기****매분** 으로 설정합니다. 매분은 테스트용으로 5분 단위로 엔진이 submit 되어 실행됩니다.

![RunType 설정](./images/2.6.7_runTypeSetting.png)

#### Reader
FileReader를 선택하고 아래표와 같은 속성을 지정합니다. 앞의 엔진에서 출력한 파일을 지정합니다.

field  |value   | 설명
--|---|--
filePath  | file:///home/csle/ksb-csle/examples/output/traffic_processing.csv |  파일 경로  
fileType  |  CSV |  파일 타입
delimiter  |  , |  구분자
header  |  false |  header 포함 유무
saveMode  |   | 사용 하지 않음


#### Writer
FileWriter를 선택하고 아래표와 같은 속성을 지정합니다. 학습한 딥러닝 모델이 최종 저장될 폴더를 지정합니다. 다른 엔진에서 학습한 딥러닝 모델을 사용하도록 하기 위해 HDFS 절대경로를 지정합니다. 상대경로를 지정할 경우, 동적으로 생성한 **사용자 기본 파일경로** 가 앞에 추가되므로 다른 엔진에서 정확한 파일 경로를 지정하는 것이 어렵습니다.   

field  |value   | 설명
--|---|--
filePath  | hdfs://csle1:9000/user/ksbuser_etri_re_kr/model/kangnam |  저장 경로  
fileType  |  CSV |  파일 타입
delimiter  |  , |  구분자
header  |  false |  header 포함 유무
saveMode  |   | 사용 하지 않음

#### Controller
Controller는 외부 Tensorflow 호출하기 위해 ExternalAnalysisController을 선택합니다.

#### Runner
TensorflowRunner를 선택합니다.

field  |value   | 설명
--|---|--
pyEntryPath  | dataset/tensorflowTrainSource/kangnam/tensorflow_train.py  |  파이썬 코드 경로
cluster |  false |  cluster 환경 실행 여부
inJson  | false  |  Json 형태 파라미터 전달 유무
tfVersion  |  r1.6 |  Tensorflow 버전 정보


#### Operator
DLTrainOperator를 선택하고 아래표와 같은 속성을 지정합니다.  

field  |value   | 설명
--|---|--
modelPath  | file:///home/csle/ksb-csle/examples/output/kangnam/model |  내부적으로 학습된 모델을 저장할 경로 <br> 반드시 로컬 경로만 지정
additionalParams  |   |  아래의 표 참고

additionalParams 설정은 [+] 버튼을 두 번 클릭하여 다음과 같이 합니다. 파이썬 코드에서 argument 로 받는 부분입니다.

field  |value   | 설명
--|---|--
paramName  | isTrain  |  학습 유무
paramValue  | True  |  

field  |value   | 설명
--|---|--
paramName  | num_epochs  |  학습 반복 횟수
paramValue  | 2  |  

<br>
<br>

![워크플로우 완성 화면](./images/2.6.7_workflow.png)

ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.

## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 스트림 데이터를 처리하므로 Batch 체크 박스는 해제하고 워크플로우를 제출합니다.


### 워크플로우 모니터링 하기

KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에서는 실행한 워크플로우들의 목록 및 동작 상태를 확인하고 제어할 수 있습니다. 위에서 실행한 워크플로우 이름을 클릭하면 워크플로우의 각 엔진들의 동작 상태 (status) 를 확인할 수 있습니다.

![워크플로우 동작 상태 확인](./images/2.6.7_monitoring.png)

다섯 번째 엔진은 주기적으로 (5분 마다) 실행되므로, 동작 상태(status) 가 완료(Completed)였다가 실행 중일 때는 실행 중(Inprogress) 으로 바뀌는 것을 확인할 수 있습니다.

![워크플로우 동작 상태 확인](./images/2.6.7_monitoring2.png)


WorkFlow History 탭을 선택하면, 프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.
그리고 본 예제에서는 다섯 번째 엔진이 5분 마다 동작한 내역을 확인할 수 있습니다.

![워크플로우 히스토리](./images/2.6.7_monitoring_history.png)


### 학습한 후 export 된 모델 확인하기  
다섯 번째 엔진의 FileWriter 에서 지정한 위치에 학습된 모델이 생성된 것을 확인합니다. 다섯 번째 엔진이 5분 마다 실행되어 모델을 주기적으로 생성하는 것을 확인할 수 있습니다. 모델의 버전은 0000 부터 자동으로 1씩 증가합니다.
본 예제에서는 **학습된 모델 업로드** 에서 올려둔 0001 버전 이후부터 자동으로 모델이 생성된 것을 확인할 수 있습니다.

![학습된 모델 확인](./images/2.6.7_modelSave.png)

### 체크포인트 파일 확인

첫 번째 엔진과 두 번째 엔진의 KafkaPipeWriter 가 생성한 체크포인트 파일이 지정한 위치에 생성된 것을 확인할 수 있습니다.
워크플로우 종료 후 재시작 할 경우에는 체크포인트 파일을 삭제하고 실행해야 원하는 결과를 얻을 수 있습니다.

![체크포인트 파일 확인](./images/2.6.7_checkpoint.png)

### 속도 예측 결과 확인하기
Kafka의 consumer를 이용하여 속도 예측 결과를 확인합니다. 이를 위해 카프카가 설치된 폴더로 이동하여 아래 명령어를 실행합니다. 카프카는 ``home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1`` 위치에 설치되어 있습니다.

```
cd /home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --topic kangnam_output
```

아래 그림과 같이 예측된 속도값을 Kafka로 보내는 것을 확인할 수 있습니다.

![예측 결과 확인](./images/2.6_predictResult.png)


## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 TrafficEndToEnd 워크플로우를 종료(<span style="color:red">&#9724;</span>)할 수 있습니다.