混乱世界中的稳定:Postgres 如何使事务原子化

原子性( “ACID” 特性)声明,对于一系列的数据库操作,要么所有操作一起提交,要么全部回滚;不允许中间状态存在。对于那些需要去适应混乱的现实世界的代码来说,简直是天赐良物。

那些改变数据并继续恶化下去的故障将被取代,这些改变会被恢复。当你在处理着百万级请求的时候,可能会因为间歇性的问题导致连接的断断续续或者出现一些其它的突发情况,从而导致一些不便,但不会打乱你的数据。

众所周知 Postgres 的实现中提供了强大的事务语义化。虽然我已经用了好几年,但是有些东西我从来没有真正理解。Postgres 有着稳定出色的工作表现,让我安心把它当成一个黑盒子 -- 惊人地好用,但是内部的机制却是不为人知的。

这篇文章是探索 Postgres 如何保持它的事务及原子性提交,和一些可以让我们深入理解其内部机制的关键概念[1]

管理并发访问

假如你建立了一个简易数据库,这个数据库读写硬盘上的 CSV 文件。当只有一个客户端发起请求时,它会打开文件,读取信息并写入信息。一切运行非常完美,有一天,你决定强化你的数据库,给它加入更加复杂的新特性 - 多客户端支持!

不幸地是,当两个客户端同时试图去操作数据的时候,新功能立即被出现的问题所困扰。当一个 CSV 文件正在被一个客户端读取,修改,和写入数据的时候,如果另一个客户端也尝试去做同样的事情,这个时候就会发生冲突。

客户端之间的资源争夺会导致数据丢失。这是并发访问出现的常见问题。可以通过引入并发控制来解决。曾经有过许多原始解决方案。例如我们让来访者带上独占锁去读写文件,或者我们可以强制让所有访问都需要通过流控制点,从而实现同一时间只能运行其一。但是这些方法不仅运行缓慢,而且由于不能纵向扩展,从而使数据库不能完全支持 ACID 特性。现代数据库有一个完美的解决办法,MVCC (多版本并行控制系统)。

在 MVCC,语句在事务里面执行。它会创建一个新版本,而不会直接覆写数据。有需求的客户端仍然可以使用原始的数据,但新的数据会被隐藏起来直到事务被提交。这样客户端之间就不存在直接争夺的情况,数据也不再面临重写而且可以安全地被保存。

事务开始执行的时候,数据库会生成一个此刻数据库状态的快照。在数据库的每一个事务都会以串行的顺序执行,通过一个全局锁来保证每次只有一个事务能够提交或者中止操作。快照完美体现了两个事务之间的数据库状态。

为了避免被删除或隐藏的行数据不断地堆积,数据库最后将经过一个 vacuum 程序(或在某些情况下,带有歧义查询的 “microvacuums” 队列)来清理淘汰数据,但是只能在数据不再被其它快照使用的时候才能进行。

让我们看下 Postgres 如何使用 MVCC 管理并发的情况。

事务、元组和快照

这是 Postgres 用于实现事务的结构(来自 proc.c):

  1. typedef struct PGXACT
  2. {
  3. TransactionId xid; /* 当前由程序执行的顶级事务 ID
  4. * 如果正在执行且 ID 被赋值;
  5. * 否则是无效事务 ID */
  6.  
  7. TransactionId xmin; /* 正在运行最小的 XID,
  8. * 除了 LAZY VACUUM:
  9. * vacuum 不移除因 xid >= xmin
  10. * 而被删除的元组 */
  11.  
  12. ...
  13. } PGXACT;

事务是以

  1. xid
(或 “xact” ID)来标记。这是 Postgres 的优化,当事务开始更改数据的时候,Postgres 会赋值一个
  1. xid
给它。因为只有那个时候,其它程序才需要开始追踪它的改变。只读操作可以直接执行,不需要分配
  1. xid

当这个事务开始运行的时候,

  1. xmin
马上被设置为运行事务中
  1. xid
最小的值。Vacuum 进程计算数据的最低边界,使它们保持
  1. xmin
是所有事务中的最小值。

生命周期感知的元组

在 Postgres,行数据常常与元组有关。当 Postgres 使用像 B 树通用的查找结构去快速检索信息,索引并没有存储一个元素的完整数据或其中任意的可见信息。相反,他们存储可以从物理存储器(也称为“堆”)检索特定行的

  1. tid
