--- html: toc: true offline: true export_on_save: html: true --- # Operator 컴퍼넌트 개발하기 --- 데이터 처리용 Operator 컴퍼넌트을 만들기 위한 절차는 다음과 같습니다. - 신규 컴퍼넌트용 Protocol Buffer 메세지 작성 - 새로 작성한 메세지를 포함한 프레임워크 코어 새로 빌드 - Operator 컴퍼넌트 스칼라 코드 작성 - 컴퍼넌트 빌드하여 프레임워크에 배포 아래에는 Operator 컴퍼넌트를 개발하고 이를 실행 중인 KSB 프레임워크 상에 배포하는 과정을 설명합니다. 자세한 설명을 위해서 column selection을 수행하는 간단한 오퍼레이터인 *ColumnSelectWithFileOperator* 개발을 대상으로 설명을 하도록 하겠습니다. 단, 이미 *ColumnSelectWithFileOperator*이 등록되어 있으므로, 여기서는 **MyColumnSelectWithFileOperator**로 변경해서 신규 컴퍼넌트로 작성하여 등록하는 과정을 진행해 보도록 하겠습니다. ## 컴퍼넌트용 Protocol Buffer 메세지 작성 우선, 개발할 MyColumnSelectWithFileOperator에 전달될 파라메터를 Google의 Protocol buffer 메세지 방식으로 작성해야 합니다. 이를 위해서 다운로드한 KSB 프레임워크 오픈소스를 ScalaIDE를 이용하여 프로젝트를 열어 **ksb-core-digest_2.11** 프로젝트가 있는지 확인합니다. **ksb-core-digest_2.11** 모듈은 엔진을 생성하고 구동하기 위한 컴퍼넌트들의 파라메터를 정의한 Protocol buffer 메시지들을 모아놓은 모듈이며, 다음과 같은 구조를 같습니다. protofiles 예제로 작성할 MyColumnSelectWithFileOperator 오퍼레이터는 table 형태의 dataframe의 필드 중 특정 필드를 선택하는 오퍼레이터로서 선택하고자 하는 필드명을 파라메터로 입력받는 대신 미리 작성해 둔 필드명을 정의한 파일을 읽어 해당 필드만 선택하여 추출하는 오퍼레이터입니다. ksb.csle.common.custom.proto 패키지 내에 있는 **custom_operator_proto.proto** 파일을 열어 아래와 같이 **MySelectColumnsWithFileInfo** 메세지를 작성합니다. ```protobuf syntax = "proto2"; // 아래 ksb.csle.common.custom.proto 가 아니라 ksb.csle.common.proto 임을 유의 !!!! package ksb.csle.common.proto; message MySelectColumnsWithFileInfo { required string columnIdPath = 1; } ``` 작성한 Protocol Buffer 메세지가 스트림 처리용으로 사용될 예정이므로 **stream_control_proto.proto** 을 열어 StreamOperatorInfo 내에 oneof_operators 필드 내에 MySelectColumnsWithFileInfo 유형의 mySelectColumnsWithFile 필드로 등록합니다. ```protobuf message StreamOperatorInfo { required int32 id = 1; required int32 prevId = 2; required string clsName = 4; oneof oneof_operators { FilterInfo filter = 10; MissingValueImputationInfo missingValueImputation= 11; GroupbyInfo groupby = 12; ReshapeWithConcatInfo reshapeWithConcat = 13; PivotInfo pivot = 14; AddTimeIndexColumnInfo addTimeIndexColumn = 15; ... // for MyColumnSelectWithFileOperator. MySelectColumnsWithFileInfo mySelectColumnsWithFile = 1000; } } ``` ## 새로운 프레임워크 코어 빌드하기 신규 컴퍼넌트를 추가하기 위해서는 본 페이지를 통해 신규 컴퍼넌트를 인식할 수 있도록 Core에 대한 바이너리를 신규로 빌드하여 사용해야 합니다. **매뉴얼 > Core 빌드** 페이지로 이동합니다. 우리가 수정한 proto 파일(**stream_control_proto.proto** 와 **custom_operator_proto**)을 아래 그림과 같이 선택하고 빌드요청 버튼을 클릭한다. 요청 후 약 20분 정도가 경과하면 자신의 로그인 계정의 메일로 신규 빌드된 core 바이너리가 도착합니다(참고: 요청은 순차적으로 처리되므로 미리 대기중인 요청이 있다면 20분 이상 걸릴 수 있음). ## 새로운 프레임워크 코어로 프로젝트 빌드하기 메일에서 바이너리를 다운로드한 후, 확장자를 .jar로 변경하고, KSB 개발 프로젝트 디렉토리의 최상위 디렉토리아래 libs 디렉토리 (ksb-oss_dist/libs)에  ksb-core_2.11-${project.version}-SNAPSHOT.jar 파일을 업데이트 합니다. 아래와 같이 Maven 명령을 실행하여 전체 프로젝트를 새로 빌드한다. ```sh $ mvn generate-sources package install -P gpu -DskipTests -Drat.skip=true ``` ## 신규 컴퍼넌트 스칼라 코드 작성 다음으로 작성된 메시지를 이용하여 신규컴퍼넌트 클래스를 개발합니다. **ksb-components-custom_2.11** 모듈을 열어 아래와 같이 MyColumnSelectWithFileOperator 오퍼레이터를 구현합니다. 이 오퍼레이터는 BaseDataOperator 추상클래스를 상속받아 operator 메소드를 구현해야 합니다. BaseDataOperator 클래스의 정의에 따라 operate(df: DataFrame)를 반드시 override 해야 프레임워크 상에서 Controller에 의해 수행이 가능합니다. 자세한 사항은 KSB 프레임워크 Scala API를 참조하시기 바랍니다. ```scala package ksb.csle.component.operator.custom import scala.collection.JavaConversions._ import org.apache.spark.sql.DataFrame import ksb.csle.common.base.operator.BaseDataOperator import ksb.csle.common.proto.StreamOperatorProto._ // 새로 만든 메세지 객체 임포트 import ksb.csle.common.proto.CustomOperatorProto.MySelectColumnsWithFileInfo import ksb.csle.common.proto.StreamControlProto.StreamOperatorInfo class MyColumnSelectWithFileOperator( o: StreamOperatorInfo ) extends BaseDataOperator[StreamOperatorInfo, DataFrame](o) { private val p = o.getMySelectColumnsWithFile private val path = p.getColumnIdPath private var columnNames: Array[String] = null private def selectColumns(df: DataFrame): DataFrame = { logger.debug(s"OpId ${o.getId} : SelectColumns") if (columnNames == null) { columnNames = df.sparkSession.sparkContext.textFile(path) .map(_.split(",")).collect.flatten } val result = df.select(columnNames.head, columnNames.tail:_*) logger.debug(result.show.toString) logger.debug(result.printSchema.toString) result } override def operate(df: DataFrame): DataFrame = selectColumns(df) } object MyColumnSelectWithFileOperator { def apply(o: StreamOperatorInfo): MyColumnSelectWithFileOperator = new MyColumnSelectWithFileOperator(o) } ``` ## 신규 컴퍼넌트 빌드하여 jar 파일 생성하기 작성한 스칼라 오퍼레이터 클래스를 빌드하여 새로운 .jar를 생성하기 위하여 component-custom 프로젝트 위치에서 maven 명령(아래)을 이용하여 새로운 오퍼레이터가 포함된 바이너리(jar)를 빌드합니다. ```sh $ cd component-custom $ mvn generate-sources package install -P gpu -DskipTests -Drat.skip=true ``` 결과로 target 디렉토리에 **ksb-component-custom_2.11-{version}.jar** 가 생성된 것을 확인합니다. ## 신규 컴퍼넌트를 실행 프레임워크에 배포 새로 작성한 메세제와 오퍼레이터가 포함된 바이너리를 KSB 프레임워크 실행바이너리가 있는 위치에 배포합니다. (레파지토리에 배포가 완료됨과 동시에 별도의 설치과정 없이 클라이언트의 요청 시 KSB 프레임워크에 의해 동적으로 클러스터 상의 노드들에 배포되어 엔진을 생성하고 구동하는데 적용이 됩니다.) 새로 배포해야할 KSB 프레임워크의 패키지는 아래와 같이 두 개의 jar에 해당합니다. - ksb-csle/libs/**ksb-core_2.11-{version}.jar** -- 신규메세지 포함된 core 바이너리로서 core 빌드 요청결과로 받은 파일 - ksb-csle/component-custom/target/**ksb-component-custom_2.11-{version}.jar** -- 새로 작성한 컴퍼넌트 포함. 콘솔창에서 아래와 같이 maven 명령을 실행 위의 ksb-component-custom_2.11 빌드파일을 생성할 수 있습니다. 상기의 빌드된 패키지를 KSB 프레임워크 상에서 지정한 아카이브 디렉토리에 복사해서 넣어줌으로써 배포가 완료가 완료됩니다. 참고로, 아카이브 위치는 별도의 설정이 없다면 내부적으로 ```${KSB_HOME}/jars/${ksb-vsersion}``` 디렉토리를 기본값으로 사용합니다.