일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- 트리의 지름 파이썬
- SQL SERVER MIGRATION
- 프로그래머스 베스트앨범
- 백준 1516 게임 개발
- 베스트앨범 파이썬
- 프로그래머스 여행경로
- 백준 2146 다리 만들기
- 백준 1613 역사
- 백준 1043 거짓말 파이썬
- 램프 파이썬
- 백준 1167 트리의 지름 파이썬
- 역사 파이썬
- 백준 1238 파티 파이썬
- SWEA
- 순위 파이썬
- 백준 1034 램프 파이썬
- 백준 2352 반도체 설계 파이썬
- 다중 컬럼 NOT IN
- 등굣길 파이썬
- 프로그래머스 등굣길
- 게임 개발 파이썬
- 프로그래머스 순위
- SQL SERVER 장비교체
- 가장 긴 팰린드롬 파이썬
- 프로그래머스 순위 파이썬
- 다리 만들기 파이썬
- 반도체 설계 파이썬
- 가장 긴 바이토닉 부분 수열 파이썬
- 백준 11054.가장 긴 바이토닉 부분 수열
- 프로그래머스 가장 긴 팰린드롬
Archives
- Today
- Total
공부, 기록
[AIRFLOW, SPARK] AIRFLOW, SPARK 연동 본문
1. Spark ETL py 파일 생성
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import *
#DB Connection 설정
source_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"
target_dbcc = "jdbc:sqlserver"//서버주소;databaseName=카탈로그명;"
#테이블명, login 설정
source_table = "테이블명"
target_table = "테이블명"
username = "로그인 id"
password = "password"
#Spark session 생성
conf = SparkConf().setAppName("ETL_TEST").set("packagets","sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession
#Source에서 테이블 Read
data = spark.read\
.format("jdbc")\
.option("url", source_dbcc)\
.option("isolationLevel","READ_UNCOMMITTED")\ #격리수준 설정
.option("dbtable", source_table)\
.option("user", username)\
.option("password",password)\
.option("partitionColumn", 병럴 처리 기준으로 사용될 컬럼명칭)\
.option("lowerBound", 병럴 처리 조건의 lower 값 ex 0)\
.option("upperBound", 병럴 처리 조건의 upper 값 ex 100000)\
.option("numPartitions", 몇개의 병럴처리로 진행할 것인지 ex 5로 설정할 경우 0~20000, 20000~40000.. 80000~100000, 100000~ 으로 이관 진행)
.option("fetchSize", 한번에 몇개의 ROW를 읽을 것 인지 (디폴트는 10으로 알고있음), EX 10000)
.load()
#Target에 Data Write
try:
data.select(저장할 컬럼 지정 가능 ex "col1", "col2")\
.write.format("jdbc")\
.option("url", target_dbcc")\
.option("dbtable", target_table)\
.option("user", username)\
.option("password",password)\
.option("batchsize", 한번에 몇개의 데이터를 넣고 commit 하는지)\
save()
except ValueEroor as error :
print("Connecator write failed",error)
spark.stop() #연결 끊기
만약 Read에서 partionColumn, lowerBound, upperBound, numPartitions 를 설정하지 않으면 전체 데이터를 한번에 읽어오는 방식으로 진행이되는데 이는 병렬처리가 가능한 Spark의 장점을 이용하지 않는 방식이 된다.
추가로 R&D가 필요한 부분 : upperBound를 설정할 경우 해당 숫자까지가 아닌 그 이후의 숫자는 전체를 가져오게 되는데 일부의 데이터만 이관하고 싶을 경우 어떻게 처리하는지 파악이 필요
-> load().filter 옵션을 추가하여 조건 설정 가능 !
Airflow DAG 파일 생성
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
default_args = {
'owner' : '사용자이름',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}
dag_spark = DAG(
'MS_ETL_TEST',
start_date=days_ago(2),
default_args=default_args,
schedule_interval='@once',
catchup=False,
is_paused_upon_creation=False,
}
cmd = 'spark-submit ~/airflow/폴더경로/spark파일명.py'
#시작 dummy
task_start = DummyOperator(
task_id = 'start',
dag = dag_spark,
)
#시작 후 다음단계 진행 dummy
task_next = DummyOperator(
task_id = 'next',
trigger_rule = 'all_success',
dag = dag_spark,
)
#종료 dummy
task_finish = DummyOperator(
task_id = 'finish',
trigger_rule = 'all_success',
dag = dag_spark,
)
#spark task dummy
task_spark = DummyOperator(
task_id = 'spark_task",
dag = dag_spark,
bash_command = cmd,
)
#의존관계 설정 방식
task_start >> task_next >> task_spark >> task_finish
간단하게 Spark 파일을 실행하는 DAG를 생성가능.
'공부 > 소소한 개발' 카테고리의 다른 글
MSA (Micro Service Architecture) (0) | 2022.08.15 |
---|---|
타입스크립트 (0) | 2022.08.06 |
[AIRFLOW, SPARK] Airflow, SPARK 설치 (0) | 2022.08.06 |
JAVA 동시성 이슈 (0) | 2022.07.09 |
Kafka Replication (0) | 2022.06.18 |