SocketListenerPusher.java 代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.directwebremoting.impl.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
import com.shihuan.dragonkeeper.global.ConfigFile;
/**
 * Listening side of the socket service.
 *
 * <p>The constructor binds a {@link ServerSocket}, creates a fixed-size worker pool and
 * schedules this object at a fixed rate; each {@code run()} invocation accepts one
 * connection and passes it to the pool wrapped in a {@code SocketListenerHandler}.
 */
public class SocketListenerPusher implements Runnable {
// SLF4J logger shared by all instances of this class.
protected static Logger logger = LoggerFactory.getLogger(SocketListenerPusher.class);
// Name of the properties file (base name plus suffix, both taken from ConfigFile) from
// which the constructor reads the "serverport" and "poolsize" keys.
public static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;
// Listening socket; created and bound in the constructor, accepted on in run().
private ServerSocket serverSocket;
// Worker pool that run() submits one handler per accepted connection to.
private ExecutorService pool;
/**
 * Reads {@code serverport} and {@code poolsize} from the listener configuration, binds the
 * server socket, creates the worker pool and schedules {@link #run()} on a daemon thread.
 *
 * <p>Configuration, number-format and I/O failures are logged and not rethrown, so the
 * constructor still returns in those cases. Whenever start-up does not complete (including
 * an unchecked exception such as a non-positive pool size rejected by
 * {@code Executors.newFixedThreadPool}), the socket and pool created so far are released
 * instead of being left open.
 */
public SocketListenerPusher() {
    boolean started = false;
    try {
        int port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
        int poolsize = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "poolsize"));
        serverSocket = new ServerSocket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(port));
        pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * poolsize);
        // The next two statements invoke run() over and over, much like while(true){...}.
        // Note that scheduleAtFixedRate stops rescheduling once an execution throws.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
        executor.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.MILLISECONDS);
        started = true;
    } catch (NumberFormatException e) {
        // logger.error(msg, throwable) already records the stack trace.
        logger.error(e.getMessage(), e);
    } catch (ConfigurationException e) {
        logger.error(e.getMessage(), e);
    } catch (IOException e) {
        logger.error(e.getMessage(), e);
    } finally {
        if (!started) {
            // Start-up failed part-way: do not leave a bound port or idle pool behind.
            if (pool != null) {
                pool.shutdown();
            }
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException closeFailure) {
                    logger.warn(closeFailure.getMessage(), closeFailure);
                }
            }
        }
    }
}
/**
 * Accepts one incoming connection and hands it to the worker pool.
 *
 * <p>An {@link IOException} from {@code accept()} is logged and shuts the pool down, as
 * before. If handing the connection to the pool fails with an unchecked exception (for
 * example because the pool has already been shut down), the accepted socket is closed
 * before the exception is rethrown, so the connection is not leaked.
 */
@Override
public void run() {
    Socket socket = null;
    try {
        socket = serverSocket.accept();
        pool.execute(new SocketListenerHandler(socket));
    } catch (IOException e) {
        System.out.println("线程池被关闭!!!!!!!!!!!");
        pool.shutdown();
        // The throwable is passed to the logger, which records the stack trace.
        logger.error(e.getMessage(), e);
    } catch (RuntimeException e) {
        // No handler took ownership of the socket, so close it here.
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException closeFailure) {
                logger.warn(closeFailure.getMessage(), closeFailure);
            }
        }
        // Rethrow so the caller (the scheduler) sees the failure exactly as before.
        throw e;
    }
}
SocketListenerHandler.java代码如下:
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.net.Socket;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.io.IOUtils;
import org.directwebremoting.Browser;
import org.directwebremoting.ScriptSessions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.shihuan.dragonkeeper.common.dto.DataSourceInfo;
import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
import com.shihuan.dragonkeeper.common.utils.DataSourceMapUtil;
import com.shihuan.dragonkeeper.common.utils.DateFormatterUtil;
import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
import com.shihuan.dragonkeeper.global.ConfigFile;
import com.shihuan.dragonkeeper.server.bean.ActivityServiceBean;
/**
 * Task that processes one accepted client connection: {@code run()} reads a single line
 * from the socket, strips its header via {@code getNoHeadData} and, when the payload
 * starts with '{', parses it as an {@code ActivityServiceBean} and prints its fields.
 */
