2025/11/07
はじめに
PostgreSQLのテーブル変更をリアルタイムで検知したい。そんな時、定期的にSELECTするpollingではなく、PostgreSQLのpublicationを購読する方法を試してみた。
Rustのsupabaseのetlライブラリを使えば、データベース側からプッシュで変更通知を受け取れる。pollingと違って無駄なクエリもなく、リアルタイムで変更を検知できるのが強み。
この記事では、実際にRustでpublication subscriberを実装する手順を紹介する。
publicationのメリット
INSERTされたものを取得する場合、pollingによって新規のレコードをチェックすることも可能である。 しかし、この手法を用いると、「DB負荷」「原理的に入る遅延」「不要なクエリを実行し続ける」といった問題が発生する。
一方、publicationを使うと、以下のようなメリットがある。
- 変更が発生した時だけ通知が来る
- データベース側からプッシュしてくれる
- ほぼリアルタイムで変更を検知
WAL (Write Ahead Log)を直接購読する手法もあるが、これはClientサイドでのフィルタが必要になることから必要な通信以外のデータを受信することになるので、要件によって選定する必要がある。
PostgreSQL Publicationとは
PostgreSQLのlogical replication機能の一部。テーブルの変更(INSERT/UPDATE/DELETE)を「公開」し、それを「購読」することでリアルタイムにイベントを受け取れる。
通常はPostgreSQL同士のレプリケーションで使われるが、今回はアプリケーション側で購読して変更を処理する用途で使う。
このようなクエリを実行すると publication を作成できる。(詳細なオプションはこちら)
CREATE PUBLICATION my_publication FOR TABLE logs;
実装例
1. PostgreSQL側の準備
実行環境の準備。再現性のため、Docker Composeを記しておく。
services:
postgres:
image: postgres:18.0-trixie
ports:
- "5432:5432"
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
# Logical Decoding を有効にする設定
command:
-c wal_level=logical
-c max_replication_slots=4
-c max_wal_senders=4
-c log_connections=on
-c log_disconnections=on
-c log_statement=all
-c log_replication_commands=on
# -c wal_keep_size=64MB
adminer:
image: adminer
container_name: adminer
ports:
- "8080:8080"
environment:
ADMINER_DEFAULT_SERVER: postgres
そして、TableとPUBLICATIONとSUBSCRIPTIONを準備する。
CREATE TABLE IF NOT EXISTS logs (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT now(),
level TEXT NOT NULL,
message TEXT NOT NULL
);
ALTER TABLE logs REPLICA IDENTITY FULL;
DROP PUBLICATION IF EXISTS my_publication;
CREATE PUBLICATION my_publication FOR TABLE logs;
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=localhost port=5432 user=user password=pass dbname=mydb'
PUBLICATION my_publication;
2. Rustプロジェクトのセットアップ
依存関係を追加:
[package]
name = "pg-etl"
version = "0.1.0"
edition = "2021"
[dependencies]
etl = { git = "https://github.com/supabase/etl", rev = "76514d7cab5f93d51b48a0eff1bd98ee3d14fb9c" }
localtrace = "0.1.9"
postgres-protocol = "0.6"
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "*", features = ["with-uuid-0_8"] }
3. Subscriberの実装
use etl::{
config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
pipeline::Pipeline,
store::both::memory::MemoryStore,
};
#[derive(Clone)]
struct WorkContainer {}
impl etl::destination::Destination for WorkContainer {
fn name() -> &'static str {
"WorkContainer"
}
async fn truncate_table(&self, table_id: etl::types::TableId) -> etl::error::EtlResult<()> {
println!("Truncating table {}", table_id);
Ok(())
}
async fn write_table_rows(
&self,
table_id: etl::types::TableId,
table_rows: Vec<etl::types::TableRow>,
) -> etl::error::EtlResult<()> {
println!("Writing {} rows to table {}", table_rows.len(), table_id);
Ok(())
}
async fn write_events(&self, events: Vec<etl::types::Event>) -> etl::error::EtlResult<()> {
println!("Writing {} events", events.len());
for event in &events {
match event {
etl::types::Event::Begin(_) => {}
etl::types::Event::Commit(_) => {}
etl::types::Event::Insert(insert_event) => {
println!("Insert event: {:?}", insert_event);
}
etl::types::Event::Update(update_event) => {
println!("Update event: {:?}", update_event);
}
etl::types::Event::Delete(delete_event) => {
println!("Delete event: {:?}", delete_event);
}
etl::types::Event::Relation(relation_event) => {
println!("Relation event: {:?}", relation_event);
}
etl::types::Event::Truncate(truncate_event) => {
println!("Truncate event: {:?}", truncate_event);
}
etl::types::Event::Unsupported => {}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> localtrace::Result<()> {
match run().await {
Ok(_) => (),
Err(e) => {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
Ok(())
}
async fn run() -> localtrace::Result<()> {
let pg = PgConnectionConfig {
host: "localhost".to_string(),
port: 5432,
name: "mydb".to_string(),
username: "user".to_string(),
password: Some("pass".into()),
tls: TlsConfig {
enabled: false,
trusted_root_certs: String::new(),
},
};
let store = MemoryStore::new();
let config = PipelineConfig {
id: 1,
publication_name: "my_publication".to_string(),
pg_connection: pg,
batch: BatchConfig {
max_size: 1000,
max_fill_ms: 1,
},
table_error_retry_delay_ms: 2000,
table_error_retry_max_attempts: 5,
max_table_sync_workers: 4,
};
let work = WorkContainer {};
let mut pipeline = Pipeline::new(config, store, work);
pipeline.start().await?;
pipeline.wait().await?;
Ok(())
}
4. 動作確認
subscriberを起動
$ cargo run
別のターミナルでテーブルに変更を加えてみる
BEGIN;
INSERT INTO logs (level, message) VALUES ('INFO', 'booting...');
COMMIT;
Rust側でリアルタイムに変更イベントが表示されればOK。
注意点
supabase/etlとしては、起動時にすべてスキャンするようになっているようで、初回起動時の挙動は要件に合わせて調整するとよさそう。
おわりに
Polling方式と比較して、PostgreSQLのpublication/subscriptionを使うことで、効率的かつリアルタイムなデータ変更検知が実現できた。supabase/etlライブラリにより、Rustでも比較的簡単に実装できる。
これ、何がいいかというと、queueとしての活用の可能性が広がる点やoutboxパターンへの活用など、PostgreSQLをより強力なデータプラットフォームとして利用できる点にある。ぜひ試してみてほしい。