[Aurora PostgreSQL] CDC 연결을 위한 Logical replication 기능 검토

2025. 11. 22. 15:42·공부/DATABASE

개요

개발의 CDC 기능 요청으로 인하여 CDC 기능과 논리복제 영향도, Aurora Postgresql에서 Debezium 사용의 특징 대하여 확인해보았습니다.

검토 사항

동작 방식

 

CDC Debezium은 PostgreSQL의 Logical Mode의 WAL Log를 읽어서 가져갑니다.

논리복제 : Logical Replication은 PostgreSQL의 변경된 데이터(INSERT, UPDATE, DELETE) 를 논리적 레벨(레코드 단위) 로 추출하여 다른 DB 노드에 전송하는 복제 방식

구성 요소 역할
Publication (게시) 원본 DB에서 복제할 테이블 및 변경 이벤트를 정의
Subscription (구독) 대상 DB에서 어떤 게시를 구독할지 설정
Replication Slot (슬롯) 원본 서버가 구독자가 어디까지 읽었는지 추적
WAL Logical Decoding WAL 로그를 논리적 형식으로 해석 (플러그인에 의해)

 

Aurora PostgreSQL의 경우 Logical replication 사용시 Aurora Storage에 추가적인 WAL LOG 파일을 저장하는데 이로 인한 성능 저하를 막기 위해 wal log cache 영역을 추가하였습니다.

 

참조 : https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.Replication.Logical-monitoring.html#AuroraPostgreSQL.Replication.Logical-write-through-cache

 

제약사항

유저 권한

rds_replication role 권한 (논리적 슬롯을 관리하고 논리적 슬롯을 사용하여 데이터를 스트리밍할 수 있는 권한)이 필요합니다.

GRANT rds_replication TO user;

참조

https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-permissions


블루그린 메이저 업그레이드

블루 DB 클러스터는 논리적 소스(게시자) 또는 복제본(구독자)일 수 없습니다.

그린 클러스터 생성 시 Slot이 있는 경우 해당 에러 발생 : Replica creation is canceled due to external replication.

게시만 있는 상태에서 블루 그린 클러스터 생성 이후 Slot 생성 가능.

SwitchOver 작업 시에는 Slot 있을 시 에러 발생 : Stop replication from an external database before you switch over.

 

자체적으로 논리 복제, Clone 기능을 사용하여 블루 그린 방식의 메이저 버전 업그레이드 가능.

https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.MajorVersionUpgrade.html

https://tech.instacart.com/creating-a-logical-replica-from-a-snapshot-in-rds-postgres-886d9d2c7343

https://www.instacart.com/company/how-its-made/zero-downtime-postgresql-cutovers/

 

메이저 버전 업그레이드 시나리오 및 테스트

자체 블루그린 구성 전체 흐름

  1. GREEN 클러스터 구축을 위한 게시, 슬롯 생성
  2. CLONE 기능을 사용하여 GREEN 클러스터 생성 (LSN 확인 필요)
  3. GREEN 클러스터에 게시, 슬롯 삭제
  4. GREEN 클러스터 버전 업그레이드 
  5. BLUE → GREEN 데이터 동기화를 위해 구독 설정
  6. 서비스 WRITE 중단
  7. CDC 커넥터 삭제
  8. DB 네이밍 스위칭
  9. 신규 DB에 게시 및 SLOT 생성
  10. 시퀀스 값 재설정
  11. 서비스 WRITE 재연동
  12. CDC 연동
--초기 CDC 운영
CREATE USER repl_user WITH LOGIN NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOREPLICATION CONNECTION LIMIT -1 PASSWORD '패스워드';
GRANT rds_replication TO repl_user;
GRANT INSERT, SELECT, DELETE, UPDATE on all tables in schema public to repl_user;
GRANT SELECT, USAGE, UPDATE on all SEQUENCES in schema public to repl_user;
CREATE PUBLICATION repl_pub FOR TABLE 테이블들;
SELECT pg_create_logical_replication_slot('repl_slot', 'pgoutput');

--메이저 버전 업그레이드
--BLUE 
--업그레이드 용 논리복제 생성
CREATE PUBLICATION blue_green_pub FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('blue_green_slot', 'pgoutput');

