2.6.6.TrafficStreamingPredict.md 23.4 KB
Newer Older
HooYoungAhn's avatar
HooYoungAhn committed
---
html:
  toc: true
  offline: true
export_on_save:
  html: true
---
# 실시간 시계열 교통속도 센서스트림 처리 및 텐서플로우 모델 서빙하기 (TrafficStreamingPredict)
---
교통센서로부터 시계열 속도 데이터를 수집한 후 전처리를 수행하고, 텐서플로우 기반 딥러닝 모델에 의해 예측된 속도를 클라이언트로 전달하는 예제를 설명합니다.

## 입력 데이터 준비하기
본 예제에서는 교통센서로부터 시계열 속도 데이터(LINK_ID, 속도, 날짜)를 Kafka로 입력 받는 것을 가정합니다. 초 단위로 측정된 센서 데이터가 무작위로 들어온다고 가정합니다. 또한 텐서플로우 기반의 15분 뒤 속도를 예측하는 모델이 미리 학습되어 export 되어 있다고 가정합니다.

### 시계열 속도 스트림데이터 생성하기
본 예제를 위해 준비된 파일로부터 시계열 속도 데이터를 한줄씩 읽어 Kafka로 보내주는 파이썬 프로그램을 제공합니다.

입력파일은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 존재하는 ``201601_kangnam_orgarnized_new.csv`` 파일입니다. 초 단위로 측정한 LINK_ID 와 속도가 무작위로 섞여있습니다.

- 입력파일 형태 및 내용

PRCS_DATE  | LINK_ID  | PRCS_SPD
--|---|--
2016-01-01 00:00:27  | 1220034700  | 32.8
2016-01-01 00:00:12  | 1220019900  | 15.9
2016-01-01 00:00:08  | 1220025800  | 21.1   
...  | ...  | ...  


파이썬 프로그램은 Host PC의 /home/csle/ksb-csle/examples 폴더에 존재하는 kangnam_producer.py 입니다. 파이썬 프로그램이 있는 폴더로 이동하여 아래 명령어를 실행합니다.

```
cd /home/csle/ksb-csle/examples
python3 kangnam_producer.py ./input/201601_kangnam_orgarnized_new.csv localhost:9092 traffic 0.01

첫번째 arg: 보내려는 csv 파일 경로
두번째 arg: 카프카 서버 접속 주소
세번째 arg: 토픽명
네번째 arg: 각 row 를 전송하는 주기 (초 단위)
```

아래 그림과 같이 파일로 부터 데이터를 한줄씩 읽어 Kafka로 보내는 것을 확인할 수 있습니다.

![스트림 데이터 생성](./images/2.6_kafkaInput.png)


### 학습된 모델 업로드
<a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.2.TrafficTraining.md">교통속도예측 텐서플로우 모델 학습하기</a> 매뉴얼에서 학습한 후 export 된 모델이 ``hdfs://csle1:9000/user/ksbuser_etri_re_kr/model/kangnam`` 위치에 저장되어 있을 경우, 아래 과정을 생략합니다.
HooYoungAhn's avatar
HooYoungAhn committed
그렇지 않을 경우, Host PC의 /home/csle/ksb-csle/examples/models/kangnam/model 폴더에 있는 텐서플로우 서빙용으로 export 한 모델을 HDFS repository에 웹툴킷을 이용하여 업로드합니다.
model/kangnam 위치에 model 폴더를 업로드 합니다.

![모델 업로드](./images/2.6.6_modelUpload.png)

모델이 아래 폴더 구조로 업로드 되어야 합니다.

![모델 업로드](./images/2.6.6_modelUpload2.png)


### 기타 입력파일 업로드
그 밖에 필요한 파일들을 HDFS repository에 웹툴킷을 이용하여 dataset/input/traffic 위치에 업로드 합니다. 아래 파일들은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 있습니다.
- traffic_kangnam_cols.txt
- trafficStreamingSplitSample.json

![파일 업로드](./images/2.6_fileUpload.png)


## 워크플로우 생성하기
HooYoungAhn's avatar
HooYoungAhn committed
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 세 개의 엔진을 생성합니다.

- 워크플로우 속성

