MongoDB Java Driver 源码分析(6):com.mongodb.DBTCPConnector_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > MongoDB Java Driver 源码分析(6):com.mongodb.DBTCPConnector

MongoDB Java Driver 源码分析(6):com.mongodb.DBTCPConnector

 2012/2/22 9:20:01  裴小星  程序员俱乐部  我要评论(0)
  • 摘要:DBTCPConnecror是对DBPort类的封装,借助DBPort实现读写操作、获取服务器状态等。say方法和call方法DBTCPConnecror类中比较值得分析的是say方法和call方法的实现://执行写操作WriteResultsay(DBdb,OutMessagem,WriteConcernconcern,ServerAddresshostNeeded)//执行读操作Responsecall(DBdb,DBCollectioncoll,OutMessagem
  • 标签:源码 Java 分析 CTO MongoDB
  DBTCPConnecror 是对 DBPort 类的封装,借助 DBPort 实现读写操作、获取服务器状态等。
say 方法和 call 方法
  DBTCPConnecror 类中比较值得分析的是 say 方法和 call 方法的实现:
// 执行写操作
WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
// 执行读操作
Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries )

  这两个方法的实现方式相似,实际上都是通过 DBPort 实现,同时增加了检查连接和错误处理的代码。
  这里仅以 say 方法的实现作为例子进行说明:
    public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
        throws MongoException {

        // 检查数据库连接
        _checkClosed();
        checkMaster( false , true );

        // 获取数据连接端口
        MyPort mp = _myPort.get();
        DBPort port = mp.get( true , false , hostNeeded );

        try {
            // 检查权限
            port.checkAuth( db );

            // 通过 DBPort 实现写操作
            port.say( m );

            // 检查错误或返回正确的结果
            if ( concern.callGetLastError() ){
                return _checkWriteError( db , mp , port , concern );
            }
            else {
                return new WriteResult( db , port , concern );
            }
        }
        catch (...){
            // ...
        }
        finally {
            // 结束操作
            mp.done( port );
            m.doneWithMessage();
        }
    }

  say 和 call 方式是借助 DBPort  实现的,而 DBPort 对象是通过内部类 DBTCPConnector.MyPort 的 get 方法获取的:
        // 获取数据库端口
        DBPort get( boolean keep , boolean slaveOk , ServerAddress hostNeeded ){

            // 如果指定了服务器,就获取指定服务器的端口
            if ( hostNeeded != null ){
                // asked for a specific host
                return _portHolder.get(hostNeeded ).get();
            }

            // 在一个请求中,如果已经使用了一个端口,则继续使用它
            if ( _requestPort != null ){
                if ( _requestPort.getPool() == _masterPortPool || !keep ) {
                    return _requestPort;
                }

                _requestPort.getPool().done(_requestPort);
                _requestPort = null;
            }

            //  集群部署,可以从 Slave 获取端口
            if ( slaveOk && _rsStatus != null ){
                // if slaveOk, try to use a secondary
                ServerAddress slave = _rsStatus.getASecondary();
                if ( slave != null ){
                    return _portHolder.get( slave ).get();
                }
            }

            //  master 端口池为空,抛异常
            if (_masterPortPool == null) {
                throw new MongoException("Rare case where master=null, probably all servers are down");
            }

            // 通过 master 端口池获取端口
            DBPort p = _masterPortPool.get();
            if ( keep && _inRequest ) {
                _requestPort = p;
            }

            return p;
        }

其他方法
  getServerAddressList (获取服务器地址列表),checkMaster(检查 master 服务器)和 fetchMaxBsonObjectSize (获取 maxBsonObjectSize) 也有一定的分析价值:
    // 获取服务器地址列表
    public List<ServerAddress> getServerAddressList() {
        // 如果是集群方式,则通过 ReplicaSetStatus 返回地址列表
        if (_rsStatus != null) {
            return _rsStatus.getServerAddressList();
        }

        // 否则,返回 master 服务器的地址
        ServerAddress master = getAddress();
        if (master != null) {
            List<ServerAddress> list = new ArrayList<ServerAddress>();
            list.add(master);
            return list;
        }
        return null;
    }

    // 检查 master 服务器
    void checkMaster( boolean force , boolean failIfNoMaster )
        throws MongoException {
        
        // 检查是集群部署还是单机部署
        if ( _rsStatus != null ){
            if ( _masterPortPool == null || force ){

                // 集群部署
                // 通过 ReplicaSetStatus 获取 master 节点
                ReplicaSetStatus.Node n = _rsStatus.ensureMaster();
                if ( n == null ){
                    if ( failIfNoMaster )
                        throw new MongoException( "can't find a master" );
                }
                else {
                    // 根据 master 节点的信息设置 DBTCPConnector 的属性
                    _set( n._addr );
                    maxBsonObjectSize = _rsStatus.getMaxBsonObjectSize();
                }
            }
        } else {
            // 单机部署
            // 根据服务器信息设置 maxBsonObjectSize
            if (maxBsonObjectSize == 0)
                    maxBsonObjectSize = fetchMaxBsonObjectSize();
        }
    }

    // 获取并设置 maxBsonObjectSize
    int fetchMaxBsonObjectSize() {
        if (_masterPortPool == null)
            return 0;
        DBPort port = _masterPortPool.get();
        try {
            // 连接 admin 数据库,执行检查 master 服务器的命令
            CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1));
 
           // 1.8 之后的版本返回的结果中包含 maxBsonObjectSize
            if (res.containsField("maxBsonObjectSize")) {
                maxBsonObjectSize = ((Integer) res.get("maxBsonObjectSize")).intValue();
            } else {
                maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
            }
        } catch (Exception e) {
            _logger.log(Level.WARNING, null, e);
        } finally {
            port.getPool().done(port);
        }
        return maxBsonObjectSize;
    }
发表评论
用户名: 匿名