2.5.9.RealtimeIngestToServingWithKbInTwoEngines.md 10.1 KB
Newer Older
HooYoungAhn's avatar
HooYoungAhn committed
---
html:
  toc: true
  offline: true
export_on_save:
  html: true
---
# 스트림 예측결과를 지식추론하여 REST 방식으로 서빙하기 (RealtimeIngestToServingWithKbInTwoEngines)
---

온도센서 데이터를 수집한 후 전처리를 수행하고, 기계학습 모델에 의해 예측된 온도 값으로부터 최적의 제어를 추론하여 제어 메세지를 클라이언트로 전달하는 예제를 설명합니다.

## 입력 데이터 준비하기
본 예제에서는 온도 센서 스트림 데이터를 HTTP로 입력 받는 것을 가정합니다. 이를 위해 Jmeter 도구를 활용합니다.

<a href="https://csleoss.etri.re.kr/images/contents/manual_1.0/jmeter.html">Jmeter 설명 바로가기</a>

### 온도 스트림데이터 생성하기
워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야합니다. Jmeter를 활용한 온도 스트림데이터 생성하는 방법은 메뉴얼 하단의 "결과 확인하기" 를 확인해주시기 바랍니다.



##워크플로우 생성하기
워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 두 개의 엔진을 생성합니다.

### 첫 번째 엔진 생성하기
####엔진 선택
스트림형태로 제공되는 온도 데이터를 입력받아 실시간 온도 예측을 하는 서빙 엔진으로 전달하기 위해 Streamtostream 엔진을 선택합니다.
#####Reader
HTTP 서버로부터 전달되어 오는 센서데이터를 입력 받기 위해 HttpServerReader를 선택하고 아래표와 같은 속성을 지정합니다.
field  |value   | 설명
--|---|--
ip  |  0.0.0.0 |  HTTP 서버의 IP 주소
path  |  /  |  HTTP 서버의 URL
port  |  53002 |  HTTP 서버의 포트 번호


#####Writer
StreamToStream엔진에서 처리된 결과를 다음 엔진에게 Stream 형태로 전달해야 하므로 KafkaWriter를 선택합니다.
field  |value   | 설명
--|---|--
topic  | test3 | 온도 예측 결과를 받아 올때 사용할 Kafka 큐의 이름  
zooKeeperConnect  | csle1:2181 |  zookeeper의 접속주소(IP, 포트번호)
groupId  | group3  | Kafka writer topic 그룹 아이디
bootStrapServer  | csle1:9092  | Kafka 접속 주소(IP, 포트번호)   


#####Controller
Controller 로는 WindowedSparkSessionOrStreamController 를 선택합니다. WindowedSparkSessionOrStreamController 는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다.
field  |value   | 설명
--|---|--
operationPeriod  |  2 | Reader로 부터 데이터를 읽어올 주기   |  53002 |  HTTP 서버의 포트번호를 입력합니다.
inputQueSize  | 1  | 입력 큐의 크기  |   |  
selectedColumnName  |  temperature | Spark dataframe에서 추출할 컬럼의 이름  |   |  
windowSize  |  10 | 큐에서 사용할 윈도우의 크기  |   |  

#####Runner
SimpleSparkRunner 를 선택합니다.
field  |value   | 설명
--|---|--
inJason  | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드 라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달.
sparkArgs  | master | 아래의 표 참고

sparkArgs 설정은 다음과 같이 합니다.
field  |value   | 설명
--|---|--
master | local[\*] | YARN 의 마스터
executerMemory | 1g | Spark 의 익스큐터 메모리
numExecutors | 4 | 익스큐터 개수.
sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전
driverMemory | 1g | Spark 드라이버 메모리



#####Operator
첫번째 엔진에서는 센서로부터 입력되는 온도 데이터를 여섯가지 전처리 기법을 적용하여 정제합니다. 본 예제에서 사용한 전처리 기법은 ColumnSelectOperator, MinMaxScalingOperator, TransposeOperator, SparkMLPredictOperator, MinMaxScalingOperator, ColumnSelectOperator 입니다.

- ColumnSelectOperator

field  |value   | 설명
--|---|--
selectedColumnId  |  1 |  선택할 칼럼 ID

- MinMaxScalingOperator

field  |value   | 설명
--|---|--
selectedColumnId |  0 |  선택할 칼럼 ID
max  | 0.5  |  입력되는 값이 scaling 될 때 최대값
min  |  -0.5 |  입력되는 값이 scaling 될 때 최소값
withMinMaxRange  |  true | 위에서 설정한 mxx, min값 사용 여부
minRealValue  | -20  |  온도 값의 정상 범위로 사용할 실제 최소값
maxRealValue  |  50 |   온도 값의 정상 범위로 사용할 실제 최대값

- TransposeOperator

field  |value   | 설명
--|---|--
selectedColumnName | temperature | transpose 연산을 적용할 컬럼의 이름을 입력합니다.

- SparkMLPredictOperator

field  |value   | 설명
--|---|--
modelPath | file:///home/csle/ksb-csle/examples/autosparkml/test/automl_test | 학습 모델 경로
clsNameForModel  | org.apache.spark.ml.PipelineModel | 학습 모델 형식



- MinMaxScalingOperator