--GREEN 인스턴스 생성 (Clone 사용)
--GREEN 사전 작업
--LSN 확인
SELECT aurora_volume_logical_start_lsn();
  aurora_volume_logical_start_lsn 
---------------------------------
 0/BD55A848
(1 row)
 
--SLOT 및 게시 제거
SELECT pg_drop_replication_slot('blue_green_slot');
SELECT pg_drop_replication_slot('repl_slot');
DROP PUBLICATION blue_green_pub;
DROP PUBLICATION  repl_pub;

--UPGRADE 진행 이후
--구독 연결
CREATE SUBSCRIPTION blue_green_sub 
CONNECTION 'postgres://계정명:패스워드@endpoint/databasename' PUBLICATION blue_green_pub 
WITH (copy_data = false, create_slot = false, enabled = false, connect = true, slot_name = 'blue_green_slot');

--LSN 설정을 위한 REPLICATION ORIGIN 확인
SELECT * FROM pg_replication_origin;
 roident |  roname   
---------+-----------
       1 | pg_188427

--LSN 설정 (ReplicationOrigin, START_LSN 순서)
SELECT pg_replication_origin_advance('pg_188427', '0/BD55A848');

--구독 실행
ALTER SUBSCRIPTION blue_green_sub ENABLE;

--BLUE에서 복제 연동 확인 (active 가 t)
SELECT * FROM pg_replication_slots;
    slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase 
-----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 blue_green_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       9556 |      |     49248217 | 0/BD60C7A0  | 0/BD60E380          | reserved   |               | f
      repl_slot  | pgoutput | logical   |  80522 | databasename | f         | t      |       3791 |      |     49247724 | 0/BD5FAE18  | 0/BD5FE148          | reserved   |               | f

--DB CutOver 진행
--서비스 WRITE 중단
--CDC 커넥터 삭제
--DB 네이밍 스위칭
--GREEN DB에 게시 및 SLOT 생성
CREATE PUBLICATION repl_pub FOR TABLE 테이블들;
SELECT pg_create_logical_replication_slot('repl_slot', 'pgoutput');
--GREEN에서 시퀀스 값 수정
select setval('시퀀스 명',시퀀스 값)
--서비스 WRITE 재연동
--CDC 연동 (SNAPSHOT MODE NEVER 로 설정)
--정상 동작 확인

 

 

참조

https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/AuroraUserGuide/blue-green-deployments-considerations.html#blue-green-deployments-limitations-postgres-logical

https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#upgrading-postgresql

 


영향도

성능 저하

데이터 입력 조회 테스트

INSERT(단건 입력) + SELECT (LIMIT 50 조회)

 

변경 파라미터

  • rds.logical_replication  = 1
  • max_replication_slots = 20
  • max_wal_senders = 10
  • max_logical_replication_workers = 4

종합

 

  복제 연결 없음 별도 인스턴스에서 구독
(게시 역할만 진행)
동일 인스턴스에서 구독
(게시 + 구독 역할)
CONNECTION  OPERATION TPS AVG_LATENCY STDDEV _LATENCY CPU TPS AVG_LATENCY STDDEV_LATNECY CPU TPS AVG_LATENCY STDDEV_LATENCY CPU
1 INSERT 26 33.493 ms 0.779 ms 7 28 34.759 ms 0.741 ms 7 27 36.604 ms 0.812 ms 7
1 SELECT 29 38.383 ms 0.778 ms 29 34.142 ms 0.774 ms 29 34.370 ms 0.784 ms
25 INSERT 646 36.917 ms 1.811 ms 48 621 37.322 ms 2.268 ms 51 596 38.358 ms 2.527 ms 53
25 SELECT 668 34.414 ms 2.679 ms 655 34.549 ms 2.599 ms 677 34.214 ms 2.473 ms
50 INSERT 1227 38.177 ms 3.128 ms  70 1181 39.914 ms 3.777 ms 70 1183 40.161 ms 3.679 ms 75
50 SELECT 1356 34.915 ms 3.570 ms 1303 35.006 ms 3.755 ms 1314 35.098 ms 3.899 ms

 

 