(元组 ID)。Postgres 通过
  1. tid
作为起始点,对堆进行扫描直到找到一个能满足当前快照的可见元组。

这是 Postgres 实现的堆元组(不是索引元组),以及它头信息的结构( 和 htup_details.h):

  1. typedef struct HeapTupleData
  2. {
  3. uint32 t_len; /* *t_data 的长度 */
  4. ItemPointerData t_self; /* SelfItemPointer */
  5. Oid t_tableOid; /* 唯一标识一个表 */
  6. HeapTupleHeader t_data; /* -> 元组的头部及数据 */
  7. } HeapTupleData;
  8.  
  9. /* 相关的 HeapTupleData */
  10. struct HeapTupleHeaderData
  11. {
  12. HeapTupleFields t_heap;
  13.  
  14. ...
  15. }
  16.  
  17. /* 相关的 HeapTupleHeaderData */
  18. typedef struct HeapTupleFields
  19. {
  20. TransactionId t_xmin; /* 插入 xact ID */
  21. TransactionId t_xmax; /* 删除或隐藏 xact ID */
  22.  
  23. ...
  24. } HeapTupleFields;

像事务一样,元组也会追踪它自己的

  1. xmin
,但是这只在特定的元组情况下,例如它被记录为第一个事务,其中元组变可见。(即创建它的那个)。它还追踪
  1. xmax
作为最后的一个的事务,其中元组是可见的(即删除它的那个)[2]

可以使用

  1. xmin
  1. xmax
来追踪堆元组的生存期。虽然
  1. xmin
  1. xmax
是内部概念,但是他们可以显示任何 Postgres 表上被隐藏的列。通过名字显示地选择它们:

  1. # SELECT *, xmin, xmax FROM names;
  2.  
  3. id | name | xmin | xmax
  4. ----+----------+-------+-------
  5. 1 | Hyperion | 27926 | 27928
  6. 2 | Endymion | 27927 | 0

快照:xmin,xmax,和 xip

这是快照的实现结构 ():

  1. typedef struct SnapshotData {
  2. /*
  3. * 以下字段仅用于 MVCC 快照,和在特定的快照。
  4. * (但 xmin 和 xmax 专门用于 HeapTupleSatisfiesDirty)
  5. *
  6. *
  7. * 一个 MVCC 快照 永远不可能见到 XIDs >= xmax 的事务。
  8. * 除了那些列表中的 snapshot,它会看到时间长的 XIDs 的内容。
  9. * 对于大多数的元组,xmin 被存储起来是个优化的操作,这样避免去搜索 XID 数组。
  10. *
  11. */
  12. TransactionId xmin;
  13. /* id小于xmin的所有事务更改在当前快照中可见 */
  14. TransactionId xmax;
  15. /* id大于xmax的所有事务更改在当前快照中可见 */
  16.  
  17. /*
  18. * 对于普通的 MVCC 快照,它包含了程序中所有的 xact IDs
  19. * 除非在它是空的情况下被使用。
  20. * 对于历史 MVCC 的快照, 这就是刚好相反, 即它包含了在 xmin 和 xmax 中已提交的事务。
  21. *
  22. *
  23. * 注意: 所有在 xip[] 的 ids 都满足 xmin <= xip[i] < xmax
  24. */
  25. TransactionId * xip;
  26. /* 所有正在运行的事务的id列表 */
  27. uint32 xcnt;
  28. /* 正在运行的事务的计数 */
  29.  
  30. ...
  31. }

快照的

  1. xmin
计算方式和计算事务的相同(即在正在运行的事务中,
  1. xid
最低的事务),但用途却不一样。
  1. xmin
是数据可见的最低边界。元组是被
  1. xid < xmin
条件的事务所创建,对快照可见。

同时也有定义为

  1. xmax
的变量,它被设置为最后一次提交事务的
  1. xid
+ 1。
  1. xmax
是数据可见的上限;
  1. xid >= xmax
的事务对快照是不可见的。

最后,当快照被创建,它会定义一个

  1. *xip
作为存储所有事务
  1. xid
的数组。
  1. *xip
存在是因为即使
  1. xmin
被设定为可见边界,可能有一些已经提交的事务的
  1. xid
大于
  1. xmin
,但也存在
  1. xmin
