Logical and Physical Replication
Replication allows a client to receive a continuous stream of updates from a PostgreSQL database, providing a near-realtime view of all changes as they occur. While this feature was originally developed to keep PostgreSQL standby replicas in sync with a primary, it can be used by arbitrary client applications.
Replication can be used wherever a constant feed of database changes is required; for example, an external application can be notified in near-realtime of any changes that occurred in a particular database table. This can be useful for external auditing, for replicating certain data elsewhere, for implementing the outbox pattern (see Additional resources below), and for various other purposes.
Npgsql provides a first-class API for writing .NET replication clients, detailed below. While PostgreSQL supports both logical and physical replication, in the majority of cases .NET applications will want to use logical replication.
Logical replication
Logical replication is a means to stream messages generated by PostgreSQL logical decoding plugins to a client.
The default implementation used by PostgreSQL itself for logical server-to-server replication is the Logical Streaming Replication Protocol, which uses the pgoutput plugin. PostgreSQL also supports streaming messages generated by other plugins, and Npgsql supports receiving those too.
General setup
To set up logical replication, follow the quick setup instructions in the PostgreSQL docs (note that a SUBSCRIPTION isn't required since the client isn't PostgreSQL):
Enable logical replication in your postgresql.conf file (changing wal_level requires a server restart):
wal_level = logical
Allow a replication user to connect by adding an entry to your pg_hba.conf file:
host replication repuser 0.0.0.0/0 md5
The user repuser must exist in your cluster and either be a superuser or have the REPLICATION attribute set. See the CREATE ROLE docs.
Logical Streaming Replication Protocol (pgoutput plugin)
The modern, recommended way to perform logical replication was introduced in PostgreSQL 10 (see the PostgreSQL documentation). This method, which uses the built-in pgoutput replication plugin, streams efficient binary messages representing database changes such as INSERT, UPDATE and DELETE (see the full list). Npgsql exposes these messages as an IAsyncEnumerable, which can easily be enumerated and consumed.
Create a publication, which defines the group of tables in the database you wish to replicate:
CREATE PUBLICATION blog_pub FOR TABLE blogs;
Create a replication slot, which will hold the state of the replication stream:
SELECT * FROM pg_create_logical_replication_slot('blog_slot', 'pgoutput');
If your application goes down, the slot persistently records the position up to which your application has confirmed processing the stream, so the application can resume where it left off when it reconnects.
At this point, everything is ready to start replicating! Create this simple .NET program with Npgsql:
using System;
using System.Threading;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;

await using var conn = new LogicalReplicationConnection("<connection_string>");
await conn.Open();

var slot = new PgOutputReplicationSlot("blog_slot");

// The following will loop until the cancellation token is triggered, and will print message types coming from PostgreSQL:
var cancellationTokenSource = new CancellationTokenSource();
await foreach (var message in conn.StartReplication(
    slot, new PgOutputReplicationOptions("blog_pub", 1), cancellationTokenSource.Token))
{
    Console.WriteLine($"Received message type: {message.GetType().Name}");

    // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually
    // so that Npgsql can inform the server which WAL files can be removed/recycled.
    conn.SetReplicationStatus(message.WalEnd);
}
For example, if you insert a new row into your blogs table, you should see the following output:
Received message type: BeginMessage
Received message type: RelationMessage
Received message type: InsertMessage
Received message type: CommitMessage
Warning
Npgsql internally recycles the message instances it hands out. It is an error to use a message received from StartReplication once the next message has been read.
The above was just a minimal "getting started" guide for logical replication; many additional configuration options and modes exist. Consult the PostgreSQL documentation for more details.
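To illustrate what the recycling warning above means in practice, here is a small self-contained sketch that does not use Npgsql at all. The hypothetical RecyclingStream iterator imitates the behavior by handing out the same StringBuilder instance for every message. Keeping references to the yielded instance goes wrong, while copying the data out inside the loop works. This is only an analogy for how Npgsql treats its message objects, not its actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for a replication stream: it yields the SAME buffer
// instance for every message and overwrites it before yielding the next one.
async IAsyncEnumerable<StringBuilder> RecyclingStream(params string[] payloads)
{
    var buffer = new StringBuilder(); // allocated once, reused for every message
    foreach (var payload in payloads)
    {
        buffer.Clear().Append(payload);
        await Task.Yield(); // simulate asynchronous I/O between messages
        yield return buffer;
    }
}

var keptReferences = new List<StringBuilder>(); // WRONG: holds on to the recycled instance
var snapshots = new List<string>();             // RIGHT: copies the data out immediately

await foreach (var message in RecyclingStream("BEGIN", "INSERT", "COMMIT"))
{
    keptReferences.Add(message);
    snapshots.Add(message.ToString());
}

// Every kept reference points at the same buffer, which now holds the last message.
Console.WriteLine(string.Join(", ", keptReferences)); // COMMIT, COMMIT, COMMIT
Console.WriteLine(string.Join(", ", snapshots));      // BEGIN, INSERT, COMMIT
```

The same rule applies in a real replication loop. Before the loop advances, copy whatever you need from the message into your own objects, such as data you intend to queue for later processing.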
Test decoding (test_decoding plugin)
An additional logical replication plugin that Npgsql supports is test_decoding. This plugin outputs textual representations of events, which are less efficient and need to be parsed. It is meant for testing that replication works rather than for building robust production apps. However, it can still be useful in some scenarios, especially on PostgreSQL versions older than 10, where pgoutput isn't available.
To use test_decoding, first create a logical replication slot with test_decoding as the plugin type:
SELECT * FROM pg_create_logical_replication_slot('blog_slot', 'test_decoding');
After that, use the following program:
using System;
using System.Threading;
using Npgsql.Replication;
using Npgsql.Replication.TestDecoding;

await using var conn = new LogicalReplicationConnection("Host=localhost;Username=test;Password=test");
await conn.Open();

var slot = new TestDecodingReplicationSlot("blog_slot");

// The following will loop until the cancellation token is triggered, and will print the messages coming from PostgreSQL:
var cancellationTokenSource = new CancellationTokenSource();
await foreach (var message in conn.StartReplication(slot, cancellationTokenSource.Token))
{
    Console.WriteLine($"Message: {message.Data}");

    // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually
    // so that Npgsql can inform the server which WAL files can be removed/recycled.
    conn.SetReplicationStatus(message.WalEnd);
}
Inserting a row will produce the following string messages:
Message: BEGIN 230413
Message: table public.blogs: INSERT: id[integer]:2 name[text]:'blog1'
Message: COMMIT 230413
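Because test_decoding emits plain text, your application has to parse these strings itself. The following rough sketch of such a parser is not part of Npgsql. The hypothetical ParseChange local function uses regular expressions to extract the table name, operation and column values from lines shaped like the INSERT message above, and returns null for BEGIN/COMMIT lines. It assumes the simple layout shown here. Real output can contain other shapes, for example UPDATE messages with an old-key section, which it does not handle specially.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper: parses one test_decoding change line such as
//   table public.blogs: INSERT: id[integer]:2 name[text]:'blog1'
// into (table, operation, column -> value). Non-change lines (BEGIN/COMMIT) return null.
// Assumption: only the simple layout is handled; "old-key:"/"new-tuple:" sections
// and other special outputs are not treated specially.
(string Table, string Operation, Dictionary<string, string> Columns)? ParseChange(string line)
{
    var header = Regex.Match(line, @"^table (?<table>[^:]+): (?<op>INSERT|UPDATE|DELETE): (?<rest>.*)$");
    if (!header.Success)
        return null;

    var columns = new Dictionary<string, string>();
    // Each column is name[type]:value. The value is either a single-quoted string
    // (where '' stands for an embedded quote) or an unquoted token such as 2 or null.
    var columnPattern = new Regex(@"(?<name>[^\s\[]+)\[(?<type>[^\]]+)\]:(?<value>'(?:[^']|'')*'|\S+)");
    foreach (Match m in columnPattern.Matches(header.Groups["rest"].Value))
    {
        var value = m.Groups["value"].Value;
        if (value.Length >= 2 && value.StartsWith('\'') && value.EndsWith('\''))
            value = value[1..^1].Replace("''", "'"); // strip quotes, unescape ''
        columns[m.Groups["name"].Value] = value;
    }
    return (header.Groups["table"].Value, header.Groups["op"].Value, columns);
}

// Usage: feed it the sample messages shown above.
foreach (var line in new[]
{
    "BEGIN 230413",
    "table public.blogs: INSERT: id[integer]:2 name[text]:'blog1'",
    "COMMIT 230413",
})
{
    var change = ParseChange(line);
    if (change.HasValue)
        Console.WriteLine($"{change.Value.Operation} on {change.Value.Table}: name = {change.Value.Columns["name"]}");
}
// prints: INSERT on public.blogs: name = blog1
```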
Warning
Npgsql internally recycles the message instances it hands out. It is an error to use a message received from StartReplication once the next message has been read.
Physical replication
Finally, PostgreSQL also supports physical replication, which streams raw block-level WAL data rather than logical change events. While it is useful for keeping PostgreSQL replicas in sync, and is supported by Npgsql, this mode is unlikely to be useful for a typical .NET client program.
Additional resources
- See here for a great post on implementing the outbox pattern via PostgreSQL logical replication. The outbox pattern guarantees delivery of an event from the database to e.g. a queue.