→ CPU, Latency의 변화가 미미함

 

관련 AWS 케이스 오픈 정리

논리복제 사용에 따른 primary 인스턴스 영향도 등에 대하여 확인

Aurora PostgreSQL 는 일반적인 WAL 대신 자체적인 로그 레코드 시스템을 사용합니다. 이는 Aurora의 분산 스토리지 시스템에 최적화된 방식입니다 
일반적인 PostgreSQL 에서는 데이터베이스 충돌시 복구를 위해 WAL을 사용하기 때문에 필수적으로 WAL이 기록됩니다. 그러나 Aurora PostgreSQL 는 스토리지가 분리된 특유의 아키텍처를 활용하여 자체 복구 메커니즘을 사용합니다. 
따라서 일반 PostgreSQL과 달리 복구를 위해 WAL 파일을 유지할 필요가 크지 않기 때문에 WAL 파일을 별도로 생성하지 않는 것이 기본값입니다.
다만 논리적 복제 사용시 변경 데이터의 추적이 필요하기 때문에, 논리적 복제가 활성화 되면 WAL 데이터를 스토리지에 기록하기 시작합니다. TransactionLogsDiskUsage 지표를 통해 WAL 데이터의 사용량을 확인하실 수 있을 것으로 생각됩니다. 
기존 Aurora 스토리지로 인한 로그 레코드를 유지하면서 추가적으로 논리적 복제 기능을 위한 wal 로그를 Aurora 스토리지에 저장합니다.  
논리적 복제 활성화시 고려해야 할 Primary 인스턴스에서의 영향도는 CPU 및 I/O 자원 소모 및 디스크 사용량 증가가 있습니다.


1.
인지하고 계시겠지만 PostgreSQL 의 기본 동작 방식은 클라이언트가 데이터를 변경하면 변경 사항이 WAL 버퍼에 기록되고, COMMIT 등으로 인해 WAL 파일로 저장됩니다. 
Aurora PostgreSQL 에서는 COMMIT 수행시 WAL 데이터가 스토리지에 기록됩니다. 다만 WAL 데이터를 스토리지에 기록하는 과정이 기존에 COMMIT 발생시 일반 로그 레코드를 스토리지에 기록하는 과정과 분리되어 있다고 명시되어 있지 않습니다. 
이는 저희 쪽에서 확인 가능한 내부 문서에도 명시되어 있지 않기에 더 자세한 내용을 안내드리기 어려우며 그 이상의 정보는 결국 외부와 공유 불가능하기에 확인이 불가한 것으로 생각됩니다. 
그러나 Aurora PostgreSQL의 동작 방식 등 확인 가능한 정보를 고려해 보았을 때 로그 레코드와 WAL 데이터가 같이 처리될 것으로 생각됩니다.  
그렇기에 WAL 데이터의 저장으로 인해 커밋 지연이 발생한다기 보다, WAL 데이터의 생성으로 인한 CPU 및 I/O 자원 소모 등의 영향도가 있을 것으로 생각됩니다. 


2.
인지하고 계시겠지만 Aurora 데이터는 SSD를 사용하는 단일 가상 볼륨인 클러스터 볼륨에 저장됩니다.
기본적으로 로그 레코드는 이 단일 스토리지 볼륨에 저장되며, WAL 데이터도 동일하게 스토리지에 기록된다 안내되어 있습니다.
WAL 데이터 (트랜잭션 로그) 를 위해 특정 디스크 영역이 따로 할당되는 것은 아니며, 별도로 존재한다고 명시되어 있는 내용은 확인되지 않았습니다. 


3.
앞서 안내드린 것처럼 논리적 복제 활성화시 고려해야 할 Primary 인스턴스에서의 영향도는 CPU 및 I/O 자원 소모 및 디스크 사용량 증가가 있습니다.

 

 

복제 지연 영향도

데이터가 대량으로 쌓이면서 이를 읽어가는게 늦어질 경우 영향도 및 모니터링 방안

 

WAL Size 모니터링 지표