也大于一些处于执行阶段的事务的
  1. xid

我们希望任何

  1. xid > xmin
的事务提交结果都是可见的,但事实上它们被隐藏了。快照创建的时候,
  1. *xip
存储的有效事务清单可以帮助我们辨别各事务身份。

事务是对数据库进行操作,快照是为了抓捕数据库一瞬间的信息。

开启事务

当你执行

  1. BEGIN
语句,尽管 Postgres 对于一些常用的操作会有相应优化,但它会尽可能地推迟更多开销比较大的操作。举个例子,一个新的事务在开始修改数据之前,我们不会给它分配 xid。这样做可以减少在其他地方追踪它的花费。

新的事务也不会立即使用快照。当事务运行第一个查询,

  1. exec_simple_query
(在 postgres.c)才会将其入栈。甚至一个简单的
  1. SELECT 1;
语句也会触发:

  1. static void
  2. exec_simple_query(const char *query_string)
  3. {
  4. ...
  5.  
  6. /*
  7. * 如果解析/计划需要,则设置一个快照
  8. */
  9. if (analyze_requires_snapshot(parsetree))
  10. {
  11. PushActiveSnapshot(GetTransactionSnapshot());
  12. snapshot_set = true;
  13. }
  14.  
  15. ...
  16. }

创建新快照是程序真正开始加载的起始点。这是

  1. GetSnapshotData
(在 procarray.c):

  1. Snapshot
  2. GetSnapshotData(Snapshot snapshot)
  3. {
  4. /* xmax 总是等于 latestCompletedXid + 1 */
  5. xmax = ShmemVariableCache->latestCompletedXid;
  6. Assert(TransactionIdIsNormal(xmax));
  7. TransactionIdAdvance(xmax);
  8.  
  9. ...
  10.  
  11. snapshot->xmax = xmax;
  12. }

这个函数做了很多初始化的工作,但像我们谈到的,它最主要的工作就是设置快照的

  1. xmin
  1. xmax
,和
  1. *xip
。其中最简单的就是设置
  1. xmax
,它可以从 Postmaster 管理的共享存储器中检索出来。每个提交的事务都会通知 Postmaster,和
  1. latestCompletedXid
将会被更新,如果
  1. xid
高于当前
  1. xid
的值(稍后将详细介绍)。

需要注意的是,最后的

  1. xid
自增是由函数实行的。因为在 Postgres 里面,事务的 IDs 是被允许包装,所以并不是单纯的自增那么简单。一个事务 ID 是被定义为一个无符号32位整数(来自 c.h):

  1. typedef uint32 TransactionId;

尽管

  1. xid
是看情况来分配的(上文提过,读取数据时是不需要它的),但是系统大量的吞吐量很容易就达到32位的边界,所以系统需要根据需求将
  1. xid
序列进行“重置”。这是由一些预处理器处理的(在 transam.h):

  1. #define InvalidTransactionId ((TransactionId) 0)
  2. #define BootstrapTransactionId ((TransactionId) 1)
  3. #define FrozenTransactionId ((TransactionId) 2)
  4. #define FirstNormalTransactionId ((TransactionId) 3)
  5.  
  6. ...
  7.  
  8. /* 提前一个事务ID变量, 直接操作 */
  9. #define TransactionIdAdvance(dest) \
  10. do { \
  11. (dest)++; \
  12. if ((dest) < FirstNormalTransactionId) \
  13. (dest) = FirstNormalTransactionId; \
  14. } while(0)

最初的几个 ID 被保留作为特殊标识符,所以我们一般跳过它,从

  1. 3
开始。

回到

  1. GetSnapshotData
里,通过迭代所有正在执行的事务我们可以得到
  1. xmin
  1. xip
