@JunQiu
2018-10-14T12:52:25.000000Z
字数 4556
阅读 3349
mongodb summary_2018/10
1、涉及协调器和节点多次通信,通信时间较长。2、因此整个事务的持续时间也变长,对资源的占有时间也会变长,会产生较长的等待。

## 假设一个场景从账户A转钱到账户B,在关系型数据库中,我们可以在一个事务中完成,但在小于4.0的mongo中,我们需要使用2PC完成。## 使用2PC来完成这个事务// 初始化A/B两个账户db.accounts.insert([{ _id: "A", balance: 1000, pendingTransactions: [] },{ _id: "B", balance: 1000, pendingTransactions: [] }])// 初始化一个协调者来协调所有参与的文档,在mongo中我们使用一个transactions集合,当需要完成一个事务时,插入一条数据,包含如下字段:source 和 destination 字段,与 accounts 集合里的 _id 字段相关联的。value 字段,指定影响 source 账户和 destination 账户 balance 的传输量,state 字段,反应传输的当前状态。state 字段可以具有值 initial , pending , applied , done , canceling 和 canceled 。lastModified 字段,反映了最后修改的日期。db.transactions.insert({ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() })// 从协调器中取出需要完成的事务,即状态为initialvar t = db.transactions.findOne( { state: "initial" } )// 处理取出来的事务1、Update transaction state to pendingdb.transactions.update({ _id: t._id, state: "initial" },{$set: { state: "pending" },$currentDate: { lastModified: true }})2、Apply the transaction to both accounts.(并没有对资源保持持续占有,只保证不会重复操作,若要持续占有可以判定:pendingTransactions: []时)db.accounts.update({ _id: t.source, pendingTransactions: { $ne: t._id } },{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } })db.accounts.update({ _id: t.destination, pendingTransactions: { $ne: t._id } },{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } })3、Update transaction state to applieddb.transactions.update({ _id: t._id, state: "pending" },{$set: { state: "applied" },$currentDate: { lastModified: true }})4、Update both accounts’ list of pending transactions.db.accounts.update({ _id: t.source, pendingTransactions: t._id },{ $pull: { pendingTransactions: t._id } })db.accounts.update({ _id: t.destination, pendingTransactions: t._id },{ $pull: { pendingTransactions: t._id } })// 在协调器中标记事务完成db.transactions.update({ _id: t._id, state: "applied" },{$set: { state: "done" },$currentDate: { lastModified: true }})
## Applied 状态的事务这种情况,最好不要回滚事务,而是创建一个相同的事务来抵消该事务## Pending 状态的事务1、Update transaction state to canceling.2、Undo the transaction on both accounts.3、Update transaction state to canceled.Tips:基本和进行事务的方式相同,进行2PC## 其它注意点1、在2PC的过程中,每一步都需要判定是否成功,根据具体情况对事务进行回滚/重试2、如何保证在整个过程中,保证对资源的持续占有,其实上述过程中并没有保证对资源的持续占有,只保证了避免重复操作。
// 看一下一个其它人写的例子//app.js(async function() {// 连接DBconst { MongoClient } = require('mongodb');const uri = 'mongodb://localhost:1301/dbfour';const client = await MongoClient.connect(uri, { useNewUrlParser: true });const db = client.db();await db.dropDatabase();console.log('(1) 首先 删库 dbfour, then 跑路\n')// 插入两个账户并充值一些金额await db.collection('Account').insertMany([{ name: 'A', balance: 50 },{ name: 'B', balance: 10 }]);console.log('(2) 执行 insertMany, A 充值 50, B 充值 10\n')await transfer('A', 'B', 10); // 成功console.log('(3) 然后 A 给 B 转账 10\n')try {// 余额不足 转账失败console.log('(4) A 再次转账给 B 50\n')await transfer('A', 'B', 50);} catch (error) {//error.message; // "Insufficient funds: 40"console.log(error.message)console.log('\n(5) A 余额不够啊,所以这次转账操作不成功')}// 转账逻辑async function transfer(from, to, amount) {const session = client.startSession();session.startTransaction();try {const opts = { session, returnOriginal: false };const A = await db.collection('Account').findOneAndUpdate({ name: from }, { $inc: { balance: -amount } }, opts).then(res => res.value);if (A.balance < 0) {// 如果 A 的余额不足,转账失败 中止事务// `session.abortTransaction()` 会撤销上面的 `findOneAndUpdate()` 操作throw new Error('Insufficient funds: ' + (A.balance + amount));}const B = await db.collection('Account').findOneAndUpdate({ name: to }, { $inc: { balance: amount } }, opts).then(res => res.value);await session.commitTransaction();session.endSession();return { from: A, to: B };} catch (error) {// 如果错误发生,中止全部事务并回退到修改之前await session.abortTransaction();session.endSession();throw error; //使其调用者 catch error}}})()Tips:When using the drivers, you must pass the session to each operation in the transaction.
