Disruptor Usage Example

2019/1/8 15:37:16  bijian1013  程序员俱乐部

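For background on what Disruptor is, see the article 《高性能线程间队列DISRUPTOR简介》 ("An Introduction to DISRUPTOR, a High-Performance Inter-Thread Queue"). This post focuses on how to use Disruptor in a real application.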

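The project structure is as follows: four classes in package com.bijian.study and a test class, Main, in package com.bijian.test.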


CreateReqEvent.java

package com.bijian.study;

import com.lmax.disruptor.EventFactory;

/** Event carried in the ring buffer; instances are pre-allocated and reused. */
public class CreateReqEvent {

    private String reqStr;

    public String getReqStr() {
        return reqStr;
    }

    public void setReqStr(String reqStr) {
        this.reqStr = reqStr;
    }

    /** Creates the event instances that fill the ring buffer. */
    private static class Factory implements EventFactory<CreateReqEvent> {

        @Override
        public CreateReqEvent newInstance() {
            return new CreateReqEvent();
        }
    }

    // Typed as the interface so callers never have to name the private Factory class.
    public static final EventFactory<CreateReqEvent> FACTORY = new Factory();
}

CreateReqEventHandler.java

package com.bijian.study;

import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.EventHandler;

public class CreateReqEventHandler implements EventHandler<CreateReqEvent> {

    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    // Called on the consumer thread for every published event.
    @Override
    public void onEvent(CreateReqEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("on event:{}", event.getReqStr());
    }
}

CreateReqEventTranslator.java

package com.bijian.study;

import com.lmax.disruptor.EventTranslator;

public class CreateReqEventTranslator implements EventTranslator<CreateReqEvent> {

    private String reqString;

    public String getReqString() {
        return reqString;
    }

    public void setReqString(String reqString) {
        this.reqString = reqString;
    }

    // Copies the request string into the pre-allocated event claimed for this sequence.
    @Override
    public void translateTo(CreateReqEvent event, long sequence) {
        event.setReqStr(reqString);
    }
}
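
The factory and translator above work together: the factory fills every ring buffer slot once, and each publish only overwrites the fields of an existing slot. The following is a minimal JDK-only sketch of that idea, not the Disruptor API. The names PreallocatedSlotsDemo, ToyRing, Translator, and Event are hypothetical, and the toy ring is single-threaded with no consumer tracking, so it does nothing to stop a producer from overwriting unread slots.

```java
import java.util.function.Supplier;

// Hypothetical, JDK-only illustration; not the Disruptor API.
final class PreallocatedSlotsDemo {

    /** Mutable event, playing the role of CreateReqEvent. */
    static final class Event {
        String reqStr;
    }

    /** Same role as EventTranslator: copy data into an existing event. */
    interface Translator<T> {
        void translateTo(T event, long sequence);
    }

    /** Single-threaded toy ring: slots are created once and then reused. */
    static final class ToyRing<T> {
        private final Object[] slots;
        private long next = 0;

        ToyRing(Supplier<T> factory, int size) {
            slots = new Object[size];
            for (int i = 0; i < size; i++) {
                slots[i] = factory.get(); // the factory runs only here
            }
        }

        @SuppressWarnings("unchecked")
        T get(long sequence) {
            return (T) slots[(int) (sequence % slots.length)];
        }

        /** Claims the next slot, lets the translator fill it, returns the sequence. */
        long publish(Translator<T> translator) {
            long sequence = next++;
            translator.translateTo(get(sequence), sequence);
            return sequence;
        }
    }

    public static void main(String[] args) {
        ToyRing<Event> ring = new ToyRing<>(Event::new, 4);
        Event first = ring.get(0);
        long seq = ring.publish((event, sequence) -> event.reqStr = "test");
        System.out.println(seq + " -> " + ring.get(seq).reqStr); // 0 -> test
        System.out.println(first == ring.get(seq));              // true: same object, no new allocation
    }
}
```

Because the slot object is reused, a handler should not keep a reference to an event after it returns; the next lap around the ring will overwrite it.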

ReqEventUtil.java

package com.bijian.study;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class ReqEventUtil {

    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private static volatile Disruptor<CreateReqEvent> disruptor;
    private static ExecutorService executor;

    // Start the processing thread when the class is loaded
    static {
        int ringBufferSize = 256 * 256; // RingBuffer size = 65536; must be a power of two
        // A single event handler occupies only one thread of this pool
        executor = Executors.newFixedThreadPool(4 * 4);
        // Note: this Executor-based constructor is deprecated in later Disruptor 3.x releases
        // in favor of the ThreadFactory overload.
        disruptor = new Disruptor<>(CreateReqEvent.FACTORY, ringBufferSize, executor,
                ProducerType.MULTI, new BlockingWaitStrategy());

        EventHandler<CreateReqEvent> eventHandler = new CreateReqEventHandler();
        disruptor.handleEventsWith(eventHandler);
        disruptor.start();
    }

    public static void push(String reqString) {
        try {
            log.info("push create reqString event:{}", reqString);
            CreateReqEventTranslator translator = new CreateReqEventTranslator();
            translator.setReqString(reqString);
            disruptor.publishEvent(translator);
        } catch (Exception e) {
            log.error("push CreateReqEvent error:", e);
        }
    }

    /**
     * Stop processing.
     */
    public static void shutdown() {

        log.info("shutdown now...");

        if (disruptor != null) {
            disruptor.shutdown();
        }

        if (executor != null) {
            executor.shutdown();
        }
    }
}
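
The comment on ringBufferSize says the size must be a power of two. The usual reason is that a power-of-two size lets an ever-increasing sequence number be mapped to a slot with a cheap bit mask instead of a modulo. The sketch below shows that arithmetic with the JDK only; RingIndexDemo, isPowerOfTwo, and indexOf are hypothetical names for illustration, not Disruptor methods.

```java
// Hypothetical, JDK-only illustration of power-of-two ring indexing.
final class RingIndexDemo {

    /** True when size is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    /** Maps an ever-increasing sequence to a slot index with a bit mask. */
    static int indexOf(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of two: " + size);
        }
        // For power-of-two sizes, (size - 1) has all low bits set,
        // so the mask is equivalent to sequence % size.
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        int size = 256 * 256;                     // 65536, as in ReqEventUtil
        System.out.println(isPowerOfTwo(size));   // true
        System.out.println(indexOf(65535, size)); // 65535 (last slot)
        System.out.println(indexOf(65536, size)); // 0 (wraps around)
        System.out.println(indexOf(65537, size)); // 1
    }
}
```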

Test class Main.java

package com.bijian.test;

import java.lang.invoke.MethodHandles;
import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bijian.study.ReqEventUtil;

public class Main {

    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    public static void main(String[] args) {

        Scanner sc = new Scanner(System.in);
        // Read line by line; hasNextLine() pairs with nextLine()
        while (sc.hasNextLine()) {
            String reqStr = sc.nextLine();
            // Log message reads "input received: <line>"
            log.info("输入的信息:{}", reqStr);
            if ("exit".equals(reqStr)) {
                ReqEventUtil.shutdown();
                break;
            } else {
                ReqEventUtil.push(reqStr);
            }
        }
    }
}

Output (the log prefix 输入的信息 means "input received"):

test
2018-12-24 23:12:26.337 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:test
2018-12-24 23:12:26.370 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:test
2018-12-24 23:12:26.371 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:test
abc
2018-12-24 23:12:28.323 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:abc
2018-12-24 23:12:28.323 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:abc
2018-12-24 23:12:28.324 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:abc
xyz
2018-12-24 23:12:30.628 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:xyz
2018-12-24 23:12:30.628 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:xyz
2018-12-24 23:12:30.628 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:xyz
exit
2018-12-24 23:12:34.032 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:exit
2018-12-24 23:12:34.033 [main] INFO  ReqEventUtil.shutdown(ReqEventUtil.java:49)[][][] - shutdown now...
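
ReqEventUtil.shutdown() calls executor.shutdown() and returns without waiting for the pool threads to finish. If the caller needs to know that the pool really stopped, the standard JDK idiom is to wait with a timeout and fall back to shutdownNow(). The sketch below assumes nothing about Disruptor; GracefulShutdownDemo and shutdownAndAwait are hypothetical names, and the 5-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing the JDK shutdown-then-await idiom.
final class GracefulShutdownDemo {

    /** Returns true if the pool terminated within the allowed time. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // interrupt tasks that are still running
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("handling last event"));
        boolean clean = shutdownAndAwait(executor, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + clean);
    }
}
```

In ReqEventUtil such a helper would run after disruptor.shutdown(), so that the handler thread has already been asked to stop before the pool is awaited.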
