공부, 기록

[ClickHouse] 빠른 분석을 위한 컬럼기반 DBMS, Spark, Spring Boot 연동 본문

공부/DATABASE

[ClickHouse] 빠른 분석을 위한 컬럼기반 DBMS, Spark, Spring Boot 연동

무는빼주세요 2023. 2. 5. 14:04

신규 프로젝트로 구글 Analytics 처럼 내부 데이터를 Snakey Diagram, Funnel 분석, Segment 분석 등을 할 수 있는 툴을 만들어야했다.

기존에 내부 데이터 통계 툴에 신규 탭으로 추가될 예정이었고 기술 스택도 크게 차이가 없을 것이라 판단했다. (기존 : 스프링부트 + SQL Server)

그러나 해당 툴은 대용량 데이터 (10억건 이상) 의 데이터에 비정규 조건으로 조회가 들어오며 또한 실시간으로 API가 데이터를 반환해주어야했다. 이를 기존의 스택으로 빠르게 응답해주기는 어렵다고 판단하였고.. 대안으로 2가지 방법을 생각해보았다.

1. Spark 를 사용한 데이터 분석

2. 신규 DBMS(통계용)를 스프링부트에 연결하여 분석

 

1의 경우 파이썬 API 개발을 통하여 PySpark를 활용 빠르게 데이터를 분석하고자 하였다. 

2의 경우 카카오 컨퍼런스에서 보았던 ClickHouse DBMS를 구축하고 데이터 파이프라인을 추가하여 실시간으로 스프링부트에서 API를 호출하고자 하였다.

 

차후에도 분석용 DBMS가 필요할 경우가 있을 것이라 판단하였고 또한 Clickhouse에 분석용 함수들이 강력하다고 판단이 되어 2의 방안으로 선택하였다.

 

ClickHouse는 2가지 구성으로 사용할 수 있는데 클러스터 방식과 오픈소스 방식인 self-managed 방식이 있다. 당연히 오픈소스 방식을 선택하였고 리눅스 서버에 구성하였다.

 

서버를 올리는데 까지는 무난하게 성공하였고 이제는 데이터 파이프라인을 구축하여야 했다.

초기에는 jdbc bridge clickhouse 를 통하여 데이터를 가져올 수 있었다 (https://github.com/ClickHouse/clickhouse-jdbc-bridge)

하지만 데이터를 주기적으로 이관을 시켜줘야했는데 이를 위해서 AIRFLOW와 PySpark를 연동하여 데이터 파이프라인을 만들어 줄 필요가 있었다.
Spark, ClickHouse, SQL Server 연동
pip install clickhouse-connect 
pip install pandas

 

 

from pyspark import SparkContext, SparkConf, SQLContext 
import clickhouse_connect 
if __name__ == '__main__': 
from pyspark.sql import SparkSession 
sql_connection1 = "jdbc:sqlserver://ip:port;databaseName=db명칭;" 
table_name = "테이블명" 
username = "서비스 계정명" 
password = "패스워드"
conf = SparkConf() \ 
.setAppName("clickhouse_test") \ 
.set("packages","sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar")\ 
.set("LogLevel","DEBUG") 
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc) 
spark = sqlContext.sparkSession 
df = spark.read\ 
.format("jdbc")\ 
.option("url", sql_connection1) \ 
.option("dbtable", table_name) \ 
.option("user", username) \
.option("password", password)\ 
.option("isolationLevel", "READ_UNCOMMITTED")\ 
.load()
 #read된 데이터를 table 형식으로 임시 사용 
applog = df.createOrReplaceTempView("appLogTmp") 
#sql문 처리 
query = spark.sql("select 쿼리") 
pandasDF= query.toPandas() 
client = clickhouse_connect.get_client(host='ip', port='port', username = 'default', password = '비밀번호!') 
client.insert_df('tracking.app_log_live', pandasDF) 
result = client.query('SELECT count(*) FROM tracking.app_log_live')
spark.stop()

다음은 기존 Spring Boot 구성에 ClickHouse DataSource를 추가하여야 했다. 이전에 2개의 SQL Server가 DataSource로 구성했던 방법과 동일하게 추가하여 DB 서버에 연결 및 API 통신이 가능하였다.

 

도메인 설정
@Table(name = "테이블 명", schema = "스키마 명")

Repository 구현
public AnalyticsRepositoryImpl(@Qualifier("ClickHouseEntityManagerFactory") EntityManager entityManager) {
    super(AppLogLive.class);
    this.entityManager = entityManager;
}

CONFIG 설정1
@Slf4j
@Configuration
@EnableJpaRepositories(
        entityManagerFactoryRef = "ClickHouseEntityManagerFactory",
        transactionManagerRef = "ClickHouseTransactionManager",
        basePackages = "com.패키지.**.repository.clickhousedb"
)
@RequiredArgsConstructor
public class ClickHouseConfig {

    @Bean(name = "ClickHouseDataSource")
    @ConfigurationProperties(prefix="spring.clickhouse-datasource")
    public DataSource ClickHouseDataSource() {
        return DataSourceBuilder.create().build();
    }


    @Bean(name = "ClickHouseEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean ClickHouseEntityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("ClickHouseDataSource") DataSource ClickHouseDataSource) {
        return builder
                .dataSource(ClickHouseDataSource)
                .packages("com.패키지.**.domain.clickhousedb")
                .persistenceUnit("ClickHouseEntityManager")
                .build();
    }

    @Bean(name = "ClickHouseTransactionManager")
    public PlatformTransactionManager ClickHouseTransactionManager(
            @Qualifier("ClickHouseEntityManagerFactory") EntityManagerFactory ClickHouseEntityManagerFactory) {
        return new JpaTransactionManager(ClickHouseEntityManagerFactory);
    }

}

CONFIG 설정2 QueryDSL 설정
public class ClickHouseQuerydslRepositorySupport extends QuerydslRepositorySupport {

    public ClickHouseQuerydslRepositorySupport(Class<?> domainClass) {
        super(domainClass);
    }

    @Override
    @PersistenceContext(unitName = "ClickHouseEntityManager")
    public void setEntityManager(EntityManager entityManager) {
        super.setEntityManager(entityManager);
    }

}

.yml 설정

clickhouse-datasource:
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
    username: default
    password: 비밀번호
    jdbc-url: jdbc:clickhouse://IP:PORT
    database : DB명
```

 

이제 비정규형으로 프론트와 통신할 API 구성이 남았다....