(回顾快照中它们的作用):

  1. /*
  2. * 循环 procArray 查看 xid,xmin,和 subxids。
  3. * 目的是得到所有 active xids,找到最低的 xmin,和试着去记录 subxids。
  4. *
  5. */
  6. for (index = 0; index < numProcs; index++)
  7. {
  8. volatile PGXACT *pgxact = &allPgXact[pgprocno];
  9. TransactionId xid;
  10. xid = pgxact->xmin; /* fetch just once */
  11.  
  12. /*
  13. * 如果事务中没有被赋值的 XID,我们可以跳过;
  14. * 对于 sub-XIDs 也同理。如果 XID >= xmax,我们也可以跳过它;
  15. * 这样的事务被认为(任何 sub-XIDs 都将 >= xmax)。
  16. *
  17. */
  18. if (!TransactionIdIsNormal(xid)
  19. || !NormalTransactionIdPrecedes(xid, xmax))
  20. continue;
  21.  
  22. if (NormalTransactionIdPrecedes(xid, xmin))
  23. xmin = xid;
  24.  
  25. /* 添加 XID 到快照中。 */
  26. snapshot->xip[count++] = xid;
  27.  
  28. ...
  29. }
  30.  
  31. ...
  32.  
  33. snapshot->xmin = xmin;

提交事务

事务通过 CommitTransaction (在 xact.c)被提交。函数非常复杂,下面代码是函数比较重要部分:

  1. static void
  2. CommitTransaction(void)
  3. {
  4. ...
  5.  
  6. /*
  7. * 我们需要去 pg_xact 标记 XIDs 来表示已提交。作为
  8. * 已稳定提交的标记。
  9. */
  10. latestXid = RecordTransactionCommit();
  11.  
  12. /*
  13. * 让其他知道没有其他事务在程序中。
  14. * 需要注意的是,这个操作必须在释放锁之前
  15. * 和记录事务提交之前完成。
  16. */
  17. ProcArrayEndTransaction(MyProc, latestXid);
  18.  
  19. ...
  20. }

持久性和 WAL

Postgres 是完全围绕着持久性的概念设计的。这样即使像在外力摧毁或功率损耗的情况下,已提交的事务也保持原有的状态。像许多优秀的系统,Postgres 使用预写式日志( WAL,或 “xlog”)去实现稳定。所有的更改被记录进磁盘,甚至像宕机这种事情,Postgres 会搜寻 WAL,然后重新恢复没有写进数据文件的更改记录。

从上面

  1. RecordTransactionCommit
