/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/
package com.qchen.quickfixmessage.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.mina.message.FIXProtocolCodecFactory;
/**
 * Test helper that opens client connections through Apache MINA and exchanges
 * messages over them. Each connection is identified by a caller-chosen client id.
 *
 * <p>Every access to the per-instance handler map is guarded by that map's
 * monitor, and the shared connector cache is guarded by its own monitor.
 */
public class TestConnection {
    /** Connectors shared by all instances, one per transport type, created on first use. */
    private static final HashMap<TransportType, IoConnector> connectors =
            new HashMap<TransportType, IoConnector>();

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Handlers of the connected clients, keyed by client id. */
    private final HashMap<Integer, TestIoHandler> ioHandlers = new HashMap<Integer, TestIoHandler>();

    /**
     * Writes a message to the session of the given client.
     *
     * @param clientId id that was passed to {@link #connect}
     * @param message text handed to the session's {@code write} method
     * @throws IllegalArgumentException if no connection is registered for {@code clientId}
     */
    public void sendMessage(int clientId, String message) throws IOException {
        getIoHandler(clientId).getSession().write(message);
    }

    /**
     * Looks up the handler registered for a client.
     *
     * @throws IllegalArgumentException if no connection is registered for {@code clientId};
     *         this replaces the uninformative NullPointerException callers would otherwise hit
     */
    private TestIoHandler getIoHandler(int clientId) {
        TestIoHandler handler;
        synchronized (ioHandlers) {
            handler = ioHandlers.get(Integer.valueOf(clientId));
        }
        if (handler == null) {
            throw new IllegalArgumentException("No connection registered for client id " + clientId);
        }
        return handler;
    }

    /**
     * Closes the session of every registered client, waiting for each close to
     * finish, and then forgets all handlers.
     */
    public void tearDown() {
        // Same monitor as connect()/getIoHandler(), so the map is never iterated
        // while another thread modifies it.
        synchronized (ioHandlers) {
            for (TestIoHandler handler : ioHandlers.values()) {
                CloseFuture closeFuture = handler.getSession().close();
                closeFuture.join();
            }
            ioHandlers.clear();
        }
    }

    /**
     * Returns the next message received for a client.
     *
     * @param clientId id that was passed to {@link #connect}
     * @param timeout maximum time to wait, in milliseconds
     * @return the next queued message, or {@code null} if none arrived within the timeout
     * @throws IllegalArgumentException if no connection is registered for {@code clientId}
     */
    public CharSequence readMessage(int clientId, long timeout) throws InterruptedException {
        return getIoHandler(clientId).getNextMessage(timeout);
    }

    /**
     * Waits until the session of the given client has been closed; fails the
     * test via {@code Assert.fail} if that does not happen within the handler's timeout.
     *
     * @throws IllegalArgumentException if no connection is registered for {@code clientId}
     */
    public void waitForClientDisconnect(int clientId) throws IOException, InterruptedException {
        getIoHandler(clientId).waitForDisconnect();
    }

    /**
     * Connects to a server and registers the connection under {@code clientId}.
     *
     * @param clientId id under which the connection can be used afterwards
     * @param transportType {@code TransportType.SOCKET} (connects to localhost) or
     *        {@code TransportType.VM_PIPE}
     * @param port TCP port or VM-pipe port, depending on the transport type
     * @throws RuntimeException if the transport type is neither SOCKET nor VM_PIPE
     */
    public void connect(int clientId, TransportType transportType, int port)
            throws UnknownHostException, IOException {
        IoConnector connector = getConnector(transportType);
        SocketAddress address = createAddress(transportType, port);
        TestIoHandler testIoHandler = new TestIoHandler();
        synchronized (ioHandlers) {
            ConnectFuture future = connector.connect(address, testIoHandler);
            future.join();
            Assert.assertTrue("connection to server failed", future.isConnected());
            // Register only after the connection succeeded. A handler left behind by
            // a failed attempt may never have its session latch released, which would
            // make tearDown() block in getSession().
            ioHandlers.put(Integer.valueOf(clientId), testIoHandler);
        }
    }

    /** Returns the cached connector for the transport type, creating it on first use. */
    private static IoConnector getConnector(TransportType transportType) {
        // The cache is static, so it is shared by every TestConnection instance.
        synchronized (connectors) {
            IoConnector connector = connectors.get(transportType);
            if (connector == null) {
                if (transportType == TransportType.SOCKET) {
                    connector = new SocketConnector();
                } else if (transportType == TransportType.VM_PIPE) {
                    connector = new VmPipeConnector();
                } else {
                    throw new RuntimeException("Unsupported transport type: " + transportType);
                }
                connectors.put(transportType, connector);
            }
            return connector;
        }
    }

    /** Builds the server address that matches the transport type. */
    private static SocketAddress createAddress(TransportType transportType, int port) {
        if (transportType == TransportType.SOCKET) {
            return new InetSocketAddress("localhost", port);
        } else if (transportType == TransportType.VM_PIPE) {
            return new VmPipeAddress(port);
        } else {
            throw new RuntimeException("Unsupported transport type: " + transportType);
        }
    }

    /**
     * MINA handler for one client connection: remembers the session, queues every
     * received message and signals session creation and closure through latches.
     */
    private class TestIoHandler extends IoHandlerAdapter {
        private IoSession session;
        private final BlockingQueue<Object> messages = new LinkedBlockingQueue<Object>();
        private final CountDownLatch sessionCreatedLatch = new CountDownLatch(1);
        private final CountDownLatch disconnectLatch = new CountDownLatch(1);

