--- html: toc: true offline: true export_on_save: html: true --- # 스트림 방식으로 ML모델 예측결과 얻기 (RealtimeIngestToPredictInSingleEngine) --- 온도센서 데이터를 수집한 후 전처리를 수행하여 Kafka 스트림으로 내보내는 예제를 설명합니다. ## 입력 데이터 준비하기 본 예제에서는 온도 센서 스트림 데이터를 HTTP로 입력 받는 것을 가정합니다. 이를 위해 Jmeter 도구를 활용합니다. Jmeter 설명 바로가기 ### 온도 스트림데이터 생성하기 워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. Jmeter를 활용한 온도 스트림 데이터 생성 방법은 메뉴얼 하단의 "결과 확인하기" 를 확인해 주시기 바랍니다. ##워크플로우 생성하기 워크플로우 편집 화면을 이용하여 아래의 과정을 통해 워크플로우를 생성합니다. ### 엔진 생성하기 ####엔진 선택 본 예제에서는 스트림형태로 입력되는 온도 데이터를 입력받아 실시간 온도를 예측해서 스트림 형태로 출력하므로 StreamToStream엔진을 선택합니다. #####Reader HTTP 서버로부터 전달되어 오는 센서데이터를 입력 받기 위해 HttpServerReader를 선택하고 아래표와 같은 속성을 지정합니다. field |value | 설명 --|---|-- ip | 0.0.0.0 | HTTP 서버의 IP 주소 path | / | HTTP 서버의 URL port | 53002 | HTTP 서버의 포트 번호 #####Writer StreamToStream엔진에서 처리된 결과를 Kafka topic 으로 출력 할 것이므로 KafkaWriter를 선택합니다. field |value | 설명 --|---|-- topic | test3 | 온도 예측 결과를 받아서 저장할 때 사용할 Kafka 큐의 이름 zooKeeperConnect | csle1:2181 | zookeeper의 접속주소(IP, 포트번호) groupId | group3 | Kafka writer topic 그룹 아이디 bootStrapServer | csle1:9092 | Kafka 접속 주소(IP, 포트번호) #####Controller Controller 로는 WindowedSparkSessionOrStreamController 를 선택합니다. WindowedSparkSessionOrStreamController 는 스트림으로 입력되는 데이터를 큐에 저장하고, 일정 주기 마다 정해진 윈도우 크기로 잘라서 operator 에게 전달합니다. field |value | 설명 --|---|-- operationPeriod | 2 | Reader로 부터 데이터를 읽어올 주기 | 53002 | HTTP 서버의 포트번호 inputQueSize | 1 | 입력큐의 크기 | | selectedColumnName | temperature | Spark dataframe에서 추출할 컬럼의 이름 | | windowSize | 10 | 큐에서 사용할 윈도우의 크기 | | #####Runner SimpleSparkRunner 를 선택합니다. field |value | 설명 --|---|-- inJason | false | false로 설정하는 경우 json 형태의 파라메타를 커맨드라인 파라미터 형태로 변환하여 호출되는 외부 시스템에 전달. True 이면 json 형태의 파라메타 형태 그대로 외부시스템에 전달. sparkArgs | master | 아래의 표 참고 sparkArgs 설정은 다음과 같이 합니다. field |value | 설명 --|---|-- master | local[\*] | YARN 의 마스터 executerMemory | 1g | Spark 의 익스큐터 메모리 numExecutors | 4 | 익스큐터 개수 sparkVersion | 2.3.0 | KSB프레임워크의 현재 Spark 버전 driverMemory | 1g | Spark 드라이버 메모리 #####Operator 센서로부터 입력되는 온도 데이터를 여섯가지 전처리 기법을 적용하여 정제합니다. 본 예제에서 사용한 전처리 기법은 ColumnSelectOperator, MinMaxScalingOperator, TransposeOperator, SparkMLPredictOperator, MinMaxScalingOperator, ColumnSelectOperator 입니다. - ColumnSelectOperator field |value | 설명 --|---|-- selectedColumnId | 1 | 선택할 칼럼 ID - MinMaxScalingOperator field |value | 설명 --|---|-- selectedColumnId | 0 | 선택할 칼럼 ID max | 0.5 | 입력되는 값이 scaling 될 때 최대값 min | -0.5 | 입력되는 값이 scaling 될 때 최소값 withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부 minRealValue | -20 | 온도 값에서 정상 범위로 사용할 최소값 maxRealValue | 50 | 온도 값에서 정상 범위로 사용할 최대값 - TransposeOperator field |value | 설명 --|---|-- selectedColumnName | temperature | transpose 연산을 적용할 컬럼의 이름 - SparkMLPredictOperator field |value | 설명 --|---|-- modelPath | file:///home/csle/ksb-csle/examples/autosparkml/test/automl_test | 학습 모델의 모델 경로 clsNameForModel | org.apache.spark.ml.PipelineModel | 학습 모델의 형식 - MinMaxScalingOperator field |value | 설명 --|---|-- selectedColumnId | 13 | 선택할 칼럼 ID max | 50 | 입력되는 값이 rescaling 될 때 최대값 min | -20 | 입력되는 값이 rescaling 될 때 최소값 withMinMaxRange | true | 위에서 설정한 max, min 값 사용 여부 minRealValue | -0.5 | 온도값에서 정상 범위로 사용할 최소값 maxRealValue | 0.5 | 온도값에서 정상 범위로 사용할 최대값 - ColumnSelectOperator field |value | 설명 --|---|-- selectedColumnId | 13 | 선택할 칼럼 ID