的片段代码中,将事务的状态更改到 WAL:

  1. static TransactionId
  2. RecordTransactionCommit(void)
  3. {
  4. bool markXidCommitted = TransactionIdIsValid(xid);
  5.  
  6. /*
  7. * 如果目前我们还没有指派 XID,那我们就不能再指派,也不能
  8. * 写入提交记录
  9. */
  10. if (!markXidCommitted)
  11. {
  12. ...
  13. } else {
  14. XactLogCommitRecord(xactStopTimestamp,
  15. nchildren, children, nrels, rels,
  16. nmsgs, invalMessages,
  17. RelcacheInitFileInval, forceSyncCommit,
  18. MyXactFlags,
  19. InvalidTransactionId /* plain commit */ );
  20.  
  21. ....
  22. }
  23.  
  24. if ((wrote_xlog && markXidCommitted &&
  25. synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
  26. forceSyncCommit || nrels > 0)
  27. {
  28. XLogFlush(XactLastRecEnd);
  29.  
  30. /*
  31. * 如果我们写入一个有关提交的记录,那么可能更新 CLOG
  32. */
  33. if (markXidCommitted)
  34. TransactionIdCommitTree(xid, nchildren, children);
  35. }
  36.  
  37. ...
  38. }

commit log

伴随着 WAL,Postgres 也有一个commit log(或者叫 “clog” 和 “pg_xact”)。这个记录都保存事务提交痕迹,无论最后事务提交与否。上面的

  1. TransactionIdCommitTree
实现了这个功能 - 首先会尝试把一系列的信息写入 WAL,然后
  1. TransactionIdCommitTree
会在 commit log 中改为“已提交”。

虽然 commit log 也被称为“日志”,但实际上它是一个提交状态的位图,在共享内存和在磁盘上的进行拆分。
在现代编程中很少出现这么简约的例子,事务的状态可以仅使用二个字节来记录,我们能每字节存储四个事务,或者每个标准 8k 页面存储 32758。

  1. #define TRANSACTION_STATUS_IN_PROGRESS 0x00
  2. #define TRANSACTION_STATUS_COMMITTED 0x01
  3. #define TRANSACTION_STATUS_ABORTED 0x02
  4. #define TRANSACTION_STATUS_SUB_COMMITTED 0x03
  5.  
  6. #define CLOG_BITS_PER_XACT 2
  7. #define CLOG_XACTS_PER_BYTE 4
  8. #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
  1. /*
  2. * 将提交日志中的事务目录的最终状态记录到单个页面上所有目录上。
  3. * 原子只出现在这个页面。
  4. *
  5. * 其他的 API 与 TransactionIdSetTreeStatus() 相同。
  6. */
  7. static void
  8. TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
  9. TransactionId *subxids, XidStatus status,
  10. XLogRecPtr lsn, int pageno)
  11. {
  12. ...
  13.  
  14. LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
  15.  
  16. /*
  17. * 无论什么情况,都设置事务的 id。
  18. *
  19. * 如果我们在写的时候在这个页面上更新超过一个 xid,
  20. * 我们可能发现有些位转到了磁盘,有些则不会。
  21. * 如果我们在更新页面的时候提交了一个破坏原子性的最高级 xid,
  22. * 那么在我们标记最高级的提交之前我们先提交 subxids。
  23. *
  24. */
  25. if (TransactionIdIsValid(xid))
  26. {
  27. /* Subtransactions first, if needed ... */
  28. if (status == TRANSACTION_STATUS_COMMITTED)
  29. {
  30. for (i = 0; i < nsubxids; i++)
  31. {
  32. Assert(ClogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
  33. TransactionIdSetStatusBit(subxids[i],
  34. TRANSACTION_STATUS_SUB_COMMITTED,
  35. lsn, slotno);
  36. }
  37. }
  38.  
  39. /* ... 然后是主事务 */
  40. TransactionIdSetStatusBit(xid, status, lsn, slotno);
  41. }
  42.  
  43. ...
  44.  
  45. LWLockRelease(CLogControlLock);
  46. }
  1. void
  2. ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  3. {
  4. /*
  5. * 当清除我们的 XID时,我们必须锁住 ProcArrayLock
  6. * 这样当别人设置快照的时候,运行的事务已全被清空了。
  7. * 看讨论
  8. * src/backend/access/transam/README.
  9. */
  10. if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
  11. {
  12. ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
  13. LWLockRelease(ProcArrayLock);
  14. }
  15.  
  16. ...
  17. }
  18.  
  19. static inline void
  20. ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
  21. TransactionId latestXid)
  22. {
  23. ...
  24.  
  25. /* 也是在持锁的情况下提前全局 latestCompletedXid */
  26. if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
  27. latestXid))
  28. ShmemVariableCache->latestCompletedXid = latestXid;
  29. }
  1. static void
  2. heapgettup(HeapScanDesc scan,
  3. ScanDirection dir,
  4. int nkeys,
  5. ScanKey key)
  6. {
  7. ...
  8.  
  9. /*
  10. * 预先扫描直到找到符合的元组
  11. *
  12. */
  13. lpp = PageGetItemId(dp, lineoff);
  14. for (;;)
  15. {
  16. /*
  17. * if current tuple qualifies, return it.
  18. */
  19. valid = HeapTupleSatisfiesVisibility(tuple,
  20. snapshot,
  21. scan->rs_cbuf);
  22.  
  23. if (valid)
  24. {
  25. return;
  26. }
  27.  
  28. ++lpp; /* 这个页面的itemId数组向前移动一个索引 */
  29. ++lineoff;
  30. }
  31.  
  32. ...
  33. }
  1. bool
  2. HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
  3. Buffer buffer)
  4. {
  5. ...
  6.  
  7. else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
  8. SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
  9. HeapTupleHeaderGetRawXmin(tuple));
  10.  
  11. ...
  12.  
  13. /* xmax transaction committed */
  14.  
  15. return false;
  16. }
  1. bool
  2. /* true if given transaction committed */
  3. TransactionIdDidCommit(TransactionId transactionId) {
  4. XidStatus xidstatus;
  5.  
  6. xidstatus = TransactionLogFetch(transactionId);
  7.  
  8. /*
  9. * 如果该事务标记提交,那就提交
  10. */
  11. if (xidstatus == TRANSACTION_STATUS_COMMITTED) return true;
  12.  
  13. ...
  14. }
  1. SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
  2. HeapTupleHeaderGetRawXmin(tuple));
  1. BEGIN;
  2.  
  3. SELECT * FROM users
  4.  
  5. INSERT INTO users (email) VALUES ('brandur@example.com')
  6. RETURNING *;
  7.  
  8. COMMIT;