OldestReplicationSlotLag 수신된 WAL(Write-Ahead Log) 데이터를 기준으로 가장 지연된 복제본의 지연 크기.
TransactionLogsDiskUsage Aurora PostgreSQL DB 인스턴스에서 트랜잭션 로그가 차지하는 디스크 공간입니다.
Aurora PostgreSQL에서는 트랜잭션 로그가 아닌 로그 레코드를 사용합니다. 트랜잭션 로그를 사용하지 않는 경우 이 지표의 값은 -1입니다.

WAL크기 제어 파라미터

max_slot_wal_keep_size (MB) 복제 슬롯은 실패한 것으로 표시되고 디스크에서 WAL이 이 정도의 공간을 차지하는 경우 삭제 또는 재활용을 위해 세그먼트가 해제됩니다.

 

 

구독을 중지하고 데이터 입력 발생 → OldestReplicationSlotLag,지표가 상승하는 것을 확인할 수 있습니다.

Aurora 스토리지의 인스턴스간 복제 지연에는 영향이 없는 것으로 보입니다.

 

 

복제 지연이 된 상태에서 복제 연동을 하여 성능 테스트를 진행 (지연이 없던 상황과 유사한 성능 지표)

CONNECTION  OPERATION TPS AVG_LATENCY STDDEV_LATENCY CPU
25 INSERT 631 37.523 ms 2.070 ms 48
25 SELECT 696 34.335 ms 2.559 ms

 

복제 슬롯 관련 상태 확인 쿼리

SELECT * FROM pg_replication_slots;
    slot_name     |  plugin  | slot_type | datoid |  database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase 
------------------+----------+-----------+--------+-------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------
 logical_test_sub | pgoutput | logical   |  20532 | ACCOUNTDB   | f         | f      |            |      |     49518234 | 1/A00DCE8   | 1/A00DE20           | reserved   |               | f


SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag, active FROM pg_replication_slots ;
    slot_name     | replicationslotlag | active 
------------------+--------------------+--------
 logical_test_sub | 293 MB             | f

 

max_slot_wal_keep_size 사이즈를 초과하였을 때 현상

databasename=> show max_slot_wal_keep_size ;
 max_slot_wal_keep_size 
------------------------
 50MB
(1 row)
--부하 발생 
--동기화 상태 확인
databasename=> SELECT now() AS CURRENT_TIME, slot_name, active, active_pid, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn)) AS diff_size, pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn) AS diff_bytes FROM pg_replication_slots WHERE slot_type = 'logical';
         current_time          |   slot_name    | active | active_pid | diff_size | diff_bytes 
-------------------------------+----------------+--------+------------+-----------+------------
 2026-01-06 06:57:30.596541+00 |      repl_slot | t      |       5301 | 20 MB     |   20900944
 2026-01-06 06:57:30.596541+00 | maxsize_slot   | f      |            | 38 MB     |   39844144
(2 rows)

--정상인 상태
databasename=> select * from pg_replication_slots ;                                                                             
   slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
      repl_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       5301 |      |     50429467 | 4/E351D50   | 4/E699508           | reserved   |       5733480 | f         | f
 maxsize_slot   | pgoutput | logical   |  80522 | databasename | f         | f      |            |      |     50425374 | 4/D4887D0   | 4/D488828           | reserved   |       5733480 | f         | f
(2 rows)

databasename=> SELECT now() AS CURRENT_TIME, slot_name, active, active_pid, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn)) AS diff_size, pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn) AS diff_bytes FROM pg_replication_slots WHERE slot_type = 'logical';
         current_time          |   slot_name    | active | active_pid | diff_size | diff_bytes 
-------------------------------+----------------+--------+------------+-----------+------------
 2026-01-06 06:57:46.900697+00 |      repl_slot | t      |       5301 | 30 MB     |   31403848
 2026-01-06 06:57:46.900697+00 | maxsize_slot   | f      |            | 48 MB     |   50347048


