Newer
Older
---
html:
toc: true
offline: true
export_on_save:
html: true
---
# 스트림 방식으로 ML모델 예측결과 얻기 (RealtimeIngestToPredictInSingleEngine)
---
온도센서 데이터를 수집한 후 전처리를 수행하여 Kafka 스트림으로 내보내는 예제를 설명합니다.
## 입력 데이터 준비하기
본 예제에서는 온도 센서 스트림 데이터를 HTTP로 입력 받는 것을 가정합니다. 이를 위해 Jmeter 도구를 활용합니다.
<a href="https://etrioss.kr/thkimetri/ksb19.03-manual/blob/master/manual1903/jmeter.md">Jmeter 설명 바로가기</a>
### 온도 스트림데이터 생성하기
워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. Jmeter를 활용한 온도 스트림 데이터 생성 방법은 메뉴얼 하단의 "결과 확인하기" 를 확인해 주시기 바랍니다.
워크플로우 편집 화면을 이용하여 아래의 과정을 통해 워크플로우를 생성합니다.
### 엔진 생성하기
본 예제에서는 스트림형태로 입력되는 온도 데이터를 입력받아 실시간 온도를 예측해서 스트림 형태로 출력하므로 StreamToStream엔진을 선택합니다.
HTTP 서버로부터 전달되어 오는 센서데이터를 입력 받기 위해 HttpServerReader를 선택하고 아래표와 같은 속성을 지정합니다.
field |value | 설명
--|---|--
ip | 0.0.0.0 | HTTP 서버의 IP 주소
path | / | HTTP 서버의 URL
port | 53002 | HTTP 서버의 포트 번호
StreamToStream엔진에서 처리된 결과를 Kafka topic 으로 출력 할 것이므로 KafkaWriter를 선택합니다.
field |value | 설명
--|---|--
topic | test3 | 온도 예측 결과를 받아서 저장할 때 사용할 Kafka 큐의 이름
zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호)
groupId | group3 | Kafka writer topic 그룹 아이디
bootStrapServer | csle1:9092 | Kafka 접속 주소(IP, 포트번호)
Controller 로는 WindowedSparkSessionOrStreamController 를 선택합니다. WindowedSparkSessionOrStreamController 는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.
field |value | 설명
--|---|--
operationPeriod | 2 | Reader로 부터 데이터를 읽어올 주기 | 53002 | HTTP 서버의 포트번호
inputQueSize | 1 | 입력큐의 크기 | |
selectedColumnName | temperature | Spark dataframe에서 추출할 컬럼의 이름 | |
windowSize | 10 | 큐에서 사용할 윈도우의 크기 | |
field |value | 설명
--|---|--
inJason | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs | master | 아래의 표 참고
sparkArgs 설정은 다음과 같이 합니다.
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
field |value | 설명
--|---|--
master | local[\*] | YARN 의 마스터
executerMemory | 1g | Spark 의 익스큐터 메모리
numExecutors | 4 | 익스큐터 개수
sparkVersion | 2.3.0 | KSB프레임워크의 현재 Spark 버전
driverMemory | 1g | Spark 드라이버 메모리
#####Operator
센서로부터 입력되는 온도 데이터를 여섯가지 전처리 기법을 적용하여 정제합니다. 본 예제에서 사용한 전처리 기법은 ColumnSelectOperator, MinMaxScalingOperator, TransposeOperator, SparkMLPredictOperator, MinMaxScalingOperator, ColumnSelectOperator 입니다.
- ColumnSelectOperator
field |value | 설명
--|---|--
selectedColumnId | 1 | 선택할 칼럼 ID
- MinMaxScalingOperator
field |value | 설명
--|---|--
selectedColumnId | 0 | 선택할 칼럼 ID
max | 0.5 | 입력되는 값이 scaling 될 때 최대값
min | -0.5 | 입력되는 값이 scaling 될 때 최소값
withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부
minRealValue | -20 | 온도 값에서 정상 범위로 사용할 최소값
maxRealValue | 50 | 온도 값에서 정상 범위로 사용할 최대값
- TransposeOperator
field |value | 설명
--|---|--
selectedColumnName | temperature | transpose 연산을 적용할 컬럼의 이름
- SparkMLPredictOperator
field |value | 설명
--|---|--
modelPath | file:///home/csle/ksb-csle/examples/autosparkml/test/automl_test | 학습 모델의 모델 경로
clsNameForModel | org.apache.spark.ml.PipelineModel | 학습 모델의 형식
- MinMaxScalingOperator
field |value | 설명
--|---|--
selectedColumnId | 13 | 선택할 칼럼 ID
max | 50 | 입력되는 값이 rescaling 될 때 최대값
min | -20 | 입력되는 값이 rescaling 될 때 최소값
withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부
minRealValue | -0.5 | 온도값에서 정상 범위로 사용할 최소값
maxRealValue | 0.5 | 온도값에서 정상 범위로 사용할 최대값
- ColumnSelectOperator
field |value | 설명
--|---|--
selectedColumnId | 13 | 선택할 칼럼 ID
<br>
<br>
![워크플로우 완성 화면](./images/2.5.7_01.png)
ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.
## 워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 이 때, StreamToStream 엔진은 배치형태로 실행되지 않는 엔진들이므로 Batch 체크 박스는 해제하고 워크플로우를 실행해야합니다.
![Batch체크 해제로 변경할것](./images/2.5.7_02.png)
### 워크플로우 모니터링 하기
#### 워크플로우 상태 확인
KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에는 위에서 실행한 두 엔진의 동작 상태 (status) 가 실행 중(Inprogress)인 것을 확인 할 수 있습니다.
![워크플로우 동작 상태 확인](./images/2.5.7_03.png)
#### 워크플로우 로그 보기
WorkFlow History 탭을 선택하면, KSB프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.
![워크플로우 동작 로그](./images/2.5.7_04.png)
#### 온도 스트림데이터 생성하기
워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. 이를 위해 위에서 설명한 Jmeter 도구를 활용합니다.
![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.7_05.png)
#### 스트림 형태의 온도 예측결과 받아오기
Kafka writer를 통해서 예측 결과가 Kafka topic 에 잘 저장되었는지 확인하기 위해서는 Kafka consumer를 이용합니다. Kafka consumer 명령은 아래와 같습니다.
![Kafka consumer 로 결과 받아오기](./images/2.5.7_06.png)
## 워크플로우 종료하기
KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 RealtimeIngestToPredictInSingleEngine 워크플로우를 종료(<span style="color:red">◼</span>)할 수 있습니다.