속성  | 값  | 비고
--|---|--
name  | TrafficStreamingPredict |   워크플로우 이름
description  | 강남 교통 예측 시나리오  |  워크플로우를 설명하는 글
isBatch  | false | 스트림 처리를 하는 워크플로우 이므로, false 로 지정
verbose  | false | 디버깅을 위해 로그정보를 보고자할 경우, true 로 지정


### 첫 번째 엔진 생성하기
스트림형태로 입력되는 교통 데이터를 입력받아 실시간으로 전처리를 수행하기 위해 StreamJoin 엔진을 선택합니다. StreamJoin 엔진은 내부적으로 Spark의 <a href="https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html">Structured Streaming</a>을 사용합니다.
시간이 순차적으로 들어오지 않아도 처리 가능하고, 입력데이터의 누락없이 데이터를 실시간으로 처리합니다. Trigger는 Writer에 있으며, Trigger 시점에 들어와있는 입력데이터를 가공하여 내보냅니다.

카프카로부터 속도 데이터를 입력받아서 LINK_ID 별로 1분 동안의 속도를 평균하여 내보냅니다. 30초 단위로 오버랩하여 missing value를 채우고, 잡음을 smoothing 하는 역할을 합니다.  

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
1  | StreamJoin  | StreamProcessingEngine  | 즉시실행 | 1분 단위로 속도 aggregation


#### Reader
KafkaPipeReader를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092 |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181 |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic  |  Kafka 큐의 이름
addTimestamp  | false  |  입력데이터를 받은 시간을 DataFrame 에 추가하고 싶을 경우, true 로 지정
timestampName  | PRCS_DATE  | 윈도우, watermark 등을 설정할 시간 정보가 들어있는 칼럼 이름 입력
watermark  | 2 minutes  | 늦게 들어오는 입력데이터를 얼마동안 기다릴 것인지 입력
sampleJsonPath  | dataset/input/traffic/<br>trafficStreamingSplitSample.json  | 입력데이터 형태 지정
failOnDataLoss  | false  |  



#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger  | 5 seconds  |  5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers  | localhost:9092  |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181  |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output1  |  Kafka 큐의 이름
checkpointLocation  | file:///home/csle/ksb-csle/examples/<br>output/kangnam/checkpoint/kafka1  |  checkpoint 파일을 저장할 폴더 지정
failOnDataLoss  | true  |  


#### Controller
StreamingGenericController 를 선택합니다.


#### Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
첫번째 엔진에서는 3개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **GroupByOperator**

field  |value   | 설명
--|---|--
timeColName  |  PRCS_DATE |  시간정보가 들어있는 칼럼 이름
keyColName  | LINK_ID  |  key 칼럼 이름
valColName  | PRCS_SPD  | 계산할 칼럼 이름
groupby  | AVG  |  평균 계산
window  |   |  아래의 표 참고

window 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
key  | PRCS_DATE  | 윈도우를 사용할 칼럼 이름  
windowLength | 1 minutes | 1분 단위 윈도우 설정
slidingInterval  | 30 seconds  |  30초 오버랩 설정


1분 단위의 윈도우(30초 오버랩)로 LINK_ID 별 평균 속도를 계산합니다.
LINK_ID, window (start, end), PRCS_SPD 로 칼럼이 나옵니다.


2. **SelectColumnsPipeOperator**

field  |value   | 설명
--|---|--
colName | LINK_ID <br> window.start <br> PRCS_SPD |  선택할 칼럼 이름
isRename  | false  |  

LINK_ID, window.start, PRCS_SPD 칼럼을 선택합니다.


3. **RenameColumnsPipeOperator**

field  |value   | 설명
--|---|--
selectedColumn | | 아래의 표 참고

selectedColumn 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
selectedColIndex | 1 | window.start 칼럼 선택 (0번부터 시작함)
newColName  | PRCS_DATE  |  변경할 칼럼 이름
newFieldType  | STRING  |  변경할 칼럼 타입

GroupByOperator 에 의해 window로 변경된 칼럼 이름을 원래대로 바꾸고, 카프카로 시간 정보를 제대로 내보내기 위해 타입을 STRING 으로 변경합니다.


### 두 번째 엔진 생성하기
missing value를 채우고 잡음을 smoothing 하기 위해, 한번 더 카프카로부터 속도 데이터를 입력받아서 LINK_ID 별로 5분 동안의 속도를 평균하여 내보냅니다.

