3.3.2.HowToMakeScenarioWithNewReader.md 6.77 KB
Newer Older
HooYoungAhn's avatar
HooYoungAhn committed
---
html:
  toc: true
  offline: true
export_on_save:
  html: true
---
# 새로 개발한 Reader 컴퍼넌트를 이용하여 워크플로우 작성 및 실행하기
---

여기에서는 만들어진 컴펀너트를 사용하여 워크플로우를 작성하고 실행하는 방법을 설명합니다.
KSB 웹툴킷을 이용하지 않고, KSBClientSDK를 이용하여 워크플로우를 작성하는 방법을 중심으로, 워크플로우의 시나리오를 작성하고 실행하기 위한 코드를 작성하고 이를 실행하는 과정을 설명합니다. 전체 순서는 다음과 같습니다.

- 데이터셋 업로드 (Optional)
-	워크플로우 작성하기
-	작성한 워크플로우 실행하기


## 데이터 업로드
데이터의 업로드는 KSB 웹툴킷 인터페이스를 이용하여 업로드 하는 방법과 command-line 인터페이스를 통해 업로드하는 방법이 있습니다. KSB 웹툴킷을 이용한 방법은 사용자 가이드에 있는 방법과 동일합니다. 본 예제에서는 개발환경과 함께 배포된 **ksb-csle/examples/input/adult.csv** 파일을 입력으로,  **ksb-csle/examples/input/columnSelection_adult.csv** 파일을 column selection을 위한 오퍼레이터의 파라메터로 사용할 것이므로 HDFS에 업로드 후 진행하도록 하겠습니다.


## 워크플로우 작성 및 제출하기
KSB 프레임워크는 워크플로우 작성 및 실행 요청을 위한 방법으로 scala 기반의 KSB clientSDK환경과 그래픽(GUI) 기반의 환경을 제공합니다.

본 예제 코드는 csv 파일을 입력받아 필터링 오퍼레이션과 그룹핑하는 오퍼레이션을 거쳐 파일로 저장하는 데이터처리 엔진의 사양을 작성하고 하나의 워크플로우로 구성한 예제입니다. (자세한 내용은 아래 예제 커맨트 참조)

- KSB ClientSDK와 ScalaIDE를 이용한 워크플로우 작성:
다음은 scala 기반의 clientSDK를 이용하여 신규로 개발한  포함하는 워크플로우를 정의하는 구문의 일부를 보여줍니다. 각각의 reader, writer, operator, runner, controller의 구체적인 정의는 아래 전체 예제의 주석을 참고하시기 바랍니다.

