Newer
Older
---
html:
toc: true
offline: true
export_on_save:
html: true
---
# 실시간 시계열 교통속도 센서스트림 처리/학습/예측 및 주기적 모델 갱신하기 (TrafficEndToEnd)
---
교통센서로부터 시계열 속도 데이터를 수집한 후 전처리를 수행하고, 텐서플로우 기반 딥러닝 모델에 의해 예측된 속도를 클라이언트로 전달하는 한편, 전처리한 속도 데이터를 계속해서 파일에 저장한 후 텐서플로우 기반 딥러닝 모델을 주기적으로 학습하는 예제를 설명합니다.
<br>
![교통 시나리오](./images/2.6.7_TrafficSenario.png)
<br>
본 예제는 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.6.TrafficStreamingPredict.md">실시간 시계열 교통속도 센서스트림 처리 및 텐서플로우 모델 서빙하기</a> 매뉴얼에 따라 워크플로우를 작성하였다고 가정하고 이어서 설명을 합니다.
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
## 입력 데이터 준비하기
본 예제에서는 교통센서로부터 시계열 속도 데이터(LINK_ID, 속도, 날짜)를 Kafka로 입력 받는 것을 가정합니다. 초 단위로 측정된 센서 데이터가 무작위로 들어온다고 가정합니다. 또한 텐서플로우 기반의 15분 뒤 속도를 예측하는 모델이 파이썬 코드로 구현되어 있다고 가정합니다.
### 시계열 속도 스트림데이터 생성하기
본 예제를 위해 준비된 파일로부터 시계열 속도 데이터를 한줄씩 읽어 Kafka로 보내주는 파이썬 프로그램을 제공합니다.
입력파일은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 존재하는 ``201601_kangnam_orgarnized_new.csv`` 파일입니다. 초 단위로 측정한 LINK_ID 와 속도가 무작위로 섞여있습니다.
- 입력파일 형태 및 내용
PRCS_DATE | LINK_ID | PRCS_SPD
--|---|--
2016-01-01 00:00:27 | 1220034700 | 32.8
2016-01-01 00:00:12 | 1220019900 | 15.9
2016-01-01 00:00:08 | 1220025800 | 21.1
... | ... | ...
파이썬 프로그램은 Host PC의 /home/csle/ksb-csle/examples 폴더에 존재하는 kangnam_producer.py 입니다. 파이썬 프로그램이 있는 폴더로 이동하여 아래 명령어를 실행합니다.
```
cd /home/csle/ksb-csle/examples
python3 kangnam_producer.py ./input/201601_kangnam_orgarnized_new.csv localhost:9092 traffic 0.01
첫번째 arg: 보내려는 csv 파일 경로
두번째 arg: 카프카 서버 접속 주소
세번째 arg: 토픽명
네번째 arg: 각 row 를 전송하는 주기 (초 단위)
```
아래 그림과 같이 파일로 부터 데이터를 한줄씩 읽어 Kafka로 보내는 것을 확인할 수 있습니다.
![스트림 데이터 생성](./images/2.6_kafkaInput.png)
### 파이썬 코드 업로드
Host PC의 /home/csle/ksb-csle/components/src/main/python/kangnam 폴더를 HDFS repository에 웹툴킷을 이용하여 업로드 합니다.
dataset/tensorflowTrainSource 위치에 폴더를 업로드 합니다.
![코드 업로드](./images/2.6.2_codeUpload.png)
![코드 업로드](./images/2.6.2_codeUpload2.png)
`tensorflow_train.py` 파일은 텐서플로우 기반의 15분 뒤 속도를 예측하는 모델을 학습하는 코드가 구현되어 있는 파일입니다. (본 예제에서는 LSTM 알고리즘을 이용하여 15분 뒤 속도를 예측하는 모델을 사용합니다.) 프레임워크와 연동하여 학습이 되도록 파이썬 코드에 input 과 output 경로 등을 argument 로 받는 부분이 코딩되어야 합니다. 자세한 내용은 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.7.2.KSB_TfPyCoding_Guide.md">KSB 프레임워크과 연동을 위한 tensorflow 학습코드 작성하기</a> 를 참고합니다.
<a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.2.TrafficTraining.md">교통속도예측 텐서플로우 모델 학습하기</a> 매뉴얼에서 학습한 후 export 된 모델이 ``hdfs://csle1:9000/user/ksbuser_etri_re_kr/model/kangnam`` 위치에 저장되어 있을 경우, 아래 과정을 생략합니다.
그렇지 않을 경우, Host PC의 /home/csle/ksb-csle/examples/models/kangnam/model 폴더에 있는 텐서플로우 서빙용으로 export 한 모델을 HDFS repository에 웹툴킷을 이용하여 업로드합니다.
model/kangnam 위치에 model 폴더를 업로드 합니다.
![모델 업로드](./images/2.6.6_modelUpload.png)
모델이 아래 폴더 구조로 업로드 되어야 합니다.
![모델 업로드](./images/2.6.6_modelUpload2.png)
### 기타 입력파일 업로드
그 밖에 필요한 파일들을 HDFS repository에 웹툴킷을 이용하여 dataset/input/traffic 위치에 업로드 합니다. 아래 파일들은 Host PC의 /home/csle/ksb-csle/examples/input 폴더에 있습니다.
- traffic_kangnam_cols.txt
- traffic_kangnam_clos2.txt
- trafficStreamingSplitSample.json
![파일 업로드](./images/2.6_fileUpload.png)
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 다섯 개의 엔진을 생성합니다.
- 워크플로우 속성
속성 | 값 | 비고
--|---|--
name | TrafficEndToEnd | 워크플로우 이름
description | 강남 교통 예측 및 학습 시나리오 | 워크플로우를 설명하는 글
isBatch | false | 스트림 처리를 하는 워크플로우 이므로, false 로 지정
verbose | false | 디버깅을 위해 로그정보를 보고자할 경우, true 로 지정
- 엔진 속성
순번 | 엔진 Type | NickName | RunType | 설명
--|---|---|---|--
1 | StreamJoin | StreamProcessingEngine | 즉시실행 | 1분 단위로 속도 aggregation
2 | StreamJoin | StreamProcessing2Engine | 즉시실행 | 5분 단위로 속도 aggregation
3 | StreamToStream | PredictEngine | 즉시실행| 속도 전처리 후 딥러닝 모델 기반 예측
4 | StreamToBatch | PreprocessingEngine | 즉시실행 | 딥러닝 모델 학습을 위한 전처리
5 | Batch | TrainEngine | 반복실행 | 딥러닝 모델 학습
앞의 세 개 엔진은 <a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/2.6.6.TrafficStreamingPredict.md">실시간 시계열 교통속도 센서스트림 처리 및 텐서플로우 모델 서빙하기</a> 매뉴얼을 참고하여 생성합니다. 워크플로우 편집화면 상단의 **Load Workflow** 메뉴를 이용하여 *TrafficStreamingPredict* 워크플로우를 불러옵니다.
![워크플로우 불러오기](./images/2.6.7_workflowLoad.png)
또는 **Workflow instances** 화면에서 *TrafficStreamingPredict* 워크플로우를 선택한 후 편집화면으로 드래그하여 워크플로우를 재사용할 수도 있습니다.
![워크플로우 재사용하기](./images/2.6.7_workflowInstances.png)
### 두 번째 엔진 수정하기
KafkaPipeWriter를 하나 더 추가합니다. StreamJoin 엔진은 여러 개의 Reader 와 Writer 를 가질 수 있습니다.
#### Writer
KafkaPipeWriter를 선택하고 아래표와 같은 속성을 지정합니다.
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
field |value | 설명
--|---|--
mode | append | 새로 들어온 입력데이터에 대해서 처리함
trigger | 5 seconds | 5초 동안 들어온 입력데이터에 대해서 처리함
bootStrapServers | localhost:9092 | Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect | localhost:2181 | zookeeper의 접속주소(IP, 포트번호)
topic | traffic_output2_copy | Kafka 큐의 이름
checkpointLocation | file:///home/csle/ksb-csle/examples/output/<br>kangnam/checkpoint/kafka2_copy | checkpoint 파일을 저장할 폴더 지정
failOnDataLoss | true |
### 네 번째 엔진 생성하기
스트림형태로 입력되는 속도 데이터를 입력 받아 가공한 뒤 배치형태의 파일로 내보내므로 StreamToBatch엔진을 선택합니다.
#### Reader
KafkaReader를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
bootStrapServers | localhost:9092 | Kafka 접속 주소(IP, 포트번호)
zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호)
groupId | traffic | Kafka topic 그룹 아이디
topic | traffic_output2_copy | Kafka 큐의 이름
#### Writer
FileWriter를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
filePath | file:///home/csle/ksb-csle/examples/output/traffic_processing.csv | 출력 파일 경로
fileType | CSV | 출력 파일 타입
delimiter | , | 구분자
header | false | 헤더 포함 유무
saveMode | APPEND | 파일 저장 방식
- 출력 파일 경로 지정
HDFS 의 상대경로를 지정할 경우, 상대경로 앞에 **사용자 기본 파일경로** 가 워크플로우 실행 시 동적으로 생성되어 적용됩니다. **사용자 기본 파일경로** 는 ``hdfs://{host}:{port}/user/{user_id}/applications/{workflow_id}/output/`` 입니다.
스트림 처리를 통해 주기적으로 파일을 생성하는 경우 HDFS 를 지원하지 않습니다. 따라서 본 예제에서는 로컬파일 시스템 경로를 사용합니다. 로컬파일 시스템 경로를 지정할 경우, ``file:///`` 키워드를 앞에 붙여서 사용합니다.
#### Controller
SparkStreamController를 선택하고 아래표와 같은 속성을 지정합니다.
SparkStreamController는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.
field |value | 설명
--|---|--
operationPeriod | 10 | Reader로 부터 데이터를 읽어올 주기, 초 단위
windowSize | 1530 | 큐에서 사용할 윈도우 크기 <br> 170 x 9
slidingSize | 1360 | 윈도우를 sliding 하는 크기 <br> 170 x 8
본 예제에서는 10초 주기마다 1530 개의 데이터를 잘라서 operator에게 전달합니다. 입력데이터는 총 170개의 LINK_ID 를 가집니다. 따라서 5분 간격으로 170개의 데이터가 들어오는 셈입니다. 그런데 스트림 데이터는 계속해서 들어오고 있으므로 첫 시작 데이터가 어디일지 가늠하기 어렵습니다. 따라서 필요한 8 주기 보다 한 주기를 더 가져와서 처리하고 (170 x 9 = 1530 개), 8 주기 씩 슬라이딩 합니다 (170 x 8 = 1360 개). 아래에서 설명할 GroupByFilterOperator 및 OrderByFilterOperator 를 이용하여 170 x 8 = 1360 개 데이터를 필터링해서 사용합니다.
#### Runner
SimpleSparkRunner 를 선택합니다.
field |value | 설명
--|---|--
inJason | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs | | 아래의 표 참고
sparkArgs 설정은 다음과 같이 합니다.
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
field |value | 설명
--|---|--
master | local[\*] | YARN 의 마스터
numExecutors | 2 | 익스큐터 개수
excuterCores | 2 | 익스큐터 코어 개수
executerMemory | 1g | Spark 의 익스큐터 메모리
driverMemory | 1g | Spark 드라이버 메모리
totalCores | 2 | Spark 전체 할당 가능 코어수
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전
#### Operator
세 번째 엔진에서는 6개의 Operator를 사용하여 입력데이터를 가공합니다.
1. **ColumnSelectOperator**
field |value | 설명
--|---|--
selectedColumnName | PRCS_DATE <br> LINK_ID <br> PRCS_SPD | 선택할 칼럼 이름
2. **GroupByFilterOperator**
field |value | 설명
--|---|--
keyColName | PRCS_DATE | 총계처리를 할 key 칼럼 이름
groupby | COUNT | 총계처리 방법 선택
condition | EQUAL | 총계처리 후 Filter 방법 선택
value | 170 | Filter 의 값 입력
1530 개 데이터에서 LINK_ID 가 170 개가 되지 않는 시간의 데이터를 버리기 위해서, PRCS_DATE 칼럼을 COUNT 하여 170 이 되는 조건의 데이터만 남깁니다. 그러면 170 x 8 = 1360 개의 데이터만 남습니다. 그런데 우연히 데이터를 시작시점부터 잘 받았을 경우, 170 x 9 = 1530 개의 데이터가 남습니다.
3. **OrderByFilterOperator**
field |value | 설명
--|---|--
keyColName | PRCS_DATE | 정렬을 할 key 칼럼 이름
method | ASC | 정렬 방법 선택 (ASC: 오름차순, DESC: 내림차순)
value | 1360 | 선택할 Row 의 개수
PRCS_DATE 칼럼을 정렬하여 8 주기의 데이터 (170 x 8 = 1360 개) 만 남깁니다.
4. **MinMaxScalingOperator**
field |value | 설명
--|---|--
selectedColumnId | 2 | 선택할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
max | 0.5 | 데이터가 scaling 될 때 최대값
min | -0.5 | 데이터가 scaling 될 때 최소값
withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부
maxRealValue | 100 | 데이터에서 정상 범위로 사용할 최소값
minRealValue | 0 | 데이터에서 정상 범위로 사용할 최대값
5. **PivotOerator**
field |value | 설명
--|---|--
selectedColumnId | 1 | Pivot 할 칼럼 ID, LINK_ID 선택 (0부터 시작)
groupByColumn | 0 | 총계처리를 할 key 칼럼 ID, PRCS_DATE 선택 (0부터 시작)
valueColumn | 2 | 총계처리를 할 칼럼 ID, PRCS_SPD 선택 (0부터 시작)
method | AVG | 총계처리 방법 선택
시간 및 170 개의 LINK_ID 값을 칼럼으로 가지고, 시간에 따른 평균속도를 행으로 가지도록 DataFrame 을 변경합니다.
![Pivot 결과](./images/2.6_pivotResult.png)
6. **ColumnSelectWithFileOperator**
field |value | 설명
--|---|--
columnIdPath | dataset/input/traffic/<br>traffic_kangnam_cols2.txt | 선택할 칼럼 이름이 정의된 텍스트파일 경로 및 이름 입력
텐서플로우 기반 속도 예측 모델의 입력 순서에 맞도록 칼럼을 선택합니다. 본 예제에서는 칼럼의 개수가 많으므로 ColumnSelectOperator 를 사용하여 선택할 칼럼 이름을 일일이 입력하는 것이 매우 번거로우므로 ColumnSelectWithFileOperator 를 사용합니다. 칼럼 이름을 쉼표(,) 로 구분하여 입력한 텍스트파일로 부터 칼럼 이름을 읽어들여 칼럼을 선택합니다.
### 다섯 번째 엔진 생성하기
텐서플로우를 이용하여 배치 형태로 딥러닝 모델을 생성하기 위해 Batch 엔진을 선택합니다. 다섯 번째 엔진은 딥러닝 모델을 주기적으로 생성하기 위해 **RUNTYPE** 을 **반복 실행** 으로 설정합니다. **주기** 는 **매분** 으로 설정합니다. 매분은 테스트용으로 5분 단위로 엔진이 submit 되어 실행됩니다.
![RunType 설정](./images/2.6.7_runTypeSetting.png)
#### Reader
FileReader를 선택하고 아래표와 같은 속성을 지정합니다. 앞의 엔진에서 출력한 파일을 지정합니다.
field |value | 설명
--|---|--
filePath | file:///home/csle/ksb-csle/examples/output/traffic_processing.csv | 파일 경로
fileType | CSV | 파일 타입
delimiter | , | 구분자
header | false | header 포함 유무
saveMode | | 사용 하지 않음
#### Writer
FileWriter를 선택하고 아래표와 같은 속성을 지정합니다. 학습한 딥러닝 모델이 최종 저장될 폴더를 지정합니다. 다른 엔진에서 학습한 딥러닝 모델을 사용하도록 하기 위해 HDFS 절대경로를 지정합니다. 상대경로를 지정할 경우, 동적으로 생성한 **사용자 기본 파일경로** 가 앞에 추가되므로 다른 엔진에서 정확한 파일 경로를 지정하는 것이 어렵습니다.
field |value | 설명
--|---|--
filePath | hdfs://csle1:9000/user/ksbuser_etri_re_kr/model/kangnam | 저장 경로
fileType | CSV | 파일 타입
delimiter | , | 구분자
header | false | header 포함 유무
saveMode | | 사용 하지 않음
#### Controller
Controller는 외부 Tensorflow 호출하기 위해 ExternalAnalysisController을 선택합니다.
#### Runner
TensorflowRunner를 선택합니다.
field |value | 설명
--|---|--
pyEntryPath | dataset/tensorflowTrainSource/kangnam/tensorflow_train.py | 파이썬 코드 경로
cluster | false | cluster 환경 실행 여부
inJson | false | Json 형태 파라미터 전달 유무
tfVersion | r1.6 | Tensorflow 버전 정보
#### Operator
DLTrainOperator를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
modelPath | file:///home/csle/ksb-csle/examples/output/kangnam/model | 내부적으로 학습된 모델을 저장할 경로 <br> 반드시 로컬 경로만 지정
additionalParams | | 아래의 표 참고
additionalParams 설정은 [+] 버튼을 두 번 클릭하여 다음과 같이 합니다. 파이썬 코드에서 argument 로 받는 부분입니다.
field |value | 설명
--|---|--
paramName | isTrain | 학습 유무
paramValue | True |
field |value | 설명
--|---|--
paramName | num_epochs | 학습 반복 횟수
paramValue | 2 |
<br>
<br>
![워크플로우 완성 화면](./images/2.6.7_workflow.png)
ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.
## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 스트림 데이터를 처리하므로 Batch 체크 박스는 해제하고 워크플로우를 제출합니다.
### 워크플로우 모니터링 하기
KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에서는 실행한 워크플로우들의 목록 및 동작 상태를 확인하고 제어할 수 있습니다. 위에서 실행한 워크플로우 이름을 클릭하면 워크플로우의 각 엔진들의 동작 상태 (status) 를 확인할 수 있습니다.
![워크플로우 동작 상태 확인](./images/2.6.7_monitoring.png)
다섯 번째 엔진은 주기적으로 (5분 마다) 실행되므로, 동작 상태(status) 가 완료(Completed)였다가 실행 중일 때는 실행 중(Inprogress) 으로 바뀌는 것을 확인할 수 있습니다.
![워크플로우 동작 상태 확인](./images/2.6.7_monitoring2.png)
WorkFlow History 탭을 선택하면, 프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.
그리고 본 예제에서는 다섯 번째 엔진이 5분 마다 동작한 내역을 확인할 수 있습니다.
![워크플로우 히스토리](./images/2.6.7_monitoring_history.png)
### 학습한 후 export 된 모델 확인하기
다섯 번째 엔진의 FileWriter 에서 지정한 위치에 학습된 모델이 생성된 것을 확인합니다. 다섯 번째 엔진이 5분 마다 실행되어 모델을 주기적으로 생성하는 것을 확인할 수 있습니다. 모델의 버전은 0000 부터 자동으로 1씩 증가합니다.
본 예제에서는 **학습된 모델 업로드** 에서 올려둔 0001 버전 이후부터 자동으로 모델이 생성된 것을 확인할 수 있습니다.
![학습된 모델 확인](./images/2.6.7_modelSave.png)
### 체크포인트 파일 확인
첫 번째 엔진과 두 번째 엔진의 KafkaPipeWriter 가 생성한 체크포인트 파일이 지정한 위치에 생성된 것을 확인할 수 있습니다.
워크플로우 종료 후 재시작 할 경우에는 체크포인트 파일을 삭제하고 실행해야 원하는 결과를 얻을 수 있습니다.
![체크포인트 파일 확인](./images/2.6.7_checkpoint.png)
### 속도 예측 결과 확인하기
Kafka의 consumer를 이용하여 속도 예측 결과를 확인합니다. 이를 위해 카프카가 설치된 폴더로 이동하여 아래 명령어를 실행합니다. 카프카는 ``home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1`` 위치에 설치되어 있습니다.
```
cd /home/csle/ksb-csle/tools/kafka_2.11-0.10.0.1
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --bootstrap-server localhost:9092 --topic kangnam_output
```
아래 그림과 같이 예측된 속도값을 Kafka로 보내는 것을 확인할 수 있습니다.
![예측 결과 확인](./images/2.6_predictResult.png)
## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 TrafficEndToEnd 워크플로우를 종료(<span style="color:red">◼</span>)할 수 있습니다.