--- html: toc: true offline: true export_on_save: html: true --- # 스트림 방식으로 예측하여 REST 방식으로 서빙하기 (RealtimeIngestToServingInTwoEngines) --- 온도센서 데이터를 수집한 후 전처리를 수행하고, 기계학습 모델에 의해 예측된 온도값을 클라이언트로 전달하는 예제를 설명합니다. ## 입력 데이터 준비하기 본 예제에서는 온도 센서 스트림 데이터를 HTTP로 입력 받는 것을 가정합니다. 이를 위해 Jmeter 도구를 활용합니다. Jmeter 설명 바로가기 ### 온도 스트림데이터 생성하기 워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야합니다. Jmeter를 활용한 온도 스트림데이터 생성하는 방법은 메뉴얼 하단의 "결과 확인하기" 를 확인해 주시기 바랍니다. ## 워크플로우 생성하기 워크플로우 편집화면에서 워크플로우를 작성합니다. 본 예제에서는 두 개의 엔진을 생성합니다. ### 첫 번째 엔진 생성하기 #### 엔진 선택 본 예제에서는 스트림형태로 입력되는 온도 데이터를 입력받아 실시간 온도 예측을 하는 서빙 엔진으로 전달하기 위해 Streamtostream 엔진을 선택합니다. #####Reader HTTP 서버로부터 전달되어 오는 센서데이터를 입력 받기 위해 HttpServerReader를 선택하고 아래표와 같은 속성을 지정합니다. field |value | 설명 --|---|-- ip | 0.0.0.0 | HTTP 서버의 IP 주소 path | / | HTTP 서버의 URL port | 53002 | HTTP 서버의 포트 번호 ##### Writer StreamToStream엔진에서 처리된 결과를 다음 엔진에게 Stream 형태로 전달할 것이므로, KafkaWriter를 선택합니다. field |value | 설명 --|---|-- topic | test3 | 온도 예측 결과를 받아 올때 사용할 Kafka 큐의 이름 zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호) groupId | group3 | Kafka writer topic 그룹 아이디 bootStrapServer | csle1:9092 | Kafka 접속 주소(IP, 포트번호) ##### Controller Controller 로는 WindowedSparkSessionOrStreamController 를 선택합니다. WindowedSparkSessionOrStreamController 는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다. field |value | 설명 --|---|-- operationPeriod | 2 | Reader로 부터 데이터를 읽어올 주기 | 53002 | HTTP 서버의 포트번호 inputQueSize | 1 | 입력 큐의 크기 | | selectedColumnName | temperature | Spark dataframe에서 추출할 컬럼의 이름 | | windowSize | 10 | 큐에서 사용할 윈도우의 크기 | | ##### Runner SimpleSparkRunner 를 선택합니다. field |value | 설명 --|---|-- inJason | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달. sparkArgs | master | 아래의 표 참고 sparkArgs 설정은 다음과 같이 합니다. field |value | 설명 --|---|-- master | local[\*] | YARN 의 마스터 executerMemory | 1g | Spark 의 익스큐터 메모리 numExecutors | 4 | 익스큐터 개수 sparkVersion | 2.3.0 | KSB프레임워크의 Spark 버전 driverMemory | 1g | Spark 드라이버 메모리 ##### Operator 첫번째 엔진에서는 센서로부터 입력되는 온도 데이터를 여섯가지 전처리 기법을 적용하여 정제합니다. 본 예제에서 사용한 전처리 기법은 ColumnSelectOperator, MinMaxScalingOperator, TransposeOperator, SparkMLPredictOperator, MinMaxScalingOperator, ColumnSelectOperator 입니다. - ColumnSelectOperator field |value | 설명 --|---|-- selectedColumnId | 1 | 선택할 칼럼 ID - MinMaxScalingOperator field |value | 설명 --|---|-- selectedColumnId | 0 | 선택할 칼럼 ID max | 0.5 | 입력되는 값이 scaling 될 때 최대값 min | -0.5 | 입력되는 값이 scaling 될 때 최소값 withMinMaxRange | true | 위에서 설정한 max, min값 사용 여부 minRealValue | -20 | 온도 값에서 정상 범위로 사용할 실제 최소값 maxRealValue | 50 | 온도 값에서 정상 범위로 사용할 실제 최대값 - TransposeOperator field |value | 설명 --|---|-- selectedColumnName | temperature | transpose 연산을 적용할 컬럼의 이름 - SparkMLPredictOperator field |value | 설명 --|---|-- modelPath | file:///home/csle/ksb-csle/examples/autosparkml/test/automl_test | 학습 모델 경로 clsNameForModel | org.apache.spark.ml.PipelineModel | 학습 모델 형식 - MinMaxScalingOperator field |value | 설명 --|---|-- selectedColumnId | 13 | 선택할 칼럼 ID max | 50 | 입력되는 값이 rescaling 될 때 최대값 min | -20 | 입력되는 값이 rescaling 될 때 최소값 withMinMaxRange | true | 실제 최대/최소값 범위를 이용하여 rescaling 합니다. minRealValue | -0.5 | 온도값에서 정상범위로 사용할 최소값 maxRealValue | 0.5 | 온도값에서 정상범위로 사용할 최대값 - ColumnSelectOperator field |value | 설명 --|---|-- selectedColumnId | 13 | 선택할 칼럼 ID ### 두 번째 엔진 생성하기 #### 엔진 선택 스트림 형태로 전달되는 온도 예측값의 온디맨드 서빙을 하기 위해 OnDemandStreamServing 엔진을 선택합니다. (KSB프레임워크에서는 도커 컨테이너 내부의 지식베이스(KB)가 실행되고 있습니다.) ##### Reader 첫번째 엔진에서 입력되는 실시간 온도 예측값을 입력 받기 위해 KafkaOnDemandReader 를 선택합니다. field |value | 설명 --|---|-- topic | test3 | 실시간 온도 예측값을 받아올때 사용할 Kafka 큐의 이름 zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호)를 groupId | group3 | Kafka writer topic 그룹 아이디 bootStrapServer | csle1:9092 | Kafka 접속 주소(IP, 포트번호) ##### Writer 이 엔진에서는 별도의 Writer를 사용하지 않습니다. ##### Controller Controller 로는 servingWithkbController를 선택합니다. ServingWithkbController는 추가적인 속성을 설정하지 않습니다. field |value | 설명 --|---|-- restfulActorName | ksb.csle.component.pipe.controller.actor.CompositiveServingActor | url | postQuery | ##### Runner REST 방식으로 엔진을 실행 하기위해서 ServingRunner를 선택합니다. field |value | 설명 --|---|-- | port | 18080 | 최적의 제어값을 서빙할 포트번호 | host | 0.0.0.0 | 최적의 제어값을 서빙할 IP | ##### Operator 이 엔진에서는 별도의 Operator를 설정하지 않습니다.