```scala

package ksb.examples.custom

import scala.util.{Try, Success, Failure}

import org.apache.logging.log4j.scala.Logging
import com.google.protobuf.Message

import ksb.csle.common.proto.CustomDatasourceProto._
import ksb.csle.common.proto.StreamOperatorProto._
import ksb.csle.common.proto.DatasourceProto._
import ksb.csle.common.proto.RunnerProto._
import ksb.csle.common.proto.WorkflowProto._
import ksb.csle.common.proto.CustomOperatorProto._
import ksb.csle.common.proto.StreamControlProto._
import ksb.csle.common.proto.OndemandControlProto._
import ksb.csle.common.proto.SharedProto._
import ksb.csle.common.utils.ProtoUtils
import ksb.csle.common.utils.config.ConfigUtils

import ksb.csle.tools.client._

object MyFileReaderExample extends Logging {
  val appId: String = "Data-TrafficExample"

  def main(args: Array[String]) {
    val workflowJson = ProtoUtils.msgToJson(workflow.asInstanceOf[Message])
    println(workflowJson)
    val client = new SimpleCsleClient("csle1", 19999)
    Try (client.submit(
        workflowJson,
        "ksbuser@etri.re.kr",
        this.getClass.getSimpleName.replace("$",""),
        this.getClass.getSimpleName.replace("$",""))) match {
      case Success(id) => logger.info("submit success:" + id)
      case Failure(e) => logger.error("submit error", e)
    }
    client.close()
  }

  // ClientSDK를 이용한 워크플로우 사양 작성
  private def workflow = {

    // 새로 개발한 MyFileReader를 이용하기 위하여 proto 메세지 인스턴스를 생성하고 이를 MyFileReader를 클래스에 바인딩하기 위한 사양을 정의한다.
    val infileInfo = MyFileInfo.newBuilder()
      .addFilePath(
        //s"file:///home/csle/ksb-csle/examples/input/adult.csv"
        "hdfs://csle1:9000/user/ksbuser_etri_re_kr/dataset/input/adult.csv"
        .replaceAll("\\\\", "/"))
      .setFileType(MyFileInfo.FileType.CSV)
      .setDelimiter(";")
      .setHeader(true)
      .build
    val reader = BatchReaderInfo.newBuilder()
      .setId(1)
      .setPrevId(0)
      .setClsName("ksb.csle.component.reader.custom.MyFileReader")
      .setMyFileReader(infileInfo)

    val selectedColumnsWithFileInfo =
      SelectColumnsWithFileInfo.newBuilder()
      .setColumnIdPath(
          //s"file:///home/csle/ksb-csle/examples/input/columnSelection_adult.csv")
          "hdfs://csle1:9000/user/ksbuser_etri_re_kr/dataset/input/columnSelection_adult.csv")
      .build
    val operator = StreamOperatorInfo.newBuilder()
      .setId(1)
      .setPrevId(0)
      .setClsName("ksb.csle.component.operator.reduction.ColumnSelectWithFileOperator")
      .setSelectColumnsWithFile(selectedColumnsWithFileInfo)
      .build

    val outfileInfo = FileInfo.newBuilder()
      .addFilePath(
          //s"file:///home/csle/ksb-csle/examples/output/output_columnSelect_adult.csv"
          "hdfs://csle1:9000/user/ksbuser_etri_re_kr/output/output_columnSelect_adult.csv"
          .replaceAll("\\\\", "/"))
      .setFileType(FileInfo.FileType.CSV)
      .setDelimiter(",")
      .setHeader(true)
      .build
    val writer = BatchWriterInfo.newBuilder()
      .setId(11)
      .setPrevId(10)
      .setClsName("ksb.csle.component.writer.FileWriter")
      .setFileWriter(outfileInfo)

    val runner = StreamRunnerInfo.newBuilder()
      .setClsName("ksb.csle.component.runner.SimpleSparkRunner")
      .setSparkRunner(
          SparkRunnerInfo.getDefaultInstance)

    val controller = StreamControllerInfo.newBuilder()
      .setClsName("ksb.csle.component.controller.SparkSessionOrStreamController")
      .setSparkSessionOrStreamController(SimpleBatchOrStreamControllerInfo.getDefaultInstance)

    // 각 컴퍼넌트 사양을 조합하여 하나의 엔진 사양을 구성
    val dataEngineInfo = BatchToBatchStreamEngineInfo.newBuilder()
      .setController(controller)
      .setReader(reader)
      .setWriter(writer)
      .addOperator(operator)
      .setRunner(runner)
      .build

    val runType = RunType.newBuilder()
      .setId(1)
      .setPeriodic(Periodic.ONCE)
      .build()

    // 작성한 엔진사양들을 이용하여 하나의 워크플로우 흐름 조합
    WorkflowInfo.newBuilder()
      .setBatch(true)
      .setMsgVersion("v1.0")
      .setKsbVersion("v19.03")
      .setVerbose(true)
      .addRuntypes(runType)
      .addEngines(
          EngineInfo.newBuilder()
            .setId(1)
            .setPrevId(0)
            .setEngineNickName("DataEngine")
            .setBatchToBatchStreamEngine(dataEngineInfo))
      .build
  }
}

```

마지막으로 위의 예제를 이클립스 상에서 실행하고 writer 출력경로 상에 파일을 열어 column selection 결과를 확인합니다.