/*******************************************************************************
 * Copyright (c) quickfixengine.org  All rights reserved.
 *
 * This file is part of the QuickFIX FIX Engine
 *
 * This file may be distributed under the terms of the quickfixengine.org
 * license as defined by quickfixengine.org and appearing in the file
 * LICENSE included in the packaging of this file.
 *
 * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE.
 *
 * See http://www.quickfixengine.org/LICENSE for licensing information.
 *
 * Contact ask@quickfixengine.org if any conditions of this licensing
 * are not clear to you.
 ******************************************************************************/

package com.qchen.quickfixmessage.util;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import junit.framework.Assert;

import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.mina.message.FIXProtocolCodecFactory;

/**
 * Test helper that opens client connections identified by an integer client id
 * and lets the caller write messages to them and read back what was received.
 *
 * <p>Every session gets a {@link ProtocolCodecFilter} backed by
 * {@link FIXProtocolCodecFactory} appended to its filter chain when it is created.
 * Received messages are queued per client and handed out by
 * {@link #readMessage(int, long)}.
 */
public class TestConnection {
    /** How long {@link #waitForClientDisconnect(int)} waits before failing, in milliseconds. */
    private static final long DISCONNECT_TIMEOUT_MILLIS = 500000L;

    /** One connector per transport type, shared by all instances; guarded by its own monitor. */
    private static final HashMap<TransportType, IoConnector> connectors =
            new HashMap<TransportType, IoConnector>();

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Handlers keyed by client id; every access is guarded by this map's monitor. */
    private final HashMap<Integer, TestIoHandler> ioHandlers = new HashMap<Integer, TestIoHandler>();

    /**
     * Writes {@code message} to the session of the given client.
     * Blocks until that client's session has been created.
     *
     * @param clientId id previously passed to {@link #connect(int, TransportType, int)}
     * @param message  the raw message to write
     * @throws IllegalStateException if no connection was registered for {@code clientId}
     */
    public void sendMessage(int clientId, String message) throws IOException {
        TestIoHandler handler = getIoHandler(clientId);
        handler.getSession().write(message);
    }

    /**
     * Looks up the handler registered for {@code clientId}.
     *
     * @throws IllegalStateException if {@code connect} was never called for that id
     *         (or {@link #tearDown()} has since cleared it); reported explicitly so
     *         callers do not fail with a bare NullPointerException.
     */
    private TestIoHandler getIoHandler(int clientId) {
        TestIoHandler handler;
        synchronized (ioHandlers) {
            handler = ioHandlers.get(Integer.valueOf(clientId));
        }
        if (handler == null) {
            throw new IllegalStateException("No connection registered for client id " + clientId);
        }
        return handler;
    }

    /**
     * Closes every session that was created and forgets all registered clients.
     *
     * <p>The handler map is snapshotted and cleared under its lock; the sessions are
     * closed outside the lock because waiting on a close future blocks. Handlers
     * whose session was never created are skipped instead of being waited for, so a
     * failed connection attempt cannot make this method hang.
     */
    public void tearDown() {
        List<TestIoHandler> handlers;
        synchronized (ioHandlers) {
            handlers = new ArrayList<TestIoHandler>(ioHandlers.values());
            ioHandlers.clear();
        }
        for (TestIoHandler handler : handlers) {
            IoSession session = handler.getSessionIfCreated();
            if (session != null) {
                CloseFuture closeFuture = session.close();
                closeFuture.join();
            }
        }
    }

    /**
     * Returns the next message received for the given client.
     *
     * @param clientId id of a registered client
     * @param timeout  maximum time to wait, in milliseconds
     * @return the next queued message, or {@code null} if none arrived within the timeout
     * @throws IllegalStateException if no connection was registered for {@code clientId}
     */
    public CharSequence readMessage(int clientId, long timeout) throws InterruptedException {
        return getIoHandler(clientId).getNextMessage(timeout);
    }

    /**
     * Waits until the session of the given client has been closed; fails the test
     * (via {@link Assert#fail(String)}) if that does not happen within the timeout.
     *
     * @throws IllegalStateException if no connection was registered for {@code clientId}
     */
    public void waitForClientDisconnect(int clientId) throws IOException, InterruptedException {
        getIoHandler(clientId).waitForDisconnect();
    }