첫 번째 엔진과 동일하게 구성하여 속성만 다르게 지정합니다. 이를 위해 **Engine 복제** 기능을 사용하여 첫번째 엔진을 복사합니다. 첫번째 엔진을 클릭한 후 마우스 오른쪽 버튼을 클릭하면 세부메뉴가 나타납니다. 이 세부메뉴에서 **Engine 복제** 메뉴를 클릭하면 동일한 엔진이 생성됩니다.  

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
2  | StreamJoin  | StreamProcessing2Engine  | 즉시실행 | 5분 단위로 속도 aggregation


#### Reader
KafkaPipeReader를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092 |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181 |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output1  |  Kafka 큐의 이름
addTimestamp  | false  |  입력데이터를 받은 시간을 DataFrame 에 추가하고 싶을 경우, true 로 지정
timestampName  | PRCS_DATE  | 윈도우, watermark 등을 설정할 시간 정보가 들어있는 칼럼 이름 입력
watermark  | 2 minutes  | 늦게 들어오는 입력데이터를 얼마동안 기다릴 것인지 입력
sampleJsonPath  | dataset/input/traffic/<br>trafficStreamingSplitSample.json  | 입력데이터 형태 지정
failOnDataLoss  | false  |  


#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger  | 5 seconds  |  5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers  | localhost:9092  |  Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect  | localhost:2181  |  zookeeper의 접속주소(IP, 포트번호)
topic  | traffic_output2  |  Kafka 큐의 이름
checkpointLocation  | file:///home/csle/ksb-csle/examples/<br>output/kangnam/checkpoint/kafka2  |  checkpoint 파일을 저장할 폴더 지정
failOnDataLoss  | true  |  


#### Controller
StreamingGenericController 를 선택합니다.


#### Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
두 번째 엔진에서는 3개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **GroupByOperator**

field  |value   | 설명
--|---|--
timeColName  |  PRCS_DATE |  시간정보가 들어있는 칼럼 이름
keyColName  | LINK_ID  |  key 칼럼 이름
valColName  | PRCS_SPD  | 계산할 칼럼 이름
groupby  | AVG  |  평균 계산
window  |   |  아래의 표 참고

window 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
key  | PRCS_DATE  | 윈도우를 사용할 칼럼 이름  
windowLength | 5 minutes | 5분 단위 윈도우 설정
slidingInterval  | 5 minutes  |  오버랩 없도록 설정


5분 단위의 윈도우(오버랩 없음)로 LINK_ID 별 평균 속도를 계산합니다.
LINK_ID, window (start, end), PRCS_SPD 로 칼럼이 나옵니다.


2. **SelectColumnsPipeOperator**

field  |value   | 설명
--|---|--
colName | LINK_ID <br> window.start <br> PRCS_SPD |  선택할 칼럼 이름
isRename  | false  |  

LINK_ID, window.start, PRCS_SPD 칼럼을 선택합니다.


3. **RenameColumnsPipeOperator**

field  |value   | 설명
--|---|--
selectedColumn | | 아래의 표 참고

selectedColumn 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
selectedColIndex | 1 | window.start 칼럼 선택 (0번부터 시작함)
newColName  | PRCS_DATE  |  변경할 칼럼 이름
newFieldType  | STRING  |  변경할 칼럼 타입

GroupByOperator 에 의해 window로 변경된 칼럼 이름을 원래대로 바꾸고, 카프카로 시간 정보를 제대로 내보내기 위해 타입을 STRING 으로 변경합니다.



### 세 번째 엔진 생성하기
스트림형태로 입력되는 속도 데이터를 입력 받아 전처리 한 후, 15분 뒤 속도를 예측해서 스트림 형태로 내보내므로 StreamToStream엔진을 선택합니다.

- 엔진 속성

순번  | 엔진 Type | NickName  | RunType | 설명
--|---|---|---|--
3  | StreamToStream  | PredictEngine  | 즉시실행| 속도 전처리 후 딥러닝 모델 기반 예측


#### Reader
KafkaReader를 선택하고 아래표와 같은 속성을 지정합니다.

field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092  | Kafka 접속 주소(IP, 포트번호)  
zooKeeperConnect  | csle1:2181 |  zookeeper의 접속주소(IP, 포트번호)  
groupId  | traffic  | Kafka topic 그룹 아이디   
topic  | traffic_output2 | Kafka 큐의 이름


#### Writer
KafkaWriter를 선택하고 아래표와 같은 속성을 지정합니다.

