使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题_C/C++_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > C/C++ > 使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题

使用ACE_Proactor模式,在使用ACE_Asynch_Write_Stream.write发送数据的问题

 2011/9/26 8:04:05  sogo6  http://sogo6.iteye.com  我要评论(0)
  • 摘要:1.使用ACE_Proactor模式,客户端实现代码如下:voidTClientHandler::open(ACE_HANDLEhandle,ACE_Message_Block&msgBlock){this->handle(handle);intsize=0;intbuf_len=sizeof(int);ACE_OS::getsockopt(handle,SOL_SOCKET,SO_RCVBUF,(char*)(&size),&buf_len);iLog.info
  • 标签:使用 问题 模式 数据 CTO
1. 使用ACE_Proactor模式,客户端实现代码如下:


void TClientHandler::open(ACE_HANDLE handle, ACE_Message_Block &msgBlock)
{  
    this->handle(handle);
    int size = 0;  
    int buf_len = sizeof(int);  
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)(&size), &buf_len);
    iLog.info("Receive size is %d", size);
    size = size * 10;  
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (const char*)(&size), sizeof(int));

    size = 0;
    ACE_OS::getsockopt(handle, SOL_SOCKET, SO_SNDBUF, (char*)(&size), &buf_len);
    iLog.info("Send size is %d", size);
    size = size * 10;
    ACE_OS::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)(&size), sizeof(int));

    if (this->m_reader.open (*this) != 0)
    {
        delete this;
        return;
    }  
    if (this->m_writer.open (*this) != 0)
    {
        delete this;
        return;
    }

    ACE_Message_Block *mb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read (*mb, mb->space ()) != 0)
    {
        delete this;
        return;
    }

    //  向tmc发送注册命令
    TMessage *tmsg = new (sizeof(TMessage))TMessage;
    tmsg->msgHeader.version = TVERSION_BASE;
    tmsg->msgHeader.cmdCode = CMD_LOGIN;
    tmsg->msgHeader.sendProcType = getLocalProcType();
    tmsg->msgHeader.sendProcHandle =  getLocalProcHandle();
    this->write_message(tmsg, sizeof(TMessage));

    return;
}

