
MongoDB Source Code Analysis (25): mongos writeback

February 14, 2014

        The writeback discussed here refers to the following situation: mongos sends a data request to a mongod, but by the time the request arrives the shard version no longer matches (a chunk migration has happened), so the request cannot be applied on that shard. The operation has to find its way back to mongos and then be resent to the correct mongod; that round trip is what the source calls writeback. A toy sketch of the idea comes first, then the real code.
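
The following toy, single-process sketch only illustrates the idea of parking a stale-versioned write and replaying it later; every name in it is hypothetical and none of it is the actual mongos/mongod code.

    #include <iostream>
    #include <map>
    #include <queue>
    #include <string>

    // Toy sketch of writeback: a write that arrives with a stale version is parked
    // in a queue instead of being applied, and a listener later replays it with
    // refreshed routing information. All names here are hypothetical.
    struct ToyWrite { std::string ns; std::string doc; int version; };

    static std::map<std::string, int> shardVersion;   // what the "mongod" side believes
    static std::queue<ToyWrite> writebackQueue;       // parked writes waiting for "mongos"

    // "mongod" side: apply the write only if the version matches, otherwise park it.
    bool handleWrite( const ToyWrite& w ) {
        if ( w.version == shardVersion[w.ns] ) {
            std::cout << "applied " << w.doc << " on " << w.ns << std::endl;
            return true;
        }
        writebackQueue.push( w );                     // queued for writeback
        return false;
    }

    // "mongos" side: pull parked writes back, refresh the version, and resend.
    void writeBackListener() {
        while ( !writebackQueue.empty() ) {
            ToyWrite w = writebackQueue.front();
            writebackQueue.pop();
            w.version = shardVersion[w.ns];           // reload routing metadata
            handleWrite( w );                         // retry against the correct target
        }
    }

    int main() {
        shardVersion["test.foo"] = 2;                 // a chunk migration bumped the version
        ToyWrite stale = { "test.foo", "{x:1}", 1 };  // mongos still routed with version 1
        handleWrite( stale );                         // rejected and parked
        writeBackListener();                          // replayed with the matching version
        return 0;
    }

With that picture in mind, here is a simplified version of the insert path on the mongod side.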

void receivedInsert(Message& m, CurOp& op) {
    if ( handlePossibleShardedMessage( m , 0 ) ) // check whether, because of a chunk migration, this insert should no longer be executed on this server
        return;
    checkAndInsert(ns, first);
}
inline bool handlePossibleShardedMessage( Message &m, DbResponse * dbresponse ) {
    if( !shardingState.enabled() ) // only meaningful when sharding is enabled
        return false;
    return _handlePossibleShardedMessage(m, dbresponse);
}
    bool _handlePossibleShardedMessage( Message &m, DbResponse* dbresponse ) {
        int op = m.operation();
        if ( op < 2000 || op >= 3000 || op == dbGetMore )
            return false;
        DbMessage d(m);
        const char *ns = d.getns();
        string errmsg;
        // We don't care about the version here, since we're returning it later in the writeback
        ConfigVersion received, wanted; // check whether the version for this ns has changed; if it has not, shardVersionOk returns true and the operation proceeds normally
        if ( shardVersionOk( ns , errmsg, received, wanted ) ) {
            return false;
        }
        // reaching this point means a version change (or similar) prevents the operation from running here
        bool getsAResponse = doesOpGetAResponse( op ); // ops that expect a reply, i.e. queries
        if( getsAResponse ){ // a query against a stale chunk version gets an error reply straight back
            BufBuilder b( 32768 );
            b.skip( sizeof( QueryResult ) );
            {
                BSONObjBuilder bob;
                bob.append( "$err", errmsg );
                bob.append( "ns", ns );
                wanted.addToBSON( bob, "vWanted" );
                received.addToBSON( bob, "vReceived" );
                BSONObj obj = bob.obj();
                b.appendBuf( obj.objdata() , obj.objsize() );
            }
            QueryResult *qr = (QueryResult*)b.buf();
            qr->_resultFlags() = ResultFlag_ErrSet | ResultFlag_ShardConfigStale;
            qr->len = b.len();
            qr->setOperation( opReply );
            qr->cursorId = 0;
            qr->startingFrom = 0;
            qr->nReturned = 1;
            b.decouple();
            Message * resp = new Message();
            resp->setData( qr , true );
            dbresponse->response = resp;
            dbresponse->responseTo = m.header()->id;
            return true;
        }
        OID writebackID; // a write operation: queue it for writeback (the shape of the queued entry is sketched after this function)
        writebackID.initSequential();
        const OID& clientID = ShardedConnectionInfo::get(false)->getID();
        BSONObjBuilder b;
        b.appendBool( "writeBack" , true );
        b.append( "ns" , ns );
        b.append( "id" , writebackID );
        b.append( "connectionId" , cc().getConnectionId() );
        b.append( "instanceIdent" , prettyHostName() );
        wanted.addToBSON( b );
        received.addToBSON( b, "yourVersion" );
        b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
        // Don't register the writeback until immediately before we queue it -
        // after this line, mongos will wait for an hour if we don't queue correctly
        lastError.getSafe()->writeback( writebackID );
        writeBackManager.queueWriteBack( clientID.str() , b.obj() ); // save the operation into the local per-mongos queue
        return true;
    }
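
For reference, the entry assembled above ends up as a BSON document shaped roughly like the following (values are illustrative; version and yourVersion carry the wanted and received chunk versions, and msg carries the original wire message that the listener will replay later):

    // Approximate shape of one queued writeback entry (illustrative values only):
    // {
    //     writeBack:     true,
    //     ns:            "test.foo",
    //     id:            ObjectId("..."),           // the sequential writebackID
    //     connectionId:  NumberLong(...),           // the client connection id on this mongod
    //     instanceIdent: "hostname:27018",
    //     version:       { ... },                   // the version this shard wanted
    //     yourVersion:   { ... },                   // the version the client sent
    //     msg:           BinData(...)               // the original message, resent by the listener
    // }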

Let's continue with the shardVersionOk function.

    bool shardVersionOk( const string& ns , string& errmsg, ConfigVersion& received, ConfigVersion& wanted ) {
        if ( ! shardingState.enabled() )
            return true;
        if ( ! isMasterNs( ns.c_str() ) )  {
            // right now connections to secondaries aren't versioned at all
            return true;
        }
        ShardedConnectionInfo* info = ShardedConnectionInfo::get( false );
        if ( ! info ) { // no local shard info for this connection, so allow the operation to proceed locally
            // this means the client has nothing sharded
            // so this allows direct connections to do whatever they want
            // which i think is the correct behavior
            return true;
        }
        if ( info->inForceVersionOkMode() ) // the version is forced OK for this connection, so allow the operation to continue
            return true;
        // TODO : all collections at some point, be sharded or not, will have a version
        //  (and a ShardChunkManager)
        received = info->getVersion( ns ); // the version this client/connection has for the ns
        wanted = shardingState.getVersion( ns ); // the version the shard currently has for the ns
        // isWriteCompatibleWith compares the major version, which only increases when a
        // chunk migrates; a sketch of this comparison follows after the function
        if( received.isWriteCompatibleWith( wanted ) ) return true;
        // Figure out exactly why not compatible, send appropriate error message
        // The versions themselves are returned in the error, so not needed in messages here
        // Check epoch first, to send more meaningful message, since other parameters probably
        // won't match either
        if( ! wanted.hasCompatibleEpoch( received ) ) // epochs do not match: the operation must be queued and sent back to mongos later
            return false;
        if( ! wanted.isSet() && received.isSet() )
            return false;
        if( wanted.isSet() && ! received.isSet() )
            return false;
        if( wanted.majorVersion() != received.majorVersion() ){
            // Could be > or < - wanted is > if this is the source of a migration,
            // wanted < if this is the target of a migration
            return false;
        }
        return false;
    }
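
As the comments above suggest, write compatibility boils down to the epoch and the major version. The following is a minimal stand-in for that comparison, assuming exactly those semantics; it is not the real ConfigVersion/ShardChunkVersion class.

    #include <string>

    // Two versions are write compatible when they share an epoch and a major version.
    // The major version is bumped when a chunk migrates to another shard, while the
    // minor version is bumped on splits, which keep the data on the same shard.
    struct ToyVersion {
        std::string epoch;   // identifies one incarnation of the sharded collection
        int major;
        int minor;

        bool isWriteCompatibleWith( const ToyVersion& other ) const {
            return epoch == other.epoch && major == other.major;
        }
    };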

Next, let's follow _handlePossibleShardedMessage -> queueWriteBack.

    void WriteBackManager::queueWriteBack( const string& remote , const BSONObj& o ) {
        static mongo::mutex xxx( "WriteBackManager::queueWriteBack tmp" );
        static OID lastOID;
        scoped_lock lk( xxx );
        const BSONElement& e = o["id"];
        lastOID = e.OID();
        getWritebackQueue( remote )->queue.push( o ); // push the entry onto this mongos's queue (a sketch of such a queue follows below)
    }
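
The queue that getWritebackQueue returns is what the shard-side writebacklisten command (seen below in WriteBackListener::run) blocks on until an entry for that mongos shows up. A minimal blocking queue in that spirit, written against standard C++ rather than the real WriteBackQueue type, could look like this:

    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <string>

    // Simplified stand-in for the per-mongos writeback queue (a sketch only; the
    // real queue stores BSONObj entries and lives inside WriteBackManager).
    class ToyWritebackQueue {
    public:
        // called from the mongod write path when an operation must be written back
        void push( const std::string& entry ) {          // entry stands in for a BSONObj
            std::lock_guard<std::mutex> lk( _m );
            _q.push( entry );
            _cv.notify_one();                            // wake a blocked writebacklisten
        }
        // called by the writebacklisten command handler; blocks until data arrives
        std::string blockingPop() {
            std::unique_lock<std::mutex> lk( _m );
            _cv.wait( lk, [this]{ return !_q.empty(); } );
            std::string entry = _q.front();
            _q.pop();
            return entry;
        }
    private:
        std::mutex _m;
        std::condition_variable _cv;
        std::queue<std::string> _q;
    };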

Now let's look at how mongos pulls these queued operations back. As mentioned in the earlier article on mongos shard configuration (mongos分片的配置), when a connection is obtained from the pool, its onCreate callback runs initShardVersionCB:

    bool VersionManager::initShardVersionCB( DBClientBase * conn_in, BSONObj& result ){
        WriteBackListener::init( *conn_in );

That call initializes the WriteBackListener. Let's look at the init function.

    void WriteBackListener::init( DBClientBase& conn ) {
        if ( conn.type() == ConnectionString::SYNC ) {
            // don't want write back listeners for config servers
            return;
        }
        if ( conn.type() != ConnectionString::SET ) { // a standalone server: initialize a listener for this address directly
            init( conn.getServerAddress() );
            return;
        }
        {
            scoped_lock lk( _cacheLock );
            if ( _seenSets.count( conn.getServerAddress() ) )
                return;
        }
        // we want to do writebacks on all rs nodes
        string errmsg;
        ConnectionString cs = ConnectionString::parse( conn.getServerAddress() , errmsg );
        vector<HostAndPort> hosts = cs.getServers();
        for ( unsigned i=0; i<hosts.size(); i++ ) // a replica set: initialize a listener for each member, one by one
            init( hosts[i].toString() );
    }
    void WriteBackListener::init( const string& host ) {
        scoped_lock lk( _cacheLock );
        WriteBackListener*& l = _cache[host]; // one listener is cached per host
        if ( l )
            return;
        l = new WriteBackListener( host ); // create a new WriteBackListener for this host
        l->go(); // start a dedicated thread that pulls mis-routed operations back from this host and resends them to the correct one
    }

Now for that thread; its run function is:

    void WriteBackListener::run() {
        int secsToSleep = 0;
        scoped_ptr<ShardChunkVersion> lastNeededVersion;
        int lastNeededCount = 0;
        while ( ! inShutdown() ) {
            if ( ! Shard::isAShardNode( _addr ) ) { // this host is not (or is no longer) a shard node
                sleepsecs( 60 );
                continue;
            }
            try {
                scoped_ptr<ScopedDbConnection> conn(
                        ScopedDbConnection::getInternalScopedDbConnection( _addr ) );
                BSONObj result;
                {
                    BSONObjBuilder cmd; // the command sent to the shard to pull back queued writeback operations
                    cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data
                    if ( ! conn->get()->runCommand( "admin" , cmd.obj() , result ) ) {
                        result = result.getOwned();
                        conn->done();
                        continue;
                    }
                }
                conn->done();
                BSONObj data = result.getObjectField( "data" );
                if ( data.getBoolField( "writeBack" ) ) { // a writeback entry was actually returned
                    string ns = data["ns"].valuestrsafe();
                    ConnectionIdent cid( "" , 0 );
                    OID wid;
                    if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ) {
                        string s = "";
                        if ( data["instanceIdent"].type() == String )
                            s = data["instanceIdent"].String();
                        cid = ConnectionIdent( s , data["connectionId"].numberLong() );
                        wid = data["id"].OID();
                    }
                    int len; // not used, but needed for next call
                    Message msg( (void*)data["msg"].binData( len ) , false );
                    DBConfigPtr db = grid.getDBConfig( ns );
                    ShardChunkVersion needVersion = ShardChunkVersion::fromBSON( data, "version" );
                    // TODO: Refactor the sharded strategy to correctly handle all sharding state changes itself,
                    // we can't rely on WBL to do this for us b/c anything could reset our state in-between.
                    // We should always reload here for efficiency when possible, but staleness is also caught in the
                    // loop below.
                    ChunkManagerPtr manager;
                    ShardPtr primary;
                    db->getChunkManagerOrPrimary( ns, manager, primary );
                    ShardChunkVersion currVersion;
                    if( manager ) currVersion = manager->getVersion();
                    // We should reload only if we need to update our version to be compatible *and* we
                    // haven't already done so.  This avoids lots of reloading when we remove/add a sharded collection
                    bool alreadyReloaded = lastNeededVersion &&
                                           lastNeededVersion->isEquivalentTo( needVersion );
                    if( alreadyReloaded ){
                        // already reloaded for this needed version; nothing more to do here
                    }
                    else if( lastNeededVersion )
                        lastNeededCount = 0;
                    // Set our lastNeededVersion for next time
                    lastNeededVersion.reset( new ShardChunkVersion( needVersion ) );
                    lastNeededCount++;
                    // Determine if we should reload, if so, reload
                    // is our current version compatible with the needed one, or must the routing metadata be reloaded?
                    bool shouldReload = ! needVersion.isWriteCompatibleWith( currVersion ) &&
                                        ! alreadyReloaded;
                    if( shouldReload && currVersion.isSet()
                                     && needVersion.isSet()
                                     && currVersion.hasCompatibleEpoch( needVersion ) )
                    {
                        // If we disagree about versions only, reload the chunk manager
                        db->getChunkManagerIfExists( ns, true );
                    }
                    else if( shouldReload ){
                        // If we disagree about anything else, reload the full db
                        db->reload();
                    }
                    // do request and then call getLastError
                    // we have to call getLastError so we can return the right fields to the user if they decide to call getLastError
                    BSONObj gle;
                    int attempts = 0;
                    while ( true ) {
                        attempts++;
                        try { // resend the operation to the shard that should now own it
                            Request r( msg , 0 );
                            r.init();
                            r.d().reservedField() |= Reserved_FromWriteback;
                            ClientInfo * ci = r.getClientInfo();
                            if (!noauth) {
                                // TODO: Figure out why this is 'admin' instead of 'local'.
                                ci->getAuthenticationInfo()->authorize("admin", internalSecurity.user);
                            }
                            ci->noAutoSplit();
                            r.process( attempts ); // reprocess the request so it is routed to the shard that should handle it
                            ci->newRequest(); // this so we flip prev and cur shards
                            BSONObjBuilder b;
                            string errmsg;
                            if ( ! ci->getLastError( "admin",
                                                     BSON( "getLastError" << 1 ),
                                                     b,
                                                     errmsg,
                                                     true ) )
                            {
                                b.appendBool( "commandFailed" , true );
                                if( ! b.hasField( "errmsg" ) ){
                                    b.append( "errmsg", errmsg );
                                    gle = b.obj();
                                }
                                else if( errmsg.size() > 0 ){
                                    // Rebuild GLE object with errmsg
                                    // TODO: Make this less clumsy by improving GLE interface
                                    gle = b.obj();
                                    if( gle["errmsg"].type() == String ){
                                        BSONObj gleNoErrmsg =
                                                gle.filterFieldsUndotted( BSON( "errmsg" << 1 ),
                                                                          false );
                                        BSONObjBuilder bb;
                                        bb.appendElements( gleNoErrmsg );
                                        bb.append( "errmsg", gle["errmsg"].String() +
                                                             " ::and:: " +
                                                             errmsg );
                                        gle = bb.obj().getOwned();
                                    }
                                }
                            }
                            else{
                                gle = b.obj();
                            }
                            // error code 9517 means the chunk version changed yet again and this resend
                            // also failed; reload the routing info and retry against the correct shard
                            if ( gle["code"].numberInt() == 9517 ) {
                                lastNeededVersion.reset();
                                lastNeededCount = 1;
                                // Bringing this in line with the similar retry logic elsewhere
                                // TODO: Reloading the chunk manager may not help if we dropped a
                                // collection, but we don't actually have that info in the writeback
                                // error
                                if( attempts <= 2 ){
                                    db->getChunkManagerIfExists( ns, true );
                                }
                                else{
                                    versionManager.forceRemoteCheckShardVersionCB( ns );
                                    sleepsecs( attempts - 1 );
                                }
                                continue;
                            }
                            ci->clearSinceLastGetError();
                        }
                        break;
                    }
                    {
                        scoped_lock lk( _seenWritebacksLock );
                        WBStatus& s = _seenWritebacks[cid];
                        s.id = wid;
                        s.gle = gle;
                    }
                }
                secsToSleep = 0;
                continue;
            }
            catch ( std::exception& e ) {
                // errors while talking to the shard end up here (the full source logs e.what());
                // back off a little longer on each failure before trying again
                secsToSleep++;
                sleepsecs(secsToSleep);
                if ( secsToSleep > 10 )
                    secsToSleep = 0;
            }
        }
    }

In short, the listener pulls queued writeback operations from the shard side and resends them to the correct shard.

Why does writeback happen at all? My guess is that something like network latency lets an operation reach the wrong shard: mongos routes the request with its cached chunk map, and a chunk migration can commit between the moment mongos picks a shard and the moment that shard processes the request, so the request arrives carrying a stale version.

Original article: mongodb源码分析(二十五)mongos writeback

Author: yhjj0108, 杨浩
