2.6.5.TrafficTimeseriesProcessing.md 15.1 KB
Newer Older
HooYoungAhn's avatar
HooYoungAhn committed
---
html:
  toc: true
  offline: true
export_on_save:
  html: true
---
# 실시간 시계열 교통속도 센서스트림 처리 하기 (TrafficTimeseriesProcessing)
---
교통센서로부터 동시에 무작위로 들어오는 시계열 속도 데이터를 수집하여 시간 순으로 데이터를 처리해서 내보내는 예제를 설명합니다.

## 입력 데이터 준비하기
본 예제에서는 교통센서로부터 시계열 속도 데이터(LINK_ID, 속도, 날짜)를 Kafka로 입력 받는 것을 가정합니다. 초 단위로 측정된 센서 데이터가 무작위로 들어온다고 가정합니다.

### 시계열 속도 스트림데이터 생성하기
본 예제를 위해 준비된 파일로부터 시계열 속도 데이터를 한줄씩 읽어 Kafka로 보내주는 파이썬 프로그램을 제공합니다.

입력파일은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 존재하는 ``201601_kangnam_orgarnized_new.csv`` 파일입니다. 초 단위로 측정한 LINK_ID 와 속도가 무작위로 섞여있습니다.

- 입력파일 형태 및 내용

PRCS_DATE  | LINK_ID  | PRCS_SPD
--|---|--
2016-01-01 00:00:27  | 1220034700  | 32.8
2016-01-01 00:00:12  | 1220019900  | 15.9
2016-01-01 00:00:08  | 1220025800  | 21.1   
...  | ...  | ...  


파이썬 프로그램은 Host PC의 /home/csle/ksb-csle/examples 폴더에 존재하는 kangnam_producer.py 입니다. 파이썬 프로그램이 있는 폴더로 이동하여 아래 명령어를 실행합니다.

```
cd /home/csle/ksb-csle/examples
python3 kangnam_producer.py ./input/201601_kangnam_orgarnized_new.csv localhost:9092 traffic 0.01

첫번째 arg: 보내려는 csv 파일 경로
두번째 arg: 카프카 서버 접속 주소
세번째 arg: 토픽명
네번째 arg: 각 row 를 전송하는 주기 (초 단위)
```

아래 그림과 같이 파일로 부터 데이터를 한줄씩 읽어 Kafka로 보내는 것을 확인할 수 있습니다.

![스트림 데이터 생성](./images/2.6_kafkaInput.png)


### 기타 입력파일 업로드
전처리에 필요한 파일들을 HDFS repository에 웹툴킷을 이용하여 dataset/input/traffic 위치에 업로드 합니다. 아래 파일들은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 있습니다.

- trafficStreamingSplitSample.json

![파일 업로드](./images/2.6_fileUpload.png)


##워크플로우 생성하기
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 두 개의 엔진을 생성합니다.

- 워크플로우 속성

속성  | 값  | 비고
--|---|--
name  | TrafficTimeseriesProcessing |   워크플로우 이름
description  | 강남 교통 실시간 센서스트림 처리 예제  |  워크플로우를 설명하는 글
isBatch  | false | 스트림 처리를 하는 워크플로우 이므로, false 로 지정
verbose  | false | 디버깅을 위해 로그정보를 보고자할 경우, true 로 지정


### 첫 번째 엔진 생성하기
스트림형태로 입력되는 교통 데이터를 입력받아 실시간으로 전처리를 수행하기 위해 StreamJoin 엔진을 선택합니다. StreamJoin 엔진은 내부적으로 Spark의 <a href="https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html">Structured Streaming</a>을 사용합니다.
시간이 순차적으로 들어오지 않아도 처리 가능하고, 입력데이터의 누락없이 데이터를 실시간으로 처리합니다. Trigger는 Writer에 있으며, Trigger 시점에 들어와있는 입력데이터를 가공하여 내보냅니다.

카프카로부터 속도 데이터를 입력받아서 LINK_ID 별로 1분 동안의 속도를 평균하여 내보냅니다. 30초 단위로 오버랩하여 missing value를 채우고, 잡음을 smoothing 하는 역할을 합니다.  

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
1  | StreamJoin  | StreamProcessingEngine  | 즉시실행 | 1분 단위로 속도 aggregation