    /**
     * Opens a connection for {@code clientId} and waits for the attempt to finish.
     *
     * <p>For {@link TransportType#SOCKET} the target is {@code localhost:port}; for
     * {@link TransportType#VM_PIPE} it is the VM pipe address {@code port}. The
     * handler is registered before connecting; an existing registration for the same
     * id is replaced. The connect itself runs outside the handler-map lock so other
     * threads are not stalled behind network I/O.
     *
     * @throws RuntimeException if {@code transportType} is neither SOCKET nor VM_PIPE
     * @throws junit.framework.AssertionFailedError if the connection could not be established
     */
    public void connect(int clientId, TransportType transportType, int port)
            throws UnknownHostException, IOException {
        IoConnector connector = getConnector(transportType);
        SocketAddress address = createAddress(transportType, port);

        TestIoHandler testIoHandler = new TestIoHandler();
        synchronized (ioHandlers) {
            ioHandlers.put(Integer.valueOf(clientId), testIoHandler);
        }

        ConnectFuture future = connector.connect(address, testIoHandler);
        future.join();
        Assert.assertTrue("connection to server failed", future.isConnected());
    }

    /** Returns the cached connector for the transport type, creating it on first use. */
    private static IoConnector getConnector(TransportType transportType) {
        synchronized (connectors) {
            IoConnector connector = connectors.get(transportType);
            if (connector == null) {
                if (transportType == TransportType.SOCKET) {
                    connector = new SocketConnector();
                } else if (transportType == TransportType.VM_PIPE) {
                    connector = new VmPipeConnector();
                } else {
                    throw new RuntimeException("Unsupported transport type: " + transportType);
                }
                connectors.put(transportType, connector);
            }
            return connector;
        }
    }

    /** Builds the address to connect to for the given transport type and port. */
    private static SocketAddress createAddress(TransportType transportType, int port) {
        if (transportType == TransportType.SOCKET) {
            return new InetSocketAddress("localhost", port);
        }
        if (transportType == TransportType.VM_PIPE) {
            return new VmPipeAddress(port);
        }
        throw new RuntimeException("Unsupported transport type: " + transportType);
    }

    /**
     * Per-client handler: remembers the session, queues received messages and
     * signals session creation and closure through latches.
     */
    private class TestIoHandler extends IoHandlerAdapter {
        /** Written by the I/O callback, read by test threads; volatile for the non-blocking read. */
        private volatile IoSession session;
        private final BlockingQueue<Object> messages = new LinkedBlockingQueue<Object>();
        private final CountDownLatch sessionCreatedLatch = new CountDownLatch(1);
        private final CountDownLatch disconnectLatch = new CountDownLatch(1);

        /** Stores the session, installs the FIX codec filter and releases waiters of {@link #getSession()}. */
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
            this.session = session;
            session.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new FIXProtocolCodecFactory()));
            sessionCreatedLatch.countDown();
        }

        /** Logs the failure at error level after delegating to the adapter. */
        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            super.exceptionCaught(session, cause);
            log.error(cause.getMessage(), cause);
        }

        /** Releases waiters of {@link #waitForDisconnect()}. */
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            disconnectLatch.countDown();
        }

        /** Queues the received message for {@link #getNextMessage(long)}. */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            messages.add(message);
        }

        /**
         * Returns the session, blocking until {@code sessionCreated} has run.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the
         *         wait is interrupted; the thread's interrupt flag is restored first
         */
        public IoSession getSession() {
            try {
                sessionCreatedLatch.await();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return session;
        }

        /** Returns the session without waiting, or {@code null} if none has been created yet. */
        public IoSession getSessionIfCreated() {
            return session;
        }

        /**
         * Returns the next queued message, waiting up to {@code timeout} milliseconds.
         * The queued object is cast to {@code String}.
         *
         * @return the message, or {@code null} on timeout
         */
        public String getNextMessage(long timeout) throws InterruptedException {
            return (String) messages.poll(timeout, TimeUnit.MILLISECONDS);
        }

        /** Waits for {@code sessionClosed}; fails the test if it does not happen in time. */
        public void waitForDisconnect() throws InterruptedException {
            if (!disconnectLatch.await(DISCONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                Assert.fail("client not disconnected");
            }
        }
    }
}
package com.qchen.quickfixmessage;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;

import com.qchen.quickfixmessage.util.TestConnection;