public class SocketListenerHandler implements Runnable {
// SLF4J logger shared by all instances of this class.
protected static Logger logger = LoggerFactory.getLogger(SocketListenerHandler.class);
// Name of the JDBC properties file (base name plus suffix from ConfigFile); not referenced
// by the code visible in this listing.
private static String jdbc_CONFIG = ConfigFile.JDBC_CONFIG + ConfigFile.SUFFIX_NAME;
// Read timeout in milliseconds handed to Socket.setSoTimeout() in run().
public static final int timeOut = 0*1000 ; // original note: "read timeout of 1 second", but 0*1000 is 0, which setSoTimeout treats as "wait forever" -- NOTE(review): confirm the intended value
// Per-instance tag ("Agentdata_" plus a random number) used as a prefix in log messages.
private final String dataRealTimeAction_id = "Agentdata_" + Math.random();
// Logged when the payload does not start with '{'; the text says "heartbeat message".
private static final String noData = "{'nodata':'心跳信息'}";
// Appended to the log message on an IOException; the text says "request cannot be parsed".
private static final String errorData = "{'error':'无法解析的请求'}";
// Client connection served by this handler; assigned in the constructor.
private Socket connectedsocket = null;
/**
 * Creates a handler for one accepted client connection.
 *
 * @param socket the connection that {@code run()} reads from
 */
public SocketListenerHandler(Socket socket) {
    connectedsocket = socket;
}
/**
 * Reads one line from the client, strips its header with {@code getNoHeadData} (defined
 * elsewhere in this class, not shown in this listing) and processes the payload:
 * empty payloads and payloads not starting with '{' are only logged, anything else is
 * parsed as an {@code ActivityServiceBean} and printed.
 *
 * <p>The reader and the socket are always closed when the method returns, including when
 * socket configuration or stream creation fails. Unchecked failures while decoding the
 * request are logged instead of escaping into the worker thread.
 */
@Override
public void run() {
    BufferedReader in = null;
    String resultData = "";
    try {
        // Read timeout in milliseconds; must be set before reading. After a
        // SocketTimeoutException the socket stays connected. 0 means wait forever.
        connectedsocket.setSoTimeout(timeOut);
        // Turn SO_KEEPALIVE off for this connection.
        connectedsocket.setKeepAlive(false);
        in = new BufferedReader(new InputStreamReader(connectedsocket.getInputStream()));
        // NOTE(review): ready() does not wait; a client whose bytes have not arrived yet
        // is skipped and its connection closed. Confirm this is acceptable.
        if (in.ready()) { // only proceed when the stream already has data
            resultData = getNoHeadData(in.readLine()); // data received from the agent
            logger.info("#### 结果DATA = "+resultData);
            if (resultData==null || "".equals(resultData)) {
                logger.info(dataRealTimeAction_id + " -->>> " + "内容为空!");
            } else if (resultData.charAt(0) != '{') { // non-JSON payload: client heartbeat
                logger.info(dataRealTimeAction_id + " -->>> " + noData);
            } else {
                ActivityServiceBean asb = JSON.parseObject(resultData, ActivityServiceBean.class);
                System.out.println("打印预处理信息Start......");
                System.out.println(asb.getProxyname() + " -- " + asb.getIp() + " -- " + asb.getCalltime() + " -- " + asb.getAnswertime() + " -- " + asb.getCpu() + " -- " + asb.getThread() + " -- " + asb.getStatus() + " -- " + asb.getAccessaddress() + " -- " + asb.getAccessfilename() + " -- " + asb.getSql() + " -- " + asb.getContent());
                System.out.println("打印预处理信息End......");
                // parseData(ois);  // disabled in the original listing
                logger.info(dataRealTimeAction_id + ": 成功处理了接收到的数据!");
            }
        }
    } catch (IOException e) {
        // The throwable is passed to the logger, which records the stack trace.
        logger.error(e.getMessage() + " " + errorData, e);
    } catch (NumberFormatException e) {
        logger.error(e.getMessage(), e);
    } catch (RuntimeException e) {
        // Malformed input (e.g. a JSON parse failure) must not escape into the pool thread unlogged.
        logger.error(e.getMessage() + " " + errorData, e);
    } finally {
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
        // Close the socket itself as well: when setup failed before the reader existed,
        // closing the reader alone would never release the connection.
        if (connectedsocket != null) {
            try {
                connectedsocket.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
TestSocketListenerPusher.java请求端代码如下:
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;
import org.apache.commons.configuration.ConfigurationException;
import com.alibaba.fastjson.JSON;
import com.shihuan.dragonkeeper.common.utils.ByteArrayUtil;
import com.shihuan.dragonkeeper.common.utils.PropertiesUtil;
import com.shihuan.dragonkeeper.global.ConfigFile;
import com.shihuan.dragonkeeper.server.bean.ActivityServiceBean;
/**
 * Manual test client for the socket listener.
 *
 * <p>Reads {@code serverip} and {@code serverport} from the listener configuration and
 * sends a fixed number of sample {@code ActivityServiceBean} messages, each over its own
 * connection, pausing between messages. Every message is the bean's JSON text prefixed
 * with the bytes returned by {@code ByteArrayUtil.getIntToByte(jsonStr.length())}.
 */
public class TestSocketListenerPusher implements Runnable {
    private static String socketlistenerserver_CONFIG = ConfigFile.SOCKETLISTENERSERVER__CONFIG + ConfigFile.SUFFIX_NAME;

    /** Number of sample messages sent by one {@link #run()}. */
    private static final int MESSAGE_COUNT = 2;

    /** Pause after each message, in milliseconds. */
    private static final long PAUSE_MILLIS = 3000L;

    /**
     * Sends the sample messages. Each connection is closed in a {@code finally} block, so
     * a failed write no longer leaves the socket open.
     */
    @Override
    public void run() {
        try {
            String serverip = PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverip");
            int port = Integer.parseInt(PropertiesUtil.getPropertiesValue(socketlistenerserver_CONFIG, "serverport"));
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String jsonStr = JSON.toJSONString(buildSampleBean(i)).trim();
                // NOTE(review): the prefix is built from the character count and round-tripped
                // through the platform charset; presumably a length header -- confirm it
                // matches what the server's header parsing expects for non-ASCII payloads.
                String frame = new String(ByteArrayUtil.getIntToByte(jsonStr.length())) + jsonStr;
                byte[] information = frame.getBytes();
                System.out.println(information.length);

                Socket socketclient = new Socket(serverip, port);
                try {
                    socketclient.setSoTimeout(0);
                    socketclient.setKeepAlive(false);
                    OutputStream os = new BufferedOutputStream(socketclient.getOutputStream());
                    os.write(information);
                    os.flush();
                    System.out.println("Client" + i + " -->>> " + frame);
                } finally {
                    // Closing the socket also closes its output stream.
                    socketclient.close();
                }
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Builds the sample bean for message {@code index}; every text field is suffixed with the index. */
    private static ActivityServiceBean buildSampleBean(int index) {
        ActivityServiceBean asb = new ActivityServiceBean();
        asb.setProxyname("testProxyname" + index);
        asb.setIp("testIp" + index);
        Date curdate = new Date();
        asb.setCalltime(curdate);
        asb.setAnswertime(curdate);
        asb.setCpu("testCpu" + index);
        asb.setThread("testThread" + index);
        asb.setStatus("testStatus" + index);
        asb.setAccessaddress("testAccessaddress" + index);
        asb.setAccessfilename("testAccessfilename" + index);
        asb.setSql("testSql" + index);
        asb.setContent("testContent" + index);
        return asb;
    }

    /** Runs the client once on a separate thread. */
    public static void main(String[] args) {
        Thread t = new Thread(new TestSocketListenerPusher());
        t.start();
    }
}
源代码在笔者 shihuan8@163.com 邮箱网盘的 J2EE 代码文件夹里。