ys memos
Blog

PostgreSQLのPublicationを使った逐次データ取得の実装例


rust

2025/11/07


PostgreSQLのテーブル変更をリアルタイムで検知したい。そんな時、定期的にSELECTするpollingではなく、PostgreSQLのpublicationを購読する方法を試してみた。

Rustのsupabaseのetlライブラリを使えば、データベース側からプッシュで変更通知を受け取れる。pollingと違って無駄なクエリもなく、リアルタイムで変更を検知できるのが強み。

この記事では、実際にRustでpublication subscriberを実装する手順を紹介する。


INSERTされたものを取得する場合、pollingによって新規のレコードをチェックすることも可能である。 しかし、この手法を用いると、「DB負荷」「原理的に入る遅延」「不要なクエリを実行し続ける」といった問題が発生する。

一方、publicationを使うと、以下のようなメリットがある。

  • 変更が発生した時だけ通知が来る
  • データベース側からプッシュしてくれる
  • ほぼリアルタイムで変更を検知

WAL (Write Ahead Log)を直接購読する手法もあるが、これはClientサイドでのフィルタが必要になることから必要な通信以外のデータを受信することになるので、要件によって選定する必要がある。


PostgreSQLのlogical replication機能の一部。テーブルの変更(INSERT/UPDATE/DELETE)を「公開」し、それを「購読」することでリアルタイムにイベントを受け取れる。

通常はPostgreSQL同士のレプリケーションで使われるが、今回はアプリケーション側で購読して変更を処理する用途で使う。

このようなクエリを実行すると publication を作成できる。(詳細なオプションはこちら

CREATE PUBLICATION my_publication FOR TABLE logs;


実行環境の準備。再現性のため、Docker Composeを記しておく。

compose.yml
services:
  postgres:
    image: postgres:18.0-trixie
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: mydb
      # Logical Decoding を有効にする設定
    command:
      -c wal_level=logical
      -c max_replication_slots=4
      -c max_wal_senders=4
      -c log_connections=on
      -c log_disconnections=on
      -c log_statement=all
      -c log_replication_commands=on
      # -c wal_keep_size=64MB
  adminer:
    image: adminer
    container_name: adminer
    ports:
      - "8080:8080"
    environment:
      ADMINER_DEFAULT_SERVER: postgres

そして、TableとPUBLICATIONとSUBSCRIPTIONを準備する。

CREATE TABLE IF NOT EXISTS logs (
  id SERIAL PRIMARY KEY,
  timestamp TIMESTAMPTZ DEFAULT now(),
  level TEXT NOT NULL,
  message TEXT NOT NULL
);

ALTER TABLE logs REPLICA IDENTITY FULL;

DROP PUBLICATION IF EXISTS my_publication;
CREATE PUBLICATION my_publication FOR TABLE logs;

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=localhost port=5432 user=user password=pass dbname=mydb'
    PUBLICATION my_publication;

依存関係を追加:

Cargo.toml
[package]
name = "pg-etl"
version = "0.1.0"
edition = "2021"

[dependencies]
etl = { git = "https://github.com/supabase/etl", rev = "76514d7cab5f93d51b48a0eff1bd98ee3d14fb9c" }
localtrace = "0.1.9"
postgres-protocol = "0.6"
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "*", features = ["with-uuid-0_8"] }

src/main.rs
use etl::{
    config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
    pipeline::Pipeline,
    store::both::memory::MemoryStore,
};

#[derive(Clone)]
struct WorkContainer {}

impl etl::destination::Destination for WorkContainer {
    fn name() -> &'static str {
        "WorkContainer"
    }

    async fn truncate_table(&self, table_id: etl::types::TableId) -> etl::error::EtlResult<()> {
        println!("Truncating table {}", table_id);
        Ok(())
    }

    async fn write_table_rows(
        &self,
        table_id: etl::types::TableId,
        table_rows: Vec<etl::types::TableRow>,
    ) -> etl::error::EtlResult<()> {
        println!("Writing {} rows to table {}", table_rows.len(), table_id);
        Ok(())
    }

    async fn write_events(&self, events: Vec<etl::types::Event>) -> etl::error::EtlResult<()> {
        println!("Writing {} events", events.len());
        for event in &events {
            match event {
                etl::types::Event::Begin(_) => {}
                etl::types::Event::Commit(_) => {}
                etl::types::Event::Insert(insert_event) => {
                    println!("Insert event: {:?}", insert_event);
                }
                etl::types::Event::Update(update_event) => {
                    println!("Update event: {:?}", update_event);
                }
                etl::types::Event::Delete(delete_event) => {
                    println!("Delete event: {:?}", delete_event);
                }
                etl::types::Event::Relation(relation_event) => {
                    println!("Relation event: {:?}", relation_event);
                }
                etl::types::Event::Truncate(truncate_event) => {
                    println!("Truncate event: {:?}", truncate_event);
                }
                etl::types::Event::Unsupported => {}
            }
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> localtrace::Result<()> {
    match run().await {
        Ok(_) => (),
        Err(e) => {
            eprintln!("Error: {}", e);
            std::process::exit(1);
        }
    }
    Ok(())
}

async fn run() -> localtrace::Result<()> {
    let pg = PgConnectionConfig {
        host: "localhost".to_string(),
        port: 5432,
        name: "mydb".to_string(),
        username: "user".to_string(),
        password: Some("pass".into()),
        tls: TlsConfig {
            enabled: false,
            trusted_root_certs: String::new(),
        },
    };

    let store = MemoryStore::new();

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg,
        batch: BatchConfig {
            max_size: 1000,
            max_fill_ms: 1,
        },
        table_error_retry_delay_ms: 2000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    let work = WorkContainer {};

    let mut pipeline = Pipeline::new(config, store, work);
    pipeline.start().await?;
    pipeline.wait().await?;
    Ok(())
}

subscriberを起動

$ cargo run

別のターミナルでテーブルに変更を加えてみる

BEGIN;
INSERT INTO logs (level, message) VALUES ('INFO', 'booting...');
COMMIT;

Rust側でリアルタイムに変更イベントが表示されればOK。


supabase/etlとしては、起動時にすべてスキャンするようになっているようで、初回起動時の挙動は要件に合わせて調整するとよさそう。


Polling方式と比較して、PostgreSQLのpublication/subscriptionを使うことで、効率的かつリアルタイムなデータ変更検知が実現できた。supabase/etlライブラリにより、Rustでも比較的簡単に実装できる。

これ、何がいいかというと、queueとしての活用の可能性が広がる点やoutboxパターンへの活用など、PostgreSQLをより強力なデータプラットフォームとして利用できる点にある。ぜひ試してみてほしい。


関連タグを探す