![워크플로우 완성 화면](./images/2.5.8_01.png) ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다. ## 워크플로우 실행 및 모니터링하기 ### 워크플로우 실행하기 위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 이 때, StreamToStream 엔진과 OnDemandStreamServing 엔진은 배치형태로 실행되지 않는 엔진들이므로 Batch 체크 박스는 해제하고 워크플로우를 제출해야합니다. ![Batch체크 해제로 변경할것](./images/2.5.8_02.png) #### 서빙 URL 확인하기 Dashboard 탭으로 이동하면, 위에서 실행한 OnDemandStreamServing엔진이 서비스 하고있는 URL을 확인할 수 있습니다. ![서빙 URL 확인](./images/2.5.8_03.png) ### 워크플로우 모니터링 하기 #### 워크플로우 상태 확인 KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에는 위에서 실행한 두 엔진의 동작 상태 (status) 가 실행 중(Inprogress)인 것을 확인 할 수 있습니다. ![워크플로우 동작 상태 확인](./images/2.5.8_04.png) #### 워크플로우 로그 보기 WorkFlow History 탭을 선택하면, KSB프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다. ![워크플로우 히스토리](./images/2.5.8_05.png) ![워크플로우 동작 로그](./images/2.5.8_06.png) ## 결과 확인하기 #### 온도 스트림데이터 생성하기 워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. 이를 위해 위에서 설명한 Jmeter 도구를 활용합니다. ![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.8_07.png) #### 서빙 기능을 이용해 예측결과 받아오기 실시간으로 예측된 온도값을 받아오기 위해서는 예측 모델이 서빙 되고 있는 서버에 접근하여 예측결과를 받아와야합니다. REST 형태로 실시간 서빙되고 있는 모델에 접근하기 위해 Postman을 이용합니다. 서빙 중인 모델의 URL 은 "서빙 URL 확인하기" 에서 확인할 수 있습니다. ``` Method: GET URL: http://localhost:18080/query ``` Postman 설명 바로가기 ![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.8_08.png) ## 워크플로우 종료하기 KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 RealtimeIngestToServingInTwoEngines 워크플로우를 종료()할 수 있습니다.