        /** Stores the session, installs the FIX codec filter and releases waiting callers. */
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
            this.session = session;
            session.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new FIXProtocolCodecFactory()));
            sessionCreatedLatch.countDown();
        }

        /** Logs the failure after delegating to the adapter. */
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            super.exceptionCaught(session, cause);
            log.error(cause.getMessage(), cause);
        }

        /** Releases callers blocked in {@link #waitForDisconnect()}. */
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            disconnectLatch.countDown();
        }

        /** Queues the received message for {@link #getNextMessage(long)}. */
        public void messageReceived(IoSession session, Object message) throws Exception {
            messages.add(message);
        }

        /**
         * Returns the session, blocking until {@code sessionCreated} has run.
         *
         * @throws RuntimeException wrapping the InterruptedException if the wait is interrupted
         */
        public IoSession getSession() {
            try {
                sessionCreatedLatch.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return session;
        }

        /**
         * Removes and returns the oldest queued message, cast to String.
         *
         * @param timeout maximum time to wait, in milliseconds
         * @return the message, or {@code null} if the queue stayed empty for the whole timeout
         */
        public String getNextMessage(long timeout) throws InterruptedException {
            return (String) messages.poll(timeout, TimeUnit.MILLISECONDS);
        }

        /** Waits up to 500000 ms for the session to close and fails the test otherwise. */
        public void waitForDisconnect() throws InterruptedException {
            if (!disconnectLatch.await(500000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("client not disconnected");
            }
        }
    }
}
package com.qchen.quickfixmessage;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Date;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import com.qchen.quickfixmessage.util.TestConnection;
import quickfix.field.HeartBtInt;
import quickfix.field.SenderCompID;
import quickfix.fix40.Logon;
import quickfix.fix40.Message;
/**
 * Small command-line client: connects to a server on localhost through
 * {@link TestConnection}, sends one FIX 4.0 Logon message and prints the first
 * message that comes back.
 */
public class FixMinaClient {
    private TestConnection connection = new TestConnection();
    private static TransportType transportType = TransportType.SOCKET;
    private static int port = 9876;

    /** FIX field delimiter (Start Of Header, 0x01). */
    public static final String SOH = "\u0001";

    /**
     * Sends the Logon and prints the reply, or {@code null} if nothing arrives
     * within 3000 ms.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
        // NOTE(review): an earlier, disabled variant connected a plain SocketConnector
        // with ClientHandler and an object-serialization codec instead.
        FixMinaClient client = new FixMinaClient();
        String time = "20131015-09:06:26";
        StringBuilder body = new StringBuilder();
        body.append("35=A").append(SOH).append("34=5").append(SOH)
                .append("49=BANZAI").append(SOH).append("52=").append(time).append(SOH)
                .append("56=EXEC").append(SOH).append("98=0").append(SOH).append("108=30").append(SOH);
        // Well-formed example, with '|' standing for SOH:
        // 8=FIX.4.0|9=57|35=A|34=1|49=TW|52=20131015-09:00:26|56=ISLD|98=0|108=30|10=005|
        String logonString = frameMessage("FIX.4.0", body.toString());
        client.connection.connect(1, transportType, port);
        try {
            client.connection.sendMessage(1, logonString);
            CharSequence messageBack = client.connection.readMessage(1, 3000);
            System.out.println("back msg is "+messageBack);
        } finally {
            // Close the session so the connection does not stay open after main returns.
            client.connection.tearDown();
        }
    }

    /**
     * Wraps a message body in the standard FIX framing: BeginString (8) and
     * BodyLength (9) in front, CheckSum (10) behind.
     *
     * @param beginString value of tag 8, e.g. {@code FIX.4.0}
     * @param body every field from MsgType (35) onwards, each terminated by SOH;
     *        assumed to be ASCII so that one char equals one byte
     * @return the complete message, ending with the SOH that follows the checksum
     */
    private static String frameMessage(String beginString, String body) {
        // BodyLength counts everything after the BodyLength field up to and
        // including the SOH that precedes the CheckSum field.
        String unchecked = "8=" + beginString + SOH + "9=" + body.length() + SOH + body;
        // CheckSum is the sum of all preceding bytes modulo 256, as three digits.
        int sum = 0;
        for (int i = 0; i < unchecked.length(); i++) {
            sum += unchecked.charAt(i);
        }
        return unchecked + "10=" + String.format("%03d", sum % 256) + SOH;
    }
}
/**
 * MINA handler that writes a FIX 4.0 Logon as soon as its session opens and
 * answers every received message with a fixed text.
 */
class ClientHandler extends IoHandlerAdapter {

    /** Reports the opened session on stderr, then writes a Logon with HeartBtInt 30. */
    public void sessionOpened(IoSession session) {
        System.err.println("session opened");
        Logon logon = new Logon();
        HeartBtInt heartbeatInterval = new HeartBtInt(30);
        logon.set(heartbeatInterval);
        session.write(logon);
    }

    /** Empty placeholder; nothing in this class calls it. */
    private void acceptInput() {
    }

    /** Reports the received message on stdout and writes a fixed reply to the session. */
    public void messageReceived(IoSession session, Object message) {
        System.out.println("in messageReceived!");
        final String reply = "receive from server";
        session.write(reply);
    }
}