--safe_wal_size이 음수로 변경되며 unreserved 상태로 변경
databasename=> select * from pg_replication_slots ;
   slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
      repl_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       5301 |      |     50429467 | 4/E351D50   | 4/E699508           | unreserved |      -4771496 | f         | f
 maxsize_slot   | pgoutput | logical   |  80522 | databasename | f         | f      |            |      |     50425374 | 4/D4887D0   | 4/D488828           | unreserved |      -4771496 | f         | f

--repl_slot 은 정상적으로 동기화가 진행되고 있음
databasename=> SELECT now() AS CURRENT_TIME, slot_name, active, active_pid, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn)) AS diff_size, pg_wal_lsn_diff(pg_current_wal_lsn(),
confirmed_flush_lsn) AS diff_bytes FROM pg_replication_slots WHERE slot_type = 'logical';
         current_time          |   slot_name    | active | active_pid | diff_size | diff_bytes 
-------------------------------+----------------+--------+------------+-----------+------------
 2026-01-06 06:58:35.083865+00 |      repl_slot | t      |       5301 | 13 kB     |      13808
 2026-01-06 06:58:35.083865+00 | maxsize_slot   | f      |            | 48 MB     |   50359200

--LOST 발생
databasename=> select * from pg_replication_slots ;
   slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting 
----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------
      repl_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       5301 |      |     50429467 | 4/E351D50   | 4/1048BDD8          | unreserved |      -4856152 | f         | f
 maxsize_slot   | pgoutput | logical   |  80522 | databasename | f         | f      |            |      |     50425374 | 4/D4887D0   | 4/D488828           | lost       |               | f         | f



databasename=> select *, pg_size_pretty(safe_wal_size) from pg_replication_slots ;
   slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting | pg_size_pretty 
----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------+----------------
      repl_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       5301 |      |     50429467 | 4/E351D50   | 4/1048BDD8          | unreserved |      -6209632 | f         | f           | -6064 kB
(1 row)

--정상화
databasename=> select *, pg_size_pretty(safe_wal_size) from pg_replication_slots ;
   slot_name    |  plugin  | slot_type | datoid |   database   | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | conflicting | pg_size_pretty 
----------------+----------+-----------+--------+--------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------+-----------+-------------+----------------
      repl_slot | pgoutput | logical   |  80522 | databasename | f         | t      |       5301 |      |     50429934 | 4/1048D6A8  | 4/105E9B38          | reserved   |      60888960 | f         | f           | 58 MB
(1 row)

 

wal_status

  • reserved : 정상적인 상태
  • extended : max_wal_size를 초과하였으나 max_slot_wal_keep_size 에 의해 wal을 유지하고 있는 상태
  • unreserved : max_slot_wal_keep_size이 0이상으로 설정되어 있을 때 슬롯의 복제가 뒤처져서 WAL을 더 이상 유지하지 않는 상태
  • lost : WAL이 제거되어 SLOT을 사용할 수 없는 상태

※ WAL SIZE 관련 파라미터

NAME DESC 기본 값 Aurora 기본 값 파라미터 변경 가능 여부
max_wal_size 이 값에 도달하면 체크포인트가 트리거됩니다 (전체 pg_wal 디렉토리의 사이즈) 1GB 1GB X
min_wal_size 이 값 이하로 WAL 디스크 사용량이 떨어지지 않도록 이전 WAL 파일을 재사용(recycle)합니다.
→ 해당 값으로 인하여 TransactionLogsDiskUsage 가 512MB 전후를 유지합니다.
80MB 512MB X
wal_keep_size pg_wal 디렉터리에 최소한으로 보관할 WAL 파일의 총 크기입니다. 0 (비활성화) 0 X
wal_segment_size wal 물리 파일의 개별 크기 16MB 64MB X
max_slot_wal_keep_size 각 복제 슬롯이 보관할 수 있는 WAL 파일의 최대 크기를 제한합니다. -1 -1 O

 

DB FAIL OVER 발생 시 영향도

FAIL OVER 이후 기능 확인 및 복제 재연결 영향도

별도 작업 없이 Slot에 기록된 restart_lsn 부터 진행 됩니다.

 

트러블 슈팅

failover 이후 커넥션이 정상화되는 시간이 1~30분으로 매번 차이가 심하여 해당 부분을 일정화하기 위하여 테스트를 진행하였습니다.