import quickfix.field.HeartBtInt;
import quickfix.field.SenderCompID;
import quickfix.fix40.Logon;
import quickfix.fix40.Message;

/**
 * Minimal command-line client: connects to {@code localhost:9876} over a socket,
 * sends one hand-built FIX 4.0 Logon message and prints whatever comes back
 * within three seconds.
 */
public class FixMinaClient {
    private TestConnection connection = new TestConnection();
    private static TransportType transportType = TransportType.SOCKET;
    private static int port = 9876;

    /** FIX field delimiter (ASCII 0x01). */
    public static final String SOH = "\u0001";

    /**
     * Connects, sends the logon, prints the reply (or {@code null} if none arrived
     * in 3000 ms) and closes the connection.
     */
    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
        // Alternative kept from the original: connecting with a raw MINA connector
        // and ClientHandler instead of TestConnection.
        // SocketConnector connector = new SocketConnector();
        // SocketConnectorConfig config = connector.getDefaultConfig();
        //
        // config.setConnectTimeout(3000);
        // config.getFilterChain().addLast("codec",
        //         new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        // connector.connect(new InetSocketAddress("localhost",9876), new ClientHandler(),config);

        FixMinaClient client = new FixMinaClient();

        // Shape of a complete logon ('|' stands for SOH, values illustrative):
        // 8=FIX.4.0|9=57|35=A|34=1|49=TW|52=20131015-09:00:26|56=ISLD|98=0|108=30|10=005|
        String logonString = buildLogonMessage(currentUtcTimestamp());

        client.connection.connect(1, transportType, port);
        try {
            client.connection.sendMessage(1, logonString);
            CharSequence messageBack = client.connection.readMessage(1, 3000);
            System.out.println("back msg is "+messageBack);
        } finally {
            // Only reached after a successful connect, so the session exists and can be closed.
            client.connection.tearDown();
        }
    }

    /**
     * Builds a FIX 4.0 Logon (35=A) from BANZAI to EXEC with sequence number 5,
     * no encryption (98=0) and a 30 second heartbeat (108=30).
     *
     * <p>BodyLength (tag 9) and CheckSum (tag 10) are computed from the content
     * rather than hard-coded, so the frame stays well-formed whatever the
     * timestamp is.
     *
     * @param sendingTime value for SendingTime (tag 52), e.g. {@code 20131015-09:06:26}
     * @return the complete message, every field terminated by {@link #SOH}
     */
    static String buildLogonMessage(String sendingTime) {
        StringBuilder body = new StringBuilder();
        body.append("35=A").append(SOH)
            .append("34=5").append(SOH)
            .append("49=BANZAI").append(SOH)
            .append("52=").append(sendingTime).append(SOH)
            .append("56=EXEC").append(SOH)
            .append("98=0").append(SOH)
            .append("108=30").append(SOH);

        // All characters are ASCII, so the character count equals the byte count.
        String withoutChecksum = "8=FIX.4.0" + SOH + "9=" + body.length() + SOH + body;
        return withoutChecksum + "10=" + checksum(withoutChecksum) + SOH;
    }

    /** Sum of all character values modulo 256, rendered as exactly three digits. */
    private static String checksum(String message) {
        int sum = 0;
        for (int i = 0; i < message.length(); i++) {
            sum += message.charAt(i);
        }
        return String.format("%03d", Integer.valueOf(sum % 256));
    }

    /** Current time in UTC formatted as {@code yyyyMMdd-HH:mm:ss}. */
    private static String currentUtcTimestamp() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd-HH:mm:ss");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date());
    }
}

/**
 * Handler for the raw-connector variant shown (commented out) in
 * {@link FixMinaClient#main(String[])}: writes a Logon when the session opens and
 * answers every received message with a fixed string.
 */
class ClientHandler extends IoHandlerAdapter {
    /** Writes a FIX 4.0 Logon with a 30 second heartbeat interval to the new session. */
    @Override
    public void sessionOpened(IoSession session) {
        System.err.println("session opened");
        Logon logonMessage = new Logon();
        logonMessage.set(new HeartBtInt(30));
        session.write(logonMessage);
    }

    /** Empty placeholder; not called anywhere in this file. */
    private void acceptInput() {
    }

    /** Logs the event and writes a fixed reply back to the peer. */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println("in messageReceived!");
        session.write("receive from server");
    }
}