![워크플로우 완성 화면](./images/2.5.7_01.png) ksbuser@etri.re.kr 계정으로 접속하면 예제 워크플로우가 만들어져있습니다. 불러오기해서도 돌려볼 수 있습니다. ##워크플로우 실행 및 모니터링하기 ### 워크플로우 실행하기 위에서 작성한 워크플로우를 실행하기 위해서는 워크플로우 편집기 상단의 실행 버튼을 누릅니다. 이 때, StreamToStream 엔진은 배치형태로 실행되지 않는 엔진들이므로 Batch 체크 박스는 해제하고 워크플로우를 실행해야합니다. ![Batch체크 해제로 변경할것](./images/2.5.7_02.png) ### 워크플로우 모니터링 하기 #### 워크플로우 상태 확인 KSB 웹툴킷 상단 메뉴의 Monitoring 탭을 클릭하면 Workflow 탭이 선택되어있습니다. Workflow 탭에는 위에서 실행한 두 엔진의 동작 상태 (status) 가 실행 중(Inprogress)인 것을 확인 할 수 있습니다. ![워크플로우 동작 상태 확인](./images/2.5.7_03.png) #### 워크플로우 로그 보기 WorkFlow History 탭을 선택하면, KSB프레임워크에서 워크플로우가 동작하며 발생시킨 로그 정보를 확인 할 수 있습니다. ![워크플로우 동작 로그](./images/2.5.7_04.png) ##결과 확인하기 #### 온도 스트림데이터 생성하기 워크플로우 실행 결과를 확인하기 위해서는 온도데이터를 스트림 형태로 발생시켜야 합니다. 이를 위해 위에서 설명한 Jmeter 도구를 활용합니다. ![Jmeter로 온도 스트림데이터 입력하기](./images/2.5.7_05.png) #### 스트림 형태의 온도 예측결과 받아오기 Kafka writer를 통해서 예측 결과가 Kafka topic 에 잘 저장되었는지 확인하기 위해서는 Kafka consumer를 이용합니다. Kafka consumer 명령은 아래와 같습니다. ![Kafka consumer 로 결과 받아오기](./images/2.5.7_06.png) ## 워크플로우 종료하기 KSB 웹툴킷 상단 "Monitoring" 메뉴의 "Workflow" 탭에서 실행 중인 RealtimeIngestToPredictInSingleEngine 워크플로우를 종료()할 수 있습니다.