fail over 이후 기존 primary에 커넥터가 붙으면 wal_level이 replica로 되어 커넥터가 복구 불가능한 오류로 커넥터를 중단 (ava.sql.SQLException: Postgres server wal_level property must be 'logical' but is: 'replica')

PRIMARY 중단 → 커넥션 예외 발생 → 커넥션 연결 재시도 → 기존 Primary에 붙음 → 커넥터 중단

MSK Connector의 내부적인 재시작 프로세스가 존재하며 일반적으로 5~15분마다 커넥션 상태릃 확인합니다.

재연결 딜레이 (slot.retry.delay.ms)를 1분으로 설정할 경우 (1분 내로 페일오버가 된다는 가정) 페일오버 이후 1분 이내로 데이터가 정상 수집 됨


큰 사이즈의 데이터 타입, 부하에 따른 영향도

건당 4kb 데이터 입력시 별도 차이 확인 안됨


DDL 쿼리 사용시

DDL 쿼리를 사용할 경우 Failover 상황과 마찬가지로 커넥터 연결이 에러가 발생한 후 별도 작업 없이 재시작이 진행됩니다. (컬럼 추가, 삭제, 컬럼 타입 변경,인덱스 생성)

rdb -> debezium connector (source)-> kafka -> debezium snowflake connector => (sink)  (이하 snowflake 테이블)

해당 케이스는 Kafka에 Json 형식으로 데이터를 저장한 후 Sink 구문에서는 컬럼명을 지정하여 PK 기반으로 Merge 구문으로 작성되어있어서 신규 컬럼의 데이터가 Snowflake에 적용되지 않은 상태로 데이터가 이관되었습니다.

또한 기존 값을 변경하는 (default 값 추가) 작업 등은 Sink 되지 않으며 이는 Snowflake에서 작업이 필요합니다

즉 원본 RDB에 DDL 작업이 있을 경우 Snowflake와 Sink 영역의 코드 변경 작업이 병행이 필요합니다.

 


신규 복제 구성 

  1. 초기 데이터를 동기화 하는 작업이 스냅샷 모드에 따라 진행이 됩니다.
  2. 스냅샷 방식을 사용하지 않을 경우 복제 생성으로 인하여 초기 DB에 발생하는 작업은 없습니다. 다만 과거 데이터를 커스텀 작업이 별도로 필요합니다.
  3. 스냅샷 방식은 쿼리 커스텀이 가능하며 기본 설정은 SELECT * FROM TABLE 로 처리되고 내부적으로 COPY 구문을 사용합니다.
  4. 기존 테이블 이관시 아래와 같이 작업을 진행할 경우 누락되는 데이터 없이 신규 복제가 구성 가능합니다.
    게시 생성 → 복제 Slot 생성 → 데이터 이관 → 복제 Slot에 커넥터 연결 (snapshot.mode 를 no_data 로 설정 필요)
    ※ snapshot.mode no_data  :  커넥터는 스냅샷을 생성하지 않습니다. Kafka 오프셋 토픽에 이전에 저장된 LSN이 있는 경우, 커넥터는 해당 위치에서 변경 사항 스트리밍을 계속합니다. LSN이 저장되어 있지 않으면, 커넥터는 서버에 PostgreSQL 논리적 복제 슬롯이 생성된 지점부터 변경 사항 스트리밍을 시작합니다. 모든 관련 데이터가 WAL에 반영되어 있다는 것을 확신하는 경우에만 이 스냅샷 모드를 사용하십시오.

Debezium의 Incremental Snapshot 기능

특징

pk를 청크 단위로 나누어 데이터를 이관하는 시스템 (기본 값 1024, 청크 단위는 조절 가능하지만 각 조회간 delay 설정은 지원 없음)

스냅샷 생성간 변경 데이터에 대한 스트리밍 데이터 캡처를 병렬로 진행함(DML에 대한 LOCK 없음)

증분 스냅샷은 READ 결과를 버퍼 영역으로 먼저 보낸 뒤 스트리밍 데이터(WAL LOG)를 사용하여 충돌이 있는지 체크한 뒤 카프카로 보내는 과정을 거침 (충돌이 없으면 READ한 값, 충돌이 있으면 최신 변경 값)

 

