副本集Oplog同步原理

数据库 waitig 579℃ 百度已收录 0评论

副本集是mongodb的基础组件,是实现高可用, 自动选主, 读写分离以及数据一致性的基础。 比较概括的说, 副本集是将同一份数据保存在不同的节点上面, 这些节点通过一致性的协议(RAFT), 实现数据的同步, 并且选出一个主节点, 该节点对外提供读写服务, 当该主节点发生故障的时候, 自动从剩余的从节点内选出新的主节点。

本文主要针对副本集的整体架构进行分析, 先来看一下架构图:
这里写图片描述

从这里, 我们可以看到, oplog 的整个从Primary 到Second的 sync 过程, 主要是通过SyncTail类来完成的, 其中用到几个重要的类来协助完成: BackgroundSync, OpQueueBatcher, ThreadPool。
这个的流程如下流程图:
这里写图片描述

BackgroundSync 类

该类的实现在: src/mongo/db/repl/bgsync.cpp.
每一个Secondary节点都会启动一个名为rsBackgroundSync的线程, 专门负责从Primary 抓取Oplog, 并且把住区的结果放进buffer里面。每一次就住区
这个类主要负责的工作:

  • 创建从secondary 到Primary的连接类: OplogReader;
  • 更新记录当前抓取的oplog的时间点lastOpTimeFetched等;
  • 创建Fetcher 类来调用Remote Command 抓取oplog;
  • 调用getMore 命令从oplog collection 获得下一批的oplog;

OpQueueBatcher 类

该类是SyncTail的一个子类, 主要是起一个名叫:ReplBatcher 的线程, 不断地把BackgroundSync的到的全部的oplog 集合, 分成一个个的batch进行处理, 每处理掉一个batch, BackgroundSync的buffer就被腾出了相应的空间, 可以继续从远端获得更多的oplog对象。
每个batch 最多是512M或者5000条oplog, 每当batcher 内有新的数据, 就会发返回给SyncTail, ReplBatcher线程继续准备下一个batch的数据。

SyncTail类

每个Secondary节点都有一个名为: rsSync的线程, 它负责得到并且更新本地的oplog。
它会检查本地的oplog状况, 决定是要全量同步oplog, 还是增量同步。
增量同步的代码在: src/mongo/db/repl/sync_tail.cpp。

SyncTail从OpQueueBatcher得到OpQueue 类型的ops, 需要把他们进行replay, 但是, 如果使用当前线程执行的话, 有可能执行的时间很长。
这里, SyncTail 通过ThreadPool的方式, 一次创建16个线程, 并行的replay这些oplog。那么, 这些的oplog是怎么分配到不同的线程里面的? 他是通过oplog的namespace。

void fillWriterVectors(OperationContext* txn,
                       const std::deque<SyncTail::OplogEntry>& ops,
                       std::vector<std::vector<BSONObj>>* writerVectors) {
    const bool supportsDocLocking =
        getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
    const uint32_t numWriters = writerVectors->size();

    Lock::GlobalRead globalReadLock(txn->lockState());

    CachingCappedChecker isCapped;

    for (auto&& op : ops) {
        StringMapTraits::HashedKey hashedNs(op.ns);
        uint32_t hash = hashedNs.hash();

        const char* opType = op.opType.rawData();
        if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, hashedNs)) {
            BSONElement id;
            switch (opType[0]) {
                case 'u':
                    id = op.o2.Obj()["_id"];
                    break;
                case 'd':
                case 'i':
                    id = op.o.Obj()["_id"];
                    break;
            }

            const size_t idHash = BSONElement::Hasher()(id);
            MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
        }

        (*writerVectors)[hash % numWriters].push_back(op.raw);
    }
}

oplog分配到不同的线程以后, 就是replay该线程分配到的oplog。

void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors,
              OldThreadPool* writerPool,
              SyncTail::MultiSyncApplyFunc func,
              SyncTail* sync) {
    TimerHolder timer(&applyBatchStats);
    for (std::vector<std::vector<BSONObj>>::const_iterator it = writerVectors.begin();
         it != writerVectors.end();
         ++it) {
        if (!it->empty()) {
            writerPool->schedule(func, stdx::cref(*it), sync);
        }
    }
}

执行完oplog, 我们需要把oplog更新到本地的oplog 集合里面。

OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) {
    ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();

    OpTime lastOptime;
    MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
        lastOptime = replCoord->getMyLastAppliedOpTime();
        invariant(!ops.empty());
        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock lk(txn->lockState(), "local", MODE_X);

        if (_localOplogCollection == 0) {
            OldClientContext ctx(txn, rsOplogName);

            _localDB = ctx.db();
            verify(_localDB);
            _localOplogCollection = _localDB->getCollection(rsOplogName);
            massert(13389,
                    "local.oplog.rs missing. did you drop it? if so restart server",
                    _localOplogCollection);
        }

        OldClientContext ctx(txn, rsOplogName, _localDB);
        WriteUnitOfWork wunit(txn);

        checkOplogInsert(
            _localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false));
        lastOptime =
            fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back()));
        wunit.commit();
    }
    MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns());

    return lastOptime;
}

到这里, 一次oplog的更新就完成了, 循环上面的过程, 我们就可以把primary上面新的oplog更新到secondary节点上。 当primary掉线或者故障, 某个节点从secondary变为primary的时候, BackgroundSync 停止sync oplog, 并且通知Synctail 结束工作。


本文由【waitig】发表在等英博客
本文固定链接:副本集Oplog同步原理
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)