공부/소소한 개발
[AIRFLOW, SPARK] AIRFLOW, SPARK 연동
무는빼주세요
2022. 8. 6. 15:04
1. Spark ETL py 파일 생성
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import *
#DB Connection 설정
source_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"
target_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"
#테이블명, login 설정
source_table = "테이블명"
target_table = "테이블명"
username = "로그인 id"
password = "password"
#Spark session 생성
conf = SparkConf().setAppName("ETL_TEST").set("packagets","sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession
#Source에서 테이블 Read
data = spark.read\
.format("jdbc")\
.option("url", source_dbcc)\
.option("isolationLevel","READ_UNCOMMITTED")\ #격리수준 설정
.option("dbtable", source_table)\
.option("user", username)\
.option("password",password)\
.option("partitionColumn", 병럴 처리 기준으로 사용될 컬럼명칭)\
.option("lowerBound", 병럴 처리 조건의 lower 값 ex 0)\
.option("upperBound", 병럴 처리 조건의 upper 값 ex 100000)\
.option("numPartitions", 몇개의 병럴처리로 진행할 것인지 ex 5로 설정할 경우 0~20000, 20000~40000.. 80000~100000, 100000~ 으로 이관 진행)
.option("fetchSize", 한번에 몇개의 ROW를 읽을 것 인지 (디폴트는 10으로 알고있음), EX 10000)
.load()
#Target에 Data Write
try:
data.select(저장할 컬럼 지정 가능 ex "col1", "col2")\
.write.format("jdbc")\
.option("url", target_dbcc")\
.option("dbtable", target_table)\
.option("user", username)\
.option("password",password)\
.option("batchsize", 한번에 몇개의 데이터를 넣고 commit 하는지)\
save()
except ValueEroor as error :
print("Connecator write failed",error)
spark.stop() #연결 끊기
만약 Read에서 partionColumn, lowerBound, upperBound, numPartitions 를 설정하지 않으면 전체 데이터를 한번에 읽어오는 방식으로 진행이되는데 이는 병렬처리가 가능한 Spark의 장점을 이용하지 않는 방식이 된다.
추가로 R&D가 필요한 부분 : upperBound를 설정할 경우 해당 숫자까지가 아닌 그 이후의 숫자는 전체를 가져오게 되는데 일부의 데이터만 이관하고 싶을 경우 어떻게 처리하는지 파악이 필요
-> load().filter 옵션을 추가하여 조건 설정 가능 !
Airflow DAG 파일 생성
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
default_args = {
'owner' : '사용자이름',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}
dag_spark = DAG(
'MS_ETL_TEST',
start_date=days_ago(2),
default_args=default_args,
schedule_interval='@once',
catchup=False,
is_paused_upon_creation=False,
}
cmd = 'spark-submit ~/airflow/폴더경로/spark파일명.py'
#시작 dummy
task_start = DummyOperator(
task_id = 'start',
dag = dag_spark,
)
#시작 후 다음단계 진행 dummy
task_next = DummyOperator(
task_id = 'next',
trigger_rule = 'all_success',
dag = dag_spark,
)
#종료 dummy
task_finish = DummyOperator(
task_id = 'finish',
trigger_rule = 'all_success',
dag = dag_spark,
)
#spark task dummy
task_spark = DummyOperator(
task_id = 'spark_task",
dag = dag_spark,
bash_command = cmd,
)
#의존관계 설정 방식
task_start >> task_next >> task_spark >> task_finish
간단하게 Spark 파일을 실행하는 DAG를 생성가능.