구성 방법

별도 추가적인 구성은 필요 없으나 incremental snapshot 을 사용하기 위한 테이블과 해당 테이블에 debezium 계정의 read, write 권한, publication 에 추가 필요.

                                           Table "public.debezium_signal"
 Column |          Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
--------+-------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 id     | character varying(42)   |           | not null |         | extended |             |              | 
 type   | character varying(32)   |           | not null |         | extended |             |              | 
 data   | character varying(2048) |           |          |         | extended |             |              | 
Indexes:
    "debezium_signal_pkey" PRIMARY KEY, btree (id)
Publications:
    "repl_pub"
Access method: heap


--아래와 같이 데이터 입력하여 트리거 가능
--filter 에 따라 조회 방식 설정
databasename=> select * from debezium_signal  order by 3 limit 1;
    id    |       type       |                                                                                      data                                                                                       
----------+------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ad-hoc-1 | execute-snapshot | {"data-collections": ["public.table"], "type": "incremental", "additional-conditions": [{"data-collection": "public.table", "filter": "sn>0"}]}
(1 row)

debezium_signal=> select query, calls from pg_stat_statements where query like '%table%';
                                                                                                                             query                                                                                                                             | calls 
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------
 INSERT INTO table (컬럼들) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) RETURNING sn |   487
 SELECT * FROM "public"."table" WHERE (sn>$1) ORDER BY "sn" LIMIT $2                                                                                                                                                         |     1
 SELECT * FROM "public"."table" WHERE (sn>$1) ORDER BY "sn" DESC LIMIT $2                                                                                                                                                    |     1
 UPDATE public.table SET column = $1 WHERE sn = $2                                                                                                                                                                                     |     2
 SELECT * FROM "public"."table" WHERE "sn" > $1 AND "sn" <= $2 AND (sn>$3) ORDER BY "sn" LIMIT $4                                                                                                    |   430

SELECT * FROM "public"."table" WHERE "sn" > $1 AND "sn" <= $2 AND (sn>$3) ORDER BY "sn" LIMIT $4 구문이 호출되어 데이터를 읽어가고 LIMIT은 DEBEZIUM CONFIG에서 설정한 incremental.snapshot.chunk.size 값으로 호출됨

table 테이블을 50,000,000 건 생성한 상태에서 테스트 진행.
데이터 약 SEQ 기준 10,000건 이관 되었을 때 아래와 같이 쿼리 실행

delete from table where sn = 1000;
update table set column = 'minjae_test' where sn = 1001;

update table set column = 'minjae_test' where sn = 4000001;
update table set column = 'minjae_test' where sn = 4000002;
update table set column = 'minjae_test' where sn = 4000003;
update table set column = 'minjae_test' where sn = 3000001;
update table set column = 'minjae_test' where sn = 3000002;
update table set column = 'minjae_test' where sn = 3000003;

delete from table where sn = 4000000;
delete from table where sn = 4000004;
delete from table where sn = 4000005;
delete from table where sn = 4000006;
delete from table where sn = 3000000;
delete from table where sn = 3000004;
delete from table where sn = 3000005;
delete from table where sn = 3000006;

update table set column = 'minjae_test' where sn = 40000000;

--Target DB와 동일한 형태의 데이터 일관성이 유지되는 것 확인

 

주의사항

WAL LOG 에는 STATEMENTS 기반이 아닌 ROW 레벨의 데이터가 적재되고 해당 부분을 디코딩하여 LOGICAL REPLICATION 으로 넘어갑니다.

즉 INSERT는 AFTER 값, UPDATE는 AFTER 값 (REPLICA IDENTITY FULL 설정시 BEFORE도 함께 표시) DELETE는 BEFORE 값이 저장되는데 이 부분이 변경된 값이나 새로 생성된 값이 아닌 해당 TUPLE 의 전체 내용이 저장됩니다.