field  |value   | 설명
--|---|--
bootStrapServers  | localhost:9092  | Kafka 접속 주소(IP, 포트번호)  
zooKeeperConnect  | csle1:2181 |  zookeeper의 접속주소(IP, 포트번호)  
groupId  | traffic  | Kafka topic 그룹 아이디   
topic  | kangnam_output | Kafka 큐의 이름


#### Controller
SparkStreamController를 선택하고 아래표와 같은 속성을 지정합니다.
SparkStreamController는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.

field  |value   | 설명
--|---|--
operationPeriod  |  5 | Reader로 부터 데이터를 읽어올 주기, 초 단위
windowSize  | 1530  | 큐에서 사용할 윈도우 크기 <br> 170 x 9  
slidingSize  |  170 | 윈도우를 sliding 하는 크기   

본 예제에서는 5초 주기마다 1530 개의 데이터를 잘라서 operator에게 전달합니다. 입력데이터는 총 170개의 LINK_ID 를 가집니다. 따라서 5분 간격으로 170개의 데이터가 들어오는 셈입니다. 그런데 170개 데이터의 순서는 무작위로 들어옵니다. 또한 스트림 데이터는 계속해서 들어오고 있으므로 첫 시작 데이터가 어디일지 가늠하기 어렵습니다. 따라서 필요한 8 주기 보다 한 주기를 더 가져와서 처리하고 (170 x 9 = 1530 개), 1 주기 씩 슬라이딩 합니다. 아래에서 설명할 GroupByFilterOperator 및 OrderByFilterOperator 를 이용하여 170 x 8 = 1360 개 데이터를 필터링해서 사용합니다.

#### Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수  
excuterCores  | 2  | 익스큐터 코어 개수  
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores  | 2  |  Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전



#### Operator
세 번째 엔진에서는 9개의 Operator를 사용하여 입력데이터를 가공합니다.


1. **ColumnSelectOperator**

field  |value   | 설명
--|---|--
selectedColumnName | PRCS_DATE <br> LINK_ID <br> PRCS_SPD |  선택할 칼럼 이름

2. **GroupByFilterOperator**

field  |value   | 설명
--|---|--
keyColName  | PRCS_DATE  |  총계처리를 할 key 칼럼 이름
groupby  | COUNT  |  총계처리 방법 선택
condition  | EQUAL  |  총계처리 후 Filter 방법 선택
value  | 170  | Filter 의 값 입력

1530 개 데이터에서 LINK_ID 가 170 개가 되지 않는 시간의 데이터를 버리기 위해서, PRCS_DATE 칼럼을 COUNT 하여 170 이 되는 조건의 데이터만 남깁니다. 그러면 170 x 8 = 1360 개의 데이터만 남습니다. 그런데 우연히 데이터를 시작시점부터 잘 받았을 경우, 170 x 9 = 1530 개의 데이터가 남습니다.

3. **OrderByFilterOperator**

field  |value   | 설명
--|---|--
keyColName  | PRCS_DATE  |  정렬을 할 key 칼럼 이름
method  | ASC  |  정렬 방법 선택 (ASC: 오름차순, DESC: 내림차순)
value  | 1360  | 선택할 Row 의 개수

PRCS_DATE 칼럼을 정렬하여 8 주기의 데이터 (170 x 8 = 1360 개) 만 남깁니다.

4. **MinMaxScalingOperator**

field  |value   | 설명
--|---|--
selectedColumnId |  2 |  선택할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
max  | 0.5  |  데이터가 scaling 될 때 최대값
min  |  -0.5 |  데이터가 scaling 될 때 최소값
withMinMaxRange  |  true | 위에서 설정한 max, min 값 사용 여부
maxRealValue  | 100  |  데이터에서 정상 범위로 사용할 최소값
minRealValue  |  0 |   데이터에서 정상 범위로 사용할 최대값


5. **PivotOerator**

field  |value   | 설명
--|---|--
selectedColumnId  | 1  | Pivot 할 칼럼 ID, LINK_ID 선택 (0부터 시작)
groupByColumn  | 0  |  총계처리를 할 key 칼럼 ID, PRCS_DATE 선택 (0부터 시작)
valueColumn  | 2  |  총계처리를 할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
method  | AVG  |  총계처리 방법 선택