#### Reader
KafkaPipeReader를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092 |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181 |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic  |  Kafka 큐의 이름
addTimestamp  | false  |  입력데이터를 받은 시간을 DataFrame 에 추가하고 싶을 경우, true 로 지정
timestampName  | PRCS_DATE  | 윈도우, watermark 등을 설정할 시간 정보가 들어있는 칼럼 이름 입력
watermark  | 2 minutes  | 늦게 들어오는 입력데이터를 얼마동안 기다릴 것인지 입력
sampleJsonPath  | dataset/input/traffic/<br>trafficStreamingSplitSample.json  | 입력데이터 형태 지정
failOnDataLoss  | false  |  



#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger  | 5 seconds  |  5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers  | localhost:9092  |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181  |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output1  |  Kafka 큐의 이름
checkpointLocation  | file:///home/csle/ksb-csle/examples/<br>output/kangnam/checkpoint/kafka1  |  checkpoint 파일을 저장할 폴더 지정
failOnDataLoss  | true  |  


#### Controller
StreamingGenericController 를 선택합니다.


#### Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
첫번째 엔진에서는 3개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **GroupByOperator**

field  |value   | 설명
--|---|--
timeColName  |  PRCS_DATE |  시간정보가 들어있는 칼럼 이름
keyColName  | LINK_ID  |  key 칼럼 이름
valColName  | PRCS_SPD  | 계산할 칼럼 이름
groupby  | AVG  |  평균 계산
window  |   |  아래의 표 참고

window 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
key  | PRCS_DATE  | 윈도우를 사용할 칼럼 이름  
windowLength | 1 minutes | 1분 단위 윈도우 설정
slidingInterval  | 30 seconds  |  30초 오버랩 설정


1분 단위의 윈도우(30초 오버랩)로 LINK_ID 별 평균 속도를 계산합니다.
LINK_ID, window (start, end), PRCS_SPD 로 칼럼이 나옵니다.


2. **SelectColumnsPipeOperator**

field  |value   | 설명
--|---|--
colName | LINK_ID <br> window.start <br> PRCS_SPD |  선택할 칼럼 이름
isRename  | false  |  

LINK_ID, window.start, PRCS_SPD 칼럼을 선택합니다.


3. **RenameColumnsPipeOperator**

field  |value   | 설명
--|---|--
selectedColumn | | 아래의 표 참고

selectedColumn 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
selectedColIndex | 1 | window.start 칼럼 선택 (0번부터 시작함)
newColName  | PRCS_DATE  |  변경할 칼럼 이름
newFieldType  | STRING  |  변경할 칼럼 타입

GroupByOperator 에 의해 window로 변경된 칼럼 이름을 원래대로 바꾸고, 카프카로 시간 정보를 제대로 내보내기 위해 타입을 STRING 으로 변경합니다.


### 두 번째 엔진 생성하기
missing value를 채우고 잡음을 smoothing 하기 위해, 한번 더 카프카로부터 속도 데이터를 입력받아서 LINK_ID 별로 5분 동안의 속도를 평균하여 내보냅니다.

첫 번째 엔진과 동일하게 구성하여 속성만 다르게 지정합니다. 이를 위해 **Engine 복제** 기능을 사용하여 첫번째 엔진을 복사합니다. 첫번째 엔진을 클릭한 후 마우스 오른쪽 버튼을 클릭하면 세부메뉴가 나타납니다. 이 세부메뉴에서 **Engine 복제** 메뉴를 클릭하면 동일한 엔진이 생성됩니다.  

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
2  | StreamJoin  | StreamProcessing2Engine  | 즉시실행 | 5분 단위로 속도 aggregation


#### Reader
KafkaPipeReader를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092 |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181 |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output1  |  Kafka 큐의 이름
addTimestamp  | false  |  입력데이터를 받은 시간을 DataFrame 에 추가하고 싶을 경우, true 로 지정
timestampName  | PRCS_DATE  | 윈도우, watermark 등을 설정할 시간 정보가 들어있는 칼럼 이름 입력
watermark  | 2 minutes  | 늦게 들어오는 입력데이터를 얼마동안 기다릴 것인지 입력
sampleJsonPath  | dataset/input/traffic/<br>trafficStreamingSplitSample.json  | 입력데이터 형태 지정
failOnDataLoss  | false  |  