이로 인하여 네트워크 트래픽이 증가하고 KAFKA 쪽 디스크 사이즈에 영향이 될 수 있음. 특히 컬럼 사이즈가 큰 경우 (JSON, TEXT 등) 에는 주의가 필요할 것으로 보입니다.

EX )

 

OPERATION BEFORE AFTER
INSERT   {
  "columna": "test=",
  "columnb": "test==",
  "columnc": null,
  "updated_at": "2025-07-24T20:03:54.000000Z",
  "columnd": "testtest",
  "columne": "testtest"
}
UPDATE   {
  "columna ": "test=",
  "columnb ": "test==",
  "columnc ": "minjae_test",
  "updated_at": "2025-07-24T20:03:54.000000Z",
  "columnd": "testtest",
  "columne": "testtest"
}
DELETE {
  "columna ": "test=",
  "columnb ": "test==",
  "columnc ": "minjae_test",
  "updated_at": "2025-07-24T20:03:54.000000Z",
  "columnd": "testtest",
  "columne": "testtest"
}
 

 

 

REPLICA IDENTITY 가 DEFAULT로 되어있는 경우 TOAST 테이블에 저장된 큰 사이즈의 컬럼의 경우 데이터 입력 또는 해당 컬럼의 데이터 업데이트에만 WAL LOG에서 확인이 됩니다.

FULL로 설정할 경우 항상 큰 사이즈 컬럼의 데이터도 포함되어 생성됩니다.

EX)

OPERATION AFTER
초기 입력 {
    "before": null,
    "after": {
        "sn": 50977713,
         "columna": "test=",
         "columnb": "test==",
          "columnc": null,
          "updated_at": "2025-07-24T20:03:54.000000Z",
          "columnd": "testtest",
          "columne": "testtest"
          "big_col": "테스트데이터입니다테스트데이터입니다..."
    },

    "op": "c",
다른 컬럼 업데이트 {
    "before": null,
    "after": {
       "sn": 50977713,
         "columna": "test=",
         "columnb": "test==",
          "columnc": null,
          "updated_at": "2025-07-24T20:03:54.000000Z",
          "columnd": "testtest",
          "columne": "testtest"
        "big_col": "__debezium_unavailable_value"
}

    "op": "u",
큰 사이즈 컬럼 업데이트   {
    "before": null,
     "after": {
       "sn": 50977713,
         "columna": "test=",
         "columnb": "test==",
          "columnc": null,
          "updated_at": "2025-07-24T20:03:54.000000Z",
          "columnd": "testtest",
          "columne": "testtest"
        "big_col": "테스트데이터입니다...test"
    },

    "op": "u",

 

 

'공부 > DATABASE' 카테고리의 다른 글

[Aurora PostgreSQL] Tuple_fetched, Tuple_returned  (0) 2025.12.20
[Aurora PostgreSQL] HOT  (0) 2025.12.20
[ElastiCache] Auth 와 TLS  (0) 2025.11.22
[Aurora Postgresql] VACUUM 정리  (0) 2025.10.08
[Aurora PostgreSQL] 성능 개선 도우미(PI), pg_stat_statements 지표  (0) 2025.10.08
'공부/DATABASE' 카테고리의 다른 글
  • [Aurora PostgreSQL] Tuple_fetched, Tuple_returned
  • [Aurora PostgreSQL] HOT
  • [ElastiCache] Auth 와 TLS
  • [Aurora Postgresql] VACUUM 정리
무는빼주세요
무는빼주세요
내 머리를 믿지 말자
  • 무는빼주세요
    공부, 기록
    무는빼주세요
  • 전체
    오늘
    어제
  • 링크

    • 링크드인 (LinkedIn)
    • 분류 전체보기 (258)
      • 일상 (0)
      • 코딩 (77)
      • 공부 (180)
        • DATABASE (128)
        • 도커,쿠버네티스 (1)
        • 소소한 개발 (38)
        • 클라우드 영역 (1)
        • CS 영역 (11)
      • 포럼 (0)
  • 최근 글

  • 인기 글

  • hELLO· Designed By정상우.v4.10.5
무는빼주세요
[Aurora PostgreSQL] CDC 연결을 위한 Logical replication 기능 검토
상단으로

티스토리툴바