1. 使用ACE_Proactor模式,客户端实现代码如下:
void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock)
{
this->handle(handle);
int size = 0;
int buf_len = sizeof(int);
ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len);
iLog.info("Receive size is %d", size);
size = size * 10;
ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int));
size = 0;
ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len);
iLog.info("Send size is %d", size);
size = size * 10;
ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int));
if (this->m_reader.open (*this) != 0)
{
delete this;
return;
}
if (this->m_writer.open (*this) != 0)
{
delete this;
return;
}
ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
if (this->m_reader.read (*mb, mb->space ()) != 0)
{
delete this;
return;
}
// 向tmc发送注册命令
TMessage *tmsg = new (sizeof(TMessage))TMessage;
tmsg->msgHeader.version = TVERSION_BASE;
tmsg->msgHeader.cmdCode = CMD_LOGIN;
tmsg->msgHeader.sendProcType = getLocalProcType();
tmsg->msgHeader.sendProcHandle = getLocalProcHandle();
this->write_message(tmsg, sizeof(TMessage));
return;
}
//异步读完成后会调用此函数
void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mbb = result.message_block();
if (!result.success () || result.bytes_transferred () == 0)
{
// 知道remoteaddress是哪个?
mbb.release();
delete this;
return;
}
ACE_Message_Block *mb = NULL;
do
{
// 用这种方法尽量保证可以clone到数据
mb = mbb.clone();
} while (NULL == mb);
mbb.release();
// 根据消息类型,将消息放入到不同的队列里面
TMessage *tmsg = NULL;
tmsg = (TMessage *)mb->base();
// 每个接收的消息都记录一下日志
//writeLog(tmsg);
if (MT_RESPONSE == tmsg->msgHeader.msgType)
{
// 如果是响应消息就放入到响应队列里
TResponseQueue::instance()->enqueue(mb);
//iLog.info("Put message to response queue, retCode=%d", retCode);
}
else
{
// 走到这里都是请求消息或者通知消息,还有异步请求与异步响应
if (CMD_LOGIN == tmsg->msgHeader.cmdCode)
{
// 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列
iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);
// 这个消息不需要放入队列,在这里release掉
mb->release();
// 发异步消息取相应TMC上所有连接的进程
TMessage *mymsg = new (sizeof(TMessage))TMessage;
mymsg->msgHeader.version = TVERSION_BASE;
mymsg->msgHeader.msgType = MT_ASYN_REQUEST; // 发送异步请求消息
mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC;
mymsg->msgHeader.sendProcType = getLocalProcType();
mymsg->msgHeader.sendProcHandle = getLocalProcHandle();
this->write_message(mymsg, sizeof(TMessage));
}
else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode)
{
iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);
// 这个消息不需要放入队列,在这里release掉
mb->release();
}
else
{
// 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列
TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID);
if (NULL != taskMgr)
{
taskMgr->putq(mb);
//iLog.info("Put messaget to taskmgr, retCode=%d", retCode);
}
else
{
iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID);
// 回送一个响应消息,没有找到TaskMgr
TMessage *rspMsg = new (sizeof(TMessage))TMessage;
rspMsg->msgHeader.msgType = MT_RESPONSE;
rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType;
rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle;
rspMsg->msgHeader.recvProcType = tmsg->msgHeader.sendProcType;
rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle;
rspMsg->msgHeader.sn = tmsg->msgHeader.sn;
// 没有TaskMgr,就回送一个错误信息
rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR;
this->write_message(rspMsg, sizeof(TMessage));
mb->release();
}
}
}
ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
if (this->m_reader.read(*nmb, nmb->space()) != 0)
{
nmb->release();
delete this;
return;
}
}
//异步写完成后会调用此函数
void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
iLog.info("Handle result is %d", result.success());
ACE_Message_Block &mb = result.message_block();
iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(),
result.bytes_to_write(), mb.length());
TMessage *tmsg = (TMessage *)mb.base();
// 记录一下发送消息日志
writeLog(tmsg);
delete tmsg;
mb.release();
return;
}
int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength)
{
if (NULL == tmsg)
{
iLog.error("[TClientHandler::write_message] tmsg is NULL.");
return -1;
}
ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength);
// 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0
mbb->wr_ptr(msgLength);
int retCode = this->m_writer.write(*mbb, mbb->length());
if (retCode != 0)
{
iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode);
delete tmsg;
mbb->release();
delete this;
}
return retCode;
}
写一个循环不间断调用write_message发送数据,会看到发送的数据,会用一个TCP报文发出去,不知道有没有人遇到相同的问题?