#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger  | 5 seconds  |  5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers  | localhost:9092  |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181  |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output2  |  Kafka 큐의 이름
checkpointLocation  | file:///home/csle/ksb-csle/examples/<br>output/kangnam/checkpoint/kafka2  |  checkpoint 파일을 저장할 폴더 지정
failOnDataLoss  | true  |  


#### Controller
StreamingGenericController 를 선택합니다.


#### Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
두 번째 엔진에서는 3개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **GroupByOperator**

field  |value   | 설명
--|---|--
timeColName  |  PRCS_DATE |  시간정보가 들어있는 칼럼 이름
keyColName  | LINK_ID  |  key 칼럼 이름
valColName  | PRCS_SPD  | 계산할 칼럼 이름
groupby  | AVG  |  평균 계산
window  |   |  아래의 표 참고

window 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
key  | PRCS_DATE  | 윈도우를 사용할 칼럼 이름  
windowLength | 5 minutes | 5분 단위 윈도우 설정
slidingInterval  | 5 minutes  |  오버랩 없도록 설정


5분 단위의 윈도우(오버랩 없음)로 LINK_ID 별 평균 속도를 계산합니다.
LINK_ID, window (start, end), PRCS_SPD 로 칼럼이 나옵니다.


2. **SelectColumnsPipeOperator**

field  |value   | 설명
--|---|--
colName | LINK_ID <br> window.start <br> PRCS_SPD |  선택할 칼럼 이름
isRename  | false  |  

LINK_ID, window.start, PRCS_SPD 칼럼을 선택합니다.


3. **RenameColumnsPipeOperator**

field  |value   | 설명
--|---|--
selectedColumn | | 아래의 표 참고

selectedColumn 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
selectedColIndex | 1 | window.start 칼럼 선택 (0번부터 시작함)
newColName  | PRCS_DATE  |  변경할 칼럼 이름
newFieldType  | STRING  |  변경할 칼럼 타입

GroupByOperator 에 의해 window로 변경된 칼럼 이름을 원래대로 바꾸고, 카프카로 시간 정보를 제대로 내보내기 위해 타입을 STRING 으로 변경합니다.


<br>

![워크플로우 완성 화면](./images/2.6.5_workflow.png)

ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.

## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 스트림 데이터를 처리하므로 Batch 체크 박스는 해제하고 워크플로우를 제출합니다.


### 워크플로우 모니터링 하기

KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에서는 실행한 워크플로우들의 목록 및 동작 상태를 확인하고 제어할 수 있습니다. 위에서 실행한 워크플로우 이름을 클릭하면 워크플로우의 각 엔진들의 동작 상태 (status) 를 확인할 수 있습니다.

![워크플로우 동작 상태 확인](./images/2.6.5_monitoring.png)


WorkFlow History 탭을 선택하면, 프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.

![워크플로우 히스토리](./images/2.6.5_monitoring_history.png)

### 체크포인트 파일 확인

첫 번째 엔진과 두 번째 엔진의 KafkaPipeWriter 가 생성한 체크포인트 파일이 지정한 위치에 생성된 것을 확인할 수 있습니다.
워크플로우 종료 후 재시작 할 경우에는 체크포인트 파일을 삭제하고 실행해야 원하는 결과를 얻을 수 있습니다.

![체크포인트 파일 확인](./images/2.6.5_checkpoint.png)

### 속도 전처리 결과 확인하기
Kafka의 consumer를 이용하여 속도 전처리 결과를 확인합니다. 이를 위해 카프카가 설치된 폴더로 이동하여 아래 명령어를 실행합니다. 카프카는 ``home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1`` 위치에 설치되어 있습니다.

```
cd /home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --topic traffic_output2
```

아래 그림과 같이 전처리한 속도값을 Kafka로 보내는 것을 확인할 수 있습니다.

![전처리 결과 확인](./images/2.6.5_predictResult.png)


## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 TrafficTimeseriesProcessing 워크플로우를 종료(<span style="color:red">&#9724;</span>)할 수 있습니다.

## 워크플로우 저장하기
워크플로우 편집 화면에서 작성한 워크플로우를 "Save Workflow" 메뉴를 이용하여 저장합니다.