시간 및 170 개의 LINK_ID 값을 칼럼으로 가지고, 시간에 따른 평균속도를 행으로 가지도록 DataFrame 을 변경합니다.   

![Pivot 결과](./images/2.6_pivotResult.png)

6. **ColumnSelectWithFileOperator**

field  |value   | 설명
--|---|--
columnIdPath  | dataset/input/traffic/<br>traffic_kangnam_cols.txt  |  선택할 칼럼 이름이 정의된 텍스트파일 경로 및 이름 입력

텐서플로우 기반 속도 예측 모델의 입력 순서에 맞도록 칼럼을 선택합니다. 본 예제에서는 칼럼의 개수가 많으므로 ColumnSelectOperator 를 사용하여 선택할 칼럼 이름을 일일이 입력하는 것이 매우 번거로우므로 ColumnSelectWithFileOperator 를 사용합니다. 칼럼 이름을 쉼표(,) 로 구분하여 입력한 텍스트파일로 부터 칼럼 이름을 읽어들여 칼럼을 선택합니다.

7. **VectorAssembleColumnAddOperator**

field  |value   | 설명
--|---|--
vectorAssembleColumnName  | in1  | 추가할 벡터 칼럼 이름 입력

DataFrame의 모든 칼럼을 하나의 벡터로 만들어서 벡터타입의 칼럼을 마지막에 추가합니다.  

8. **FlattenOperator**

field  |value   | 설명
--|---|--
columnName  | in1  | 칼럼 이름 입력

선택한 칼럼의 데이터를 하나의 벡터로 만들어서 벡터 값만 남깁니다. 텐서플로우 모델의 입력형태로 데이터를 변환하는 과정입니다.

9. **TensorflowPredictOperator**

field  |value   | 설명
--|---|--
modelServerUri  | model/kangnam | 모델 경로
modelName  |  kangnam_traffic  | 모델 이름
signatureName  | predict_speed  | 모델 시그네쳐 이름

텐서플로우 서빙용으로 export 한 모델의 경로 및 export 할 때 정의한 모델 이름과 시그네쳐 이름을 이용하여 텐서플로우 모델을 서빙용으로 띄웁니다 (처음에 한번만 수행함). 그리고 서빙용 모델에 데이터를 보내고 예측된 속도 값을 리턴받아서 내부적으로 DataFrame 형태로 변환합니다.   

<br>
<br>

![워크플로우 완성 화면](./images/2.6.6_workflow.png)

ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.

## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 스트림 데이터를 처리하므로 Batch 체크 박스는 해제하고 워크플로우를 제출합니다.


### 워크플로우 모니터링 하기

KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에서는 실행한 워크플로우들의 목록 및 동작 상태를 확인하고 제어할 수 있습니다. 위에서 실행한 워크플로우 이름을 클릭하면 워크플로우의 각 엔진들의 동작 상태 (status) 를 확인할 수 있습니다.

![워크플로우 동작 상태 확인](./images/2.6.6_monitoring.png)


WorkFlow History 탭을 선택하면, 프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.

![워크플로우 히스토리](./images/2.6.6_monitoring_history.png)

### 체크포인트 파일 확인

첫 번째 엔진과 두 번째 엔진의 KafkaPipeWriter 가 생성한 체크포인트 파일이 지정한 위치에 생성된 것을 확인할 수 있습니다.
워크플로우 종료 후 재시작 할 경우에는 체크포인트 파일을 삭제하고 실행해야 원하는 결과를 얻을 수 있습니다.

![체크포인트 파일 확인](./images/2.6.6_checkpoint.png)

### 속도 예측 결과 확인하기
Kafka의 consumer를 이용하여 속도 예측 결과를 확인합니다. 이를 위해 카프카가 설치된 폴더로 이동하여 아래 명령어를 실행합니다. 카프카는 ``home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1`` 위치에 설치되어 있습니다.

```
cd /home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --topic kangnam_output
```

아래 그림과 같이 예측된 속도값을 Kafka로 보내는 것을 확인할 수 있습니다.

![예측 결과 확인](./images/2.6_predictResult.png)


## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 TrafficStreamingPredict 워크플로우를 종료(<span style="color:red">&#9724;</span>)할 수 있습니다.

## 워크플로우 저장하기
워크플로우 편집 화면에서 작성한 워크플로우를 "Save Workflow" 메뉴를 이용하여 저장합니다.