field  |value   | 설명
--|---|--
selectedColumnId | 13 |  선택할 칼럼 ID
max  | 50  |  입력되는 값이 rescaling 될 때 최대값
min  |  -20 |  입력되는 값이 rescaling 될 때 최소값
withMinMaxRange  |  true | 위에서 설정한 max, min 값 사용 여부
minRealValue  | -0.5  |  온도 값의 정상 범위로 사용할 최소값
maxRealValue  |  0.5 |   온도 값의 정상 범위로 사용할 최대값


- ColumnSelectOperator

field  |value   | 설명
--|---|--
selectedColumnId  | 13 |  선택할 칼럼 ID

### 두 번째 엔진 생성하기
####엔진 선택
스트림 형태로 전달되는 온도 예측값을 온디맨드 서빙을 하기 위해 OnDemandStreamServing 엔진을 선택합니다. (KSB프레임워크에서는 도커 컨테이너 내부의 지식베이스(KB)가 실행되고 있습니다.)

#####Reader
첫번째 엔진에서 입력되는 실시간 온도 예측값을 입력 받기 위해 KafkaOnDemandReader 를 선택합니다.
field  |value   | 설명
--|---|--
topic  | test3 | 실시간 온도 예측값을 받아올때 사용할 Kafka 큐의 이름  
zooKeeperConnect  | csle1:2181 |  zookeeper의 접속주소(IP, 포트번호)
groupId  | group3  | Kafka writer topic 그룹 아이디
bootStrapServer  | csle1:9092  | Kafka 접속 주소(IP, 포트번호)


#####Writer
이 엔진에서는 Writer를 사용하지 않습니다.



#####Controller
Controller 로는 servingWithkbController를 선택합니다. servingWithkbController는 추가적인 속성을 설정하지 않습니다.



#####Runner
REST 방식으로 엔진을 실행 하기위해서는 ServingRunner를 선택합니다.
field  |value   | 설명
--|---|--  |  
port  | 18080  | 최적의 제어값을 서빙할 포트번호 |  
host  | 0.0.0.0  | 최적의 제어값을 서빙할 IP  |  



#####Operator
두 번째 엔진에서는 ControlContextQueryOperator를 사용합니다.

field  |value   | 설명
--|---|--  |  
controlQuery  | 18080  | 세부 설정 아래 참고 |  
URI  | recommendDeviceControl  | 최적의 제어값을 서빙할 URI  |  

ControlQuery 의 세부 설정은 아래와 같이 합니다.

field  |value   | 설명
--|---|--  |  
resourceID  | temperature  | 센서의 리소스명  |  
thingId  | a1234  | 센서의 아이디 |
values(name)  | temperature | 질의 속성 |
|values(value)   | ?  | 예측된 센서 온도가 계속 변경되므로 물음표를 입력
|values(time)   |  ?  | 예측된 센서 데이터값이 입력되는 시간이 계속 변경되므로 물음표를 입력    |


<br>
<br>

![워크플로우 완성 화면](./images/2.5.9_01.png)

ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다.

##워크플로우 실행 및 모니터링하기
### 워크플로우 실행하기
위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 이 때, StreamToStream 엔진과 OnDemandStreamServing 엔진은 배치형태로 실행되지 않는 엔진들이므로 Batch 체크 박스는 해제하고 워크플로우를 제출해야합니다.

![Batch체크 해제로 변경할것](./images/2.5.9_02.png)



#### 서빙 URL 확인하기
Dashboard 탭으로 이동하면, 위에서 실행한 OnDemandStreamServing엔진이 서비스 하고있는 URL을 확인할 수 있습니다.
![서빙 URL 확인](./images/2.5.9_03.png)




### 워크플로우 모니터링 하기

#### 워크플로우 상태 확인
KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면, Workflow 탭이 선택되어 있습니다. Workflow 탭에는 위에서 실행한 두 엔진의 동작 상태 (status) 가 실행 중(Inprogress)인 것을 확인 할 수 있습니다.

![워크플로우 동작 상태 확인](./images/2.5.9_04.png)

#### 워크플로우 로그 보기
WorkFlow History 탭을 선택하면, KSB프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다.

![워크플로우 히스토리](./images/2.5.9_05.png)

![워크플로우 동작 로그](./images/2.5.9_06.png)

##결과 확인하기
#### 온도 스트림데이터 생성하기
워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. 이를 위해 위에서 설명한 Jmeter 도구를 활용합니다.

![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.9_07.png)


#### 서빙 기능 이용해 예측결과 받아오기
실시간으로 예측된 온도값을 받아오기 위해서는 예측 모델이 서빙 되고 있는 서버에 접근하여 예측결과를 받아와야합니다. REST 형태로 실시간 서빙되고 있는 모델에 접근하기 위해 Postman 을 이용합니다.
서빙 중인 모델의 URL 은 "서빙 URL 확인하기" 에서 확인할 수 있습니다.

```
Method: GET
URL: http://localhost:18080/query
```

<a href="https://csleoss.etri.re.kr/images/contents/manual_1.0/postman.html">Postman 설명 바로가기</a>

![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.9_08.png)

## 워크플로우 종료하기

KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 RealtimeIngestToServingWithKbInTwoEngines 워크플로우를 종료(<span style="color:red">&#9724;</span>)할 수 있습니다.