DBPort 是表示数据库端口的类,分别用 call 和 say 方法实现读取和写入操作。
这两个方法都调用了 go 方法。
/**
 * Read operation: sends {@code msg} through {@link #go(OutMessage, DBCollection)}
 * and hands back whatever that call produces.
 *
 * @param msg  the outgoing message to send
 * @param coll the collection passed along to {@code go}; when it is {@code null},
 *             {@code go} returns {@code null} instead of a response
 * @return the result of {@code go}
 * @throws IOException if sending the message fails
 */
Response call( OutMessage msg , DBCollection coll )
    throws IOException {
    final Response reply = go( msg , coll );
    return reply;
}
/**
 * Write operation: sends {@code msg} without asking for a response.
 *
 * No collection is supplied and no response is forced, so {@code go}
 * returns right after the message has been written.
 *
 * @param msg the outgoing message to send
 * @throws IOException if sending the message fails
 */
void say( OutMessage msg )
    throws IOException {
    // same as go( msg , null ): no collection, response not forced
    go( msg , null , false );
}
/**
 * Executes an operation without forcing a response; convenience overload of
 * {@link #go(OutMessage, DBCollection, boolean)}.
 *
 * @param msg  the outgoing message to send
 * @param coll the collection to pass along, or {@code null} when no reply is expected
 * @return the result of the three-argument {@code go}
 * @throws IOException if sending the message fails
 */
private synchronized Response go( OutMessage msg , DBCollection coll )
    throws IOException {
    final boolean forceResponse = false;
    return go( msg , coll , forceResponse );
}
/**
 * Executes an operation: opens the connection if needed, writes {@code msg} to the
 * output stream and, when a reply is wanted, builds a {@code Response} from the
 * input stream.
 *
 * @param msg          the outgoing message; it is prepared and then piped to {@code _out}
 * @param coll         the collection handed to the {@code Response}; {@code null} means
 *                     no reply is expected unless {@code forceReponse} is set
 * @param forceReponse when {@code true}, a {@code Response} is built even if
 *                     {@code coll} is {@code null}
 * @return the response, or {@code null} when {@code coll} is {@code null} and no
 *         response is forced
 * @throws IOException           if sending fails; the port is closed before rethrowing
 * @throws IllegalStateException if a response is requested while another one is being
 *                               processed, or if {@code _out} is {@code null} after opening
 */
private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse )
    throws IOException {

    // Another response is currently being processed on this port. A send without a
    // collection could be a pipeline and should be safe; a send with a collection
    // could cause issues since we're reading data off the wire.
    if ( _processingResponse && coll != null ){
        throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" );
    }

    // count this call
    _calls++;

    // no socket yet: open the connection
    if ( _socket == null ){
        _open();
    }

    if ( _out == null ){
        throw new IllegalStateException( "_out shouldn't be null" );
    }

    try {
        // prepare the message and write it out
        msg.prepare();
        msg.pipe( _out );

        // the write went through: flag the pool (if any) as having worked
        if ( _pool != null ){
            _pool._everWorked = true;
        }

        final boolean wantsReply = coll != null || forceReponse;
        if ( ! wantsReply ){
            return null;
        }

        // flag stays set while the Response is constructed; reset in finally
        _processingResponse = true;
        return new Response( _sa , coll , _in , _decoder);
    }
    catch ( IOException ioe ){
        close();
        throw ioe;
    }
    finally {
        _processingResponse = false;
    }
}
DBPort 的 go 方法调用了 OutMessage 的 prepare、pipe 等方法,实际上这些方法又是间接地通过 PoolOutputBuffer 实现的,这将在后面的文章中提到。
另外 DBPort 的 _open 方法用于打开数据库连接:
/**
 * Opens the socket connection to {@code _addr} and sets up the input and output
 * streams, retrying with a doubling sleep interval when the attempt fails.
 *
 * A failed attempt is rethrown immediately when auto-retry is disabled or when the
 * owning pool has never worked. Otherwise attempts continue until
 * {@code CONN_RETRY_TIME_MS} milliseconds have elapsed since the first attempt.
 *
 * @return {@code true} once the connection is established (never {@code false})
 * @throws IOException the error of the last failed attempt when no retry is allowed
 *                     or the retry window is exhausted
 */
boolean _open()
    throws IOException {

    // initial back-off in milliseconds; doubled after every failed attempt
    long sleepTime = 100;

    final long start = System.currentTimeMillis();
    while ( true ){

        IOException lastError = null;

        try {
            // create the socket and connect
            _socket = new Socket();
            _socket.connect( _addr , _options.connectTimeout );

            // apply socket options
            _socket.setTcpNoDelay( ! USE_NAGLE );
            _socket.setKeepAlive( _options.socketKeepAlive );
            _socket.setSoTimeout( _options.socketTimeout );

            // grab the input and output streams
            _in = new BufferedInputStream( _socket.getInputStream() );
            _out = _socket.getOutputStream();
            return true;
        }
        catch ( IOException ioe ){
            // remember the failure; without this the throws below would throw null
            lastError = ioe;

            // Release the socket of the failed attempt and clear the field so that
            // a later caller sees "not connected" and tries to open again.
            final Socket failed = _socket;
            _socket = null;
            if ( failed != null ){
                try {
                    failed.close();
                }
                catch ( IOException ignored ){
                    // best effort: the connect error is the one worth reporting
                }
            }
        }

        // no retry when it is disabled or the pool has never had a working connection
        if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) )
            throw lastError;

        // give up once the retry window is used up
        long sleptSoFar = System.currentTimeMillis() - start;

        if ( sleptSoFar >= CONN_RETRY_TIME_MS )
            throw lastError;

        // never sleep past the end of the retry window
        if ( sleepTime + sleptSoFar > CONN_RETRY_TIME_MS )
            sleepTime = CONN_RETRY_TIME_MS - sleptSoFar;

        // wait, then retry; the reported total includes the sleep about to happen
        _logger.severe( "going to sleep and retry. total sleep time after = " + ( sleptSoFar + sleepTime ) + "ms this time:" + sleepTime + "ms" );
        ThreadUtil.sleep( sleepTime );
        sleepTime *= 2;
    }
}