공부, 기록

[AIRFLOW, SPARK] AIRFLOW, SPARK 연동 본문

공부/소소한 개발

[AIRFLOW, SPARK] AIRFLOW, SPARK 연동

무는빼주세요 2022. 8. 6. 15:04

1. Spark ETL py 파일 생성

 

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import *

#DB Connection 설정
source_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"
target_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"

#테이블명, login 설정
source_table = "테이블명"
target_table = "테이블명"
username = "로그인 id"
password = "password"

#Spark session 생성

conf = SparkConf().setAppName("ETL_TEST").set("packagets","sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

#Source에서 테이블 Read
data = spark.read\
	.format("jdbc")\
    .option("url", source_dbcc)\
    .option("isolationLevel","READ_UNCOMMITTED")\ #격리수준 설정
    .option("dbtable", source_table)\
    .option("user", username)\
    .option("password",password)\
    .option("partitionColumn", 병럴 처리 기준으로 사용될 컬럼명칭)\
    .option("lowerBound", 병럴 처리 조건의 lower 값 ex 0)\
    .option("upperBound", 병럴 처리 조건의 upper 값 ex 100000)\
    .option("numPartitions", 몇개의 병럴처리로 진행할 것인지 ex 5로 설정할 경우 0~20000, 20000~40000.. 80000~100000, 100000~ 으로 이관 진행)
    .option("fetchSize", 한번에 몇개의 ROW를 읽을 것 인지 (디폴트는 10으로 알고있음), EX 10000)
    .load()
  
#Target에 Data Write
try:
data.select(저장할 컬럼 지정 가능 ex "col1", "col2")\
	.write.format("jdbc")\
    .option("url", target_dbcc")\
    .option("dbtable", target_table)\
    .option("user", username)\
    .option("password",password)\
    .option("batchsize", 한번에 몇개의 데이터를 넣고 commit 하는지)\
    save()
except ValueEroor as error : 
	print("Connecator write failed",error)
    
 spark.stop() #연결 끊기

만약 Read에서 partionColumn, lowerBound, upperBound, numPartitions 를 설정하지 않으면 전체 데이터를 한번에 읽어오는 방식으로 진행이되는데 이는 병렬처리가 가능한 Spark의 장점을 이용하지 않는 방식이 된다.

추가로 R&D가 필요한 부분 : upperBound를 설정할 경우 해당 숫자까지가 아닌 그 이후의 숫자는 전체를 가져오게 되는데 일부의 데이터만 이관하고 싶을 경우 어떻게 처리하는지 파악이 필요 

-> load().filter 옵션을 추가하여 조건 설정 가능 !

 

Airflow DAG 파일 생성

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
	'owner' : '사용자이름',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

dag_spark = DAG(
	'MS_ETL_TEST',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
    is_paused_upon_creation=False,
}

cmd = 'spark-submit ~/airflow/폴더경로/spark파일명.py'

#시작 dummy
task_start = DummyOperator(
	task_id = 'start',
    dag = dag_spark,
)

#시작 후 다음단계 진행 dummy
task_next = DummyOperator(
	task_id = 'next',
    trigger_rule = 'all_success',
    dag = dag_spark,
)

#종료 dummy
task_finish = DummyOperator(
	task_id = 'finish',
    trigger_rule = 'all_success',
    dag = dag_spark,
)

#spark task dummy
task_spark = DummyOperator(
	task_id = 'spark_task",
    dag = dag_spark,
    bash_command = cmd,
)

#의존관계 설정 방식

task_start >> task_next >> task_spark >> task_finish

간단하게 Spark 파일을 실행하는 DAG를 생성가능.

'공부 > 소소한 개발' 카테고리의 다른 글

MSA (Micro Service Architecture)  (0) 2022.08.15
타입스크립트  (0) 2022.08.06
[AIRFLOW, SPARK] Airflow, SPARK 설치  (0) 2022.08.06
JAVA 동시성 이슈  (0) 2022.07.09
Kafka Replication  (0) 2022.06.18