//异步读完成后会调用此函数  
void TClientHandler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{  
    ACE_Message_Block &mbb = result.message_block();  
    if (!result.success () || result.bytes_transferred () == 0)
    {
        // 知道remoteaddress是哪个?
        mbb.release();
        delete this;
        return;
    }
    
    ACE_Message_Block *mb = NULL;
    do 
    {
        // 用这种方法尽量保证可以clone到数据
        mb = mbb.clone();
    } while (NULL == mb);

    mbb.release();

    // 根据消息类型,将消息放入到不同的队列里面
    TMessage *tmsg = NULL;
    tmsg = (TMessage *)mb->base();

    // 每个接收的消息都记录一下日志
    //writeLog(tmsg);
    if (MT_RESPONSE == tmsg->msgHeader.msgType)
    {
        // 如果是响应消息就放入到响应队列里
        TResponseQueue::instance()->enqueue(mb);
        //iLog.info("Put message to response queue, retCode=%d", retCode);
    }
    else
    {
        // 走到这里都是请求消息或者通知消息,还有异步请求与异步响应
        if (CMD_LOGIN == tmsg->msgHeader.cmdCode)
        {
            // 如果是命令码为CMD_LOGIN的通知消息,先处理一下,再放入消息队列
            iLog.info("Login notice, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);
            // 这个消息不需要放入队列,在这里release掉
            mb->release();

            // 发异步消息取相应TMC上所有连接的进程
            TMessage *mymsg = new (sizeof(TMessage))TMessage;
            mymsg->msgHeader.version = TVERSION_BASE;
            mymsg->msgHeader.msgType = MT_ASYN_REQUEST;  // 发送异步请求消息
            mymsg->msgHeader.cmdCode = CMD_GETPROCINTMC;
            mymsg->msgHeader.sendProcType = getLocalProcType();
            mymsg->msgHeader.sendProcHandle =  getLocalProcHandle();
            this->write_message(mymsg, sizeof(TMessage));

        }
        else if (CMD_GETPROCINTMC == tmsg->msgHeader.cmdCode)
        {
            iLog.info("Process in tmc, procIds=%s length=%d", tmsg->msgContent, tmsg->msgHeader.length);
            TClientHandlerMgr::instance()->addAll(this, tmsg->msgContent);

            // 这个消息不需要放入队列,在这里release掉
            mb->release();
        }
        else
        {
            // 如果是同步请求消息,异步请求,通知消息和,异步请求,异步响应,就放入TikyTaskMgr队列
            TikyTaskMgr *taskMgr = TikyAppMgr::instance()->getTaskMgr(DEFAULT_TASKMGRID);
            if (NULL != taskMgr)
            {
                taskMgr->putq(mb);
                //iLog.info("Put messaget to taskmgr, retCode=%d", retCode);
            }
            else
            {
                iLog.error("TaskMgr(%d) is not found...", DEFAULT_TASKMGRID);
                
                // 回送一个响应消息,没有找到TaskMgr
                TMessage *rspMsg = new (sizeof(TMessage))TMessage;
                rspMsg->msgHeader.msgType = MT_RESPONSE;
                rspMsg->msgHeader.sendProcType = tmsg->msgHeader.recvProcType;
                rspMsg->msgHeader.sendProcHandle = tmsg->msgHeader.recvProcHandle;
                rspMsg->msgHeader.recvProcType = tmsg->msgHeader.sendProcType;
                rspMsg->msgHeader.recvProcHandle = tmsg->msgHeader.sendProcHandle;
                rspMsg->msgHeader.sn = tmsg->msgHeader.sn;
                // 没有TaskMgr,就回送一个错误信息
                rspMsg->msgHeader.retCode = ERR_TCOM_NOTFOUNDTASKMGR;
                this->write_message(rspMsg, sizeof(TMessage));
                mb->release();
            }
        }
    }

    ACE_Message_Block *nmb = new ACE_Message_Block(m_msgBuffer, MSG_BUFFER_LEN);
    if (this->m_reader.read(*nmb, nmb->space()) != 0)
    {
        nmb->release();
        delete this;
        return;
    }
}

//异步写完成后会调用此函数
void TClientHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    iLog.info("Handle result is %d", result.success());
    ACE_Message_Block &mb = result.message_block();
    iLog.info("Write result=%ld, trans=%d, write=%d, len=%d", result.error(), result.bytes_transferred(),
        result.bytes_to_write(), mb.length());
    TMessage *tmsg = (TMessage *)mb.base();
    // 记录一下发送消息日志
    writeLog(tmsg);
    delete tmsg;
    mb.release();
    return;
}

int TClientHandler::write_message(TMessage *tmsg, unsigned short msgLength)
{
    if (NULL == tmsg)
    {
        iLog.error("[TClientHandler::write_message] tmsg is NULL.");
        return -1;
    }

    ACE_Message_Block *mbb = new ACE_Message_Block((const char *)tmsg, msgLength);
    // 要设置一下wr_ptr的指针,要不mbb->length()返回的会是0
    mbb->wr_ptr(msgLength);
    int retCode = this->m_writer.write(*mbb, mbb->length());
    if (retCode != 0)
    {
        iLog.error("Send msg failure, lasterror=%d, length=%d, retCode=%d", ACE_OS::last_error(), mbb->length(), retCode);
        delete tmsg;
        mbb->release();
        delete this;
    }
    return retCode;
}


写一个循环不间断调用write_message发送数据,会看到发送的数据,会用一个TCP报文发出去,不知道有没有人遇到相同的问题?
发表评论
用户名: 匿名