patroni.postgresql.slots module

Replication slot handling.

Provides classes for the creation, monitoring, management and synchronisation of PostgreSQL replication slots.

class patroni.postgresql.slots.SlotsAdvanceThread(slots_handler: SlotsHandler)

Bases: Thread

Daemon Thread object for advancing logical replication slots on replicas.

This ensures that slot advancing queries sent to postgres do not block the main loop.

on_promote() None

Reset state of the daemon.

run() None

Thread main loop entrypoint.

Note

Thread will wait until a sync is scheduled from outside, normally triggered during the HA loop or a wakeup call.

schedule(advance_slots: Dict[str, Dict[str, int]]) Tuple[bool, List[str]]

Trigger a synchronisation of slots.

This is the main entry point for the Patroni HA loop wakeup call.

Parameters:

advance_slots – dictionary containing slots that need to be advanced

Returns:

tuple of failure status and a list of slots to be copied
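The scheduling pattern described above (a daemon thread that sleeps until work is handed to it, and a schedule() call that returns the outcome of the previous run) can be sketched roughly as follows. This is a simplified illustration, not Patroni's code: the class name AdvanceWorkerSketch, the advanced list and the processed event are hypothetical, and the worker only records the requested positions instead of querying PostgreSQL.

```python
import threading
from typing import Dict, List, Tuple


class AdvanceWorkerSketch(threading.Thread):
    """Hypothetical, simplified stand-in for a slot-advancing daemon thread."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._condition = threading.Condition()
        self._scheduled: Dict[str, Dict[str, int]] = {}
        self._failed = False                # would be set when an advance fails
        self._copy_slots: List[str] = []    # slots the caller should copy instead
        self.advanced: List[Tuple[str, str, int]] = []
        self.processed = threading.Event()  # illustration only: a batch has finished

    def schedule(self, advance_slots: Dict[str, Dict[str, int]]) -> Tuple[bool, List[str]]:
        """Queue work, wake the thread and report the results of the previous run."""
        with self._condition:
            for database, slots in advance_slots.items():
                self._scheduled.setdefault(database, {}).update(slots)
            result = (self._failed, self._copy_slots)
            self._failed, self._copy_slots = False, []
            self._condition.notify()
        return result

    def run(self) -> None:
        while True:
            with self._condition:
                while not self._scheduled:
                    self._condition.wait()  # sleep until schedule() notifies
                work, self._scheduled = self._scheduled, {}
            for database, slots in work.items():
                for slot, lsn in slots.items():
                    # A real worker would run the slot advance query here.
                    self.advanced.append((database, slot, lsn))
            self.processed.set()
```

Because the slow work happens outside the lock, schedule() returns almost immediately, which is the property that keeps the calling loop from blocking.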

sync_slot(cur: Union[cursor, Cursor[Any]], database: str, slot: str, lsn: int) None

Execute a pg_replication_slot_advance query and store success for scheduled synchronisation task.

Parameters:
  • cur – database connection cursor.

  • database – name of the database associated with the slot.

  • slot – name of the slot to be synchronised.

  • lsn – last known LSN position.
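Since lsn is passed as an integer while PostgreSQL accepts pg_lsn values in their textual high/low hexadecimal form, a caller has to convert between the two. The sketch below shows one way to do this and to build a parameterised pg_replication_slot_advance query. The helper names format_lsn_sketch and build_advance_query are hypothetical and are not part of this module.

```python
from typing import Tuple


def format_lsn_sketch(lsn: int) -> str:
    """Render a 64-bit integer LSN in PostgreSQL's 'high/low' hexadecimal text form."""
    return "{:X}/{:X}".format(lsn >> 32, lsn & 0xFFFFFFFF)


def build_advance_query(slot: str, lsn: int) -> Tuple[str, Tuple[str, str]]:
    """Return the SQL text and parameters for advancing a slot (illustrative helper)."""
    sql = "SELECT pg_catalog.pg_replication_slot_advance(%s, %s)"
    return sql, (slot, format_lsn_sketch(lsn))
```

With a psycopg cursor the result could be passed on as cur.execute(*build_advance_query(slot, lsn)); the cursor itself is left out so that the sketch runs without a database.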

sync_slots() None

Synchronise slots for all scheduled databases.

sync_slots_in_database(database: str, slots: List[str]) None

Synchronise slots for a single database.

Parameters:
  • database – name of the database.

  • slots – list of slot names to synchronise.

class patroni.postgresql.slots.SlotsHandler(postgresql: Postgresql)

Bases: object

Handler for managing and storing information on replication slots in PostgreSQL.

Variables:
  • pg_replslot_dir – system location path of the PostgreSQL replication slots.

  • _logical_slots_processing_queue – logical replication slots on the primary that have yet to be processed.

check_logical_slots_readiness(cluster: Cluster, replicatefrom: Optional[str]) bool

Determine whether all known logical slots are synchronised from the leader.

  1. Retrieve the current catalog_xmin value for the physical slot from the cluster leader.

  2. Take the previously stored list of “unready” logical slots, i.e. those that have yet to be checked and therefore have no stored slot attributes.

  3. Store each logical slot's catalog_xmin once the physical slot's catalog_xmin becomes valid.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • replicatefrom – name of the member that should be used to replicate from.

Returns:

False if any issue was encountered while checking logical slot readiness, True otherwise.

copy_logical_slots(cluster: Cluster, create_slots: List[str]) None

Create logical replication slots on standby nodes.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • create_slots – list of slot names to copy from the primary.

drop_replication_slot(name: str) Tuple[bool, bool]

Drop a named slot from Postgres.

Parameters:

name – name of the slot to be dropped.

Returns:

a tuple of active and dropped. active is True if the slot is active; dropped is True if the slot was successfully dropped. If the slot was not found, both values are False.
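The three outcomes encoded in the returned tuple can be told apart as in the sketch below. The helper describe_drop_result and its message strings are hypothetical and only illustrate how a caller might interpret the documented return value.

```python
from typing import Tuple


def describe_drop_result(result: Tuple[bool, bool]) -> str:
    """Translate an (active, dropped) tuple into a human-readable outcome."""
    active, dropped = result
    if dropped:
        return "dropped"
    if active:
        return "still active, not dropped"
    return "not found"
```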

get_local_connection_cursor(**kwargs: Any) Iterator[Union[cursor, Cursor[Any]]]

Create a new database connection to local server.

Create a non-blocking connection cursor so that a pg_replication_slot_advance query that runs longer than the HA loop timeout does not cause a false failure state.

Parameters:

kwargs – Any keyword arguments to pass to psycopg.connect().

Yields:

connection cursor object, note implementation varies depending on version of psycopg.

ignore_replication_slot(cluster: Cluster, name: str) bool

Check if slot name should not be managed by Patroni.

Parameters:
  • cluster – cluster state information object.

  • name – name of the slot to ignore

Returns:

True if the slot name matches any slot specified in the ignore_slots configuration, otherwise the result of CitusHandler.ignore_replication_slot().
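As a rough illustration of the matching step, the sketch below assumes that each ignore_slots entry is a dictionary of slot properties (name, type, database, plugin) and that a slot is ignored when every property given in an entry equals the slot's value. Both the property names and the helper functions are assumptions made for this example; the Citus fallback is not modelled.

```python
from typing import Any, Dict, List

# Assumed set of properties an ignore rule may specify.
MATCH_KEYS = ("name", "type", "database", "plugin")


def matches_ignore_rule(slot: Dict[str, Any], rule: Dict[str, Any]) -> bool:
    """True if every property specified in the rule equals the slot's value."""
    specified = [key for key in MATCH_KEYS if rule.get(key)]
    # Sketch choice: a rule without any recognised property matches nothing.
    return bool(specified) and all(rule[key] == slot.get(key) for key in specified)


def is_ignored(slot: Dict[str, Any], ignore_slots: List[Dict[str, Any]]) -> bool:
    """True if the slot matches at least one ignore rule."""
    return any(matches_ignore_rule(slot, rule) for rule in ignore_slots)
```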

load_replication_slots() None

Query replication slot information from the database and store it for processing by other tasks.

Note

Only supported from PostgreSQL version 9.4 onwards.

Store replication slot name, type, plugin, database and datoid. If PostgreSQL version is 10 or newer also store catalog_xmin and confirmed_flush_lsn.

When using logical slots, store information separately for slot synchronisation on replica nodes.

on_promote() None

Entry point from HA cycle used when a standby node is to be promoted to primary.

Note

If logical replication slot synchronisation is enabled then slot advancement will be triggered. If any logical slots that were copied are yet to be confirmed as ready a warning message will be logged.

process_permanent_slots(slots: List[Dict[str, Any]]) Dict[str, int]

Process replication slot information from the host and prepare information used in subsequent cluster tasks.

Note

This method solves three problems.

The cluster_info_query from Postgresql is executed every HA loop and returns information about all replication slots that exist on the current host.

Based on this information perform the following actions:

  1. On the primary, expose permanent logical slots to the DCS: build (and return) a dict that maps permanent logical slot names to confirmed_flush_lsn.

  2. Detect whether one of the previously known permanent slots is missing and schedule a resync.

  3. Update the local cache with the fresh catalog_xmin and confirmed_flush_lsn for every known slot.

This info is used when performing the check of logical slot readiness on standbys.

Parameters:

slots – information about the replication slots that exist on the current host.

Returns:

dictionary of logical slot names to confirmed_flush_lsn.
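The shape of the returned dictionary can be illustrated with the sketch below. It assumes that each slot is a dictionary with slot_name, type and confirmed_flush_lsn keys and that the set of permanent slot names is known to the caller; these key names and the helper permanent_logical_lsns are hypothetical, and the resync and cache update steps are not modelled.

```python
from typing import Any, Dict, List, Set


def permanent_logical_lsns(slots: List[Dict[str, Any]], permanent_names: Set[str]) -> Dict[str, int]:
    """Map permanent logical slot names to their confirmed_flush_lsn."""
    result: Dict[str, int] = {}
    for slot in slots:
        if (slot.get("type") == "logical"
                and slot.get("slot_name") in permanent_names
                and slot.get("confirmed_flush_lsn") is not None):
            result[slot["slot_name"]] = slot["confirmed_flush_lsn"]
    return result
```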

schedule(value: Optional[bool] = None) None

Schedule the loading of slot information from the database.

Parameters:

value – optional value: set it to False to unschedule, or to True to force scheduling. If it is omitted, the value will be True if this PostgreSQL node supports replication slots.
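The three-way behaviour of the value parameter amounts to a simple default rule, sketched below. The function resolve_schedule and its supports_slots argument are hypothetical names used only for this illustration.

```python
from typing import Optional


def resolve_schedule(value: Optional[bool], supports_slots: bool) -> bool:
    """An explicit True or False wins; None falls back to what the server supports."""
    return supports_slots if value is None else value
```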

schedule_advance_slots(slots: Dict[str, Dict[str, int]]) Tuple[bool, List[str]]

Wrapper that ensures the slot advance daemon thread is started if it is not already running.

Parameters:

slots – dictionary containing slot information.

Returns:

tuple with the result of the scheduling of slot advancement: failed and list of slots to copy.

sync_replication_slots(cluster: Cluster, nofailover: bool, replicatefrom: Optional[str] = None, paused: bool = False) List[str]

During the HA loop read, check and alter replication slots found in the cluster.

Read physical and logical slots found on the primary, then compare them to those configured in the DCS. Drop any slots that do not match those required by configuration and are not configured as permanent. Create any missing physical slots. If this node is the leader, create missing logical slots too; otherwise, if logical slots are known and active, create them on replica nodes.

Parameters:
  • cluster – object containing stateful information for the cluster.

  • nofailover – True if this node has been tagged to not be a failover candidate.

  • replicatefrom – the tag containing the node to replicate from.

  • paused – True if the cluster is in maintenance mode.

Returns:

list of logical replication slot names that should be copied from the primary.
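The drop and create decisions described for this method boil down to comparing the slots that exist locally with the slots that configuration requires. The sketch below shows that comparison by name only; the helper plan_slot_changes is hypothetical and deliberately ignores slot type and attribute mismatches, the leader and replica distinction, and maintenance mode.

```python
from typing import Iterable, List, Tuple


def plan_slot_changes(existing: Iterable[str], desired: Iterable[str],
                      permanent: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (slots to drop, slots to create), comparing slot names only."""
    existing_set, desired_set, permanent_set = set(existing), set(desired), set(permanent)
    # Drop what is neither required nor protected as permanent.
    to_drop = sorted(existing_set - desired_set - permanent_set)
    # Create what is required but not yet present.
    to_create = sorted(desired_set - existing_set)
    return to_drop, to_create
```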

patroni.postgresql.slots.compare_slots(s1: Dict[str, Any], s2: Dict[str, Any], dbid: str = 'database') bool

Compare 2 replication slot objects for equality.

Note

If the first argument is a physical replication slot then only the type of the second slot is compared. If the first argument is another type (e.g. logical) then dbid and plugin are compared.

Parameters:
  • s1 – First slot dictionary to be compared.

  • s2 – Second slot dictionary to be compared.

  • dbid – Optional attribute to be compared when comparing logical replication slots.

Returns:

True if the slot types of s1 and s2 match and the type of s1 is physical, or if the types match and the dbid and plugin attributes are equal.
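The comparison rule can be written out as the short sketch below. It is a re-statement of the documented behaviour under the assumption that slots are plain dictionaries with type and plugin keys; the function name compare_slots_sketch is hypothetical and this is not the module's own implementation.

```python
from typing import Any, Dict


def compare_slots_sketch(s1: Dict[str, Any], s2: Dict[str, Any], dbid: str = "database") -> bool:
    """Compare two slot dictionaries following the documented rule."""
    if s1["type"] != s2["type"]:
        return False
    if s1["type"] == "physical":
        # Physical slots are considered equal as soon as the types match.
        return True
    # Other slot types (e.g. logical) must also agree on dbid and plugin.
    return s1.get(dbid) == s2.get(dbid) and s1.get("plugin") == s2.get("plugin")
```

For example, two logical slots that use the same plugin but belong to different databases compare as unequal, while two physical slots always compare as equal.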