转两个帖子
一个java worker thread例子http://blog.csdn.net/derekjiang/article/details/5204090
另一个java worker thread的讲解http://blog.csdn.net/derekjiang/article/details/5204090
?
?
Worker Thread Pattern的参与者:
1. Client(委托人)参与者
??? Client参与者会建立Request参与者,并传给Channel参与者。
2. Channel(通路)参与者
??? Channel参与者会从Client参与者获取Request参与者,传递给Worker参与者。
3. Worker(工人)参与者
??? Worker参与者会从Channel参与者获取Request参与者,并执行这份工作。当工作结束之后,会去拿下一个Request参与者。
4. Request(请求)参与者
??? Reqeust参与者用来表示工作。Request参与者会存放执行这份工作所需要的数据。
---------------------------------------------
控制承载量:
Worker参与者的数量是可以自由设置的。提高Worker参与者的数量,可以提高并发处理的工作量。但是如果准备的Worker参与者数量比同时间的工作量来的多,有些Worker参与者也派不上用场,没活干,还会占用内存。所以有必要配合实际软件使用的需要来调整Worker参与者的数量。
---------------------------------------------
Invocation与execution的分离:
Client参与者会发出工作请求,工作的内容以Request的形式包装起来,传递个Channel参与者。这个过程,对应于普通的方法调用来说,就相当于“评估自变量,启动方法”的操作。
另一方面,Worker参与者会使用从Channel参与者拿来的Request参与者,进行实际的操作。这个过程,对应于普通的方法调用来说,可以对应到“执行方法”的部分。
普通的方法调用的操作,“启动方法”和“执行方法”是连续进行的。当一个方法一经调用,就会马上继续执行,启动和执行是密不可分的。
在Worker Thread Pattern中,实现了“启动方法”和“执行方法”的分离。这样的好处有:
1. 提高响应性。
invocation可以继续自己前进,不用等待execution,这样能提高程序的响应性。
2. 控制实行顺序。
invocation和execution分离,execute的顺序就可以与invoke的次序无关,也就是说,我们可以对Request参与者设立优先级,控制Channel参与者传递Request给Worker参与者的顺序。
3. 可以取消和重复执行。
如果能将invocation和execution分离,那么就能做到在execute之前,把已经invoke的请求的execution取消掉。
同样,只要把Request参与者保存下来,就可以重复执行。
4. 分散处理的第一步
因为invocation和execution分离了,所以invoke和execute的操作也比较容易拆开在不同的计算机上面执行,相当于Reqeust参与者的对象,可以通过网络传送的另外一台计算机上。
------------------------------------------------
多态的Request参与者:
因为WorkerThread参与者只不过是单纯的接收Request的实例,调用Request的execute方法,
那么当我们建立Request类的子类,并将其传递给Channel,WorkerThread也应该能够正确的处理。
执行工作所需要的所有信息,都定义在Request参与者里。所以即使建立出多态的Request参与者,增加工作的种类,Channel参与者和Worker参与者都不需要进行任何修改。
------------------------------------------
实例:
Java代码??
class="star" alt="收藏代码">
- public?class?Request?{??
- ????private?final?String?name;???
- ????private?final?int?number;????
- ????private?static?final?Random?random?=?new?Random();??
- ????public?Request(String?name,?int?number)?{??
- ????????this.name?=?name;??
- ????????this.number?=?number;??
- ????}??
- ????public?void?execute()?{??
- ????????System.out.println(Thread.currentThread().getName()?+?"?executes?"?+?this);??
- ????????try?{??
- ????????????Thread.sleep(random.nextInt(1000));??
- ????????}?catch?(InterruptedException?e)?{??
- ????????}??
- ????}??
- ????public?String?toString()?{??
- ????????return?"[?Request?from?"?+?name?+?"?No."?+?number?+?"?]";??
- ????}??
- }??
?
?
?
Java代码??
- public?class?ClientThread?extends?Thread?{??
- ????private?final?Channel?channel;??
- ????private?static?final?Random?random?=?new?Random();??
- ????public?ClientThread(String?name,?Channel?channel)?{??
- ????????super(name);??
- ????????this.channel?=?channel;??
- ????}??
- ????public?void?run()?{??
- ????????try?{??
- ????????????for?(int?i?=?0;?true;?i++)?{??
- ????????????????Request?request?=?new?Request(getName(),?i);??
- ????????????????channel.putRequest(request);??
- ????????????????Thread.sleep(random.nextInt(1000));??
- ????????????}??
- ????????}?catch?(InterruptedException?e)?{??
- ????????}??
- ????}??
- }??
?
?
?
Java代码??
- public?class?WorkerThread?extends?Thread?{??
- ????private?final?Channel?channel;??
- ????public?WorkerThread(String?name,?Channel?channel)?{??
- ????????super(name);??
- ????????this.channel?=?channel;??
- ????}??
- ????public?void?run()?{??
- ????????while?(true)?{??
- ????????????Request?request?=?channel.takeRequest();??
- ????????????request.execute();??
- ????????}??
- ????}??
- }??
?
?
Java代码??
- public?class?Channel?{??
- ????private?static?final?int?MAX_REQUEST?=?100;??
- ????private?final?Request[]?requestQueue;??
- ????private?int?tail;????
- ????private?int?head;????
- ????private?int?count;???
- ??
- ????private?final?WorkerThread[]?threadPool;??
- ??
- ????public?Channel(int?threads)?{??
- ????????this.requestQueue?=?new?Request[MAX_REQUEST];??
- ????????this.head?=?0;??
- ????????this.tail?=?0;??
- ????????this.count?=?0;??
- ??
- ????????threadPool?=?new?WorkerThread[threads];??
- ????????for?(int?i?=?0;?i?<?threadPool.length;?i++)?{??
- ????????????threadPool[i]?=?new?WorkerThread("Worker-"?+?i,?this);??
- ????????}??
- ????}??
- ????public?void?startWorkers()?{??
- ????????for?(int?i?=?0;?i?<?threadPool.length;?i++)?{??
- ????????????threadPool[i].start();??
- ????????}??
- ????}??
- ????public?synchronized?void?putRequest(Request?request)?{??
- ????????while?(count?>=?requestQueue.length)?{??
- ????????????try?{??
- ????????????????wait();??
- ????????????}?catch?(InterruptedException?e)?{??
- ????????????}??
- ????????}??
- ????????requestQueue[tail]?=?request;??
- ????????tail?=?(tail?+?1)?%?requestQueue.length;??
- ????????count++;??
- ????????notifyAll();??
- ????}??
- ????public?synchronized?Request?takeRequest()?{??
- ????????while?(count?<=?0)?{??
- ????????????try?{??
- ????????????????wait();??
- ????????????}?catch?(InterruptedException?e)?{??
- ????????????}??
- ????????}??
- ????????Request?request?=?requestQueue[head];??
- ????????head?=?(head?+?1)?%?requestQueue.length;??
- ????????count--;??
- ????????notifyAll();??
- ????????return?request;??
- ????}??
- }??
?
?
Java代码??
- public?class?Main?{??
- ????public?static?void?main(String[]?args)?{??
- ????????Channel?channel?=?new?Channel(5);?????
- ????????channel.startWorkers();??
- ????????new?ClientThread("Alice",?channel).start();??
- ????????new?ClientThread("Bobby",?channel).start();??
- ????????new?ClientThread("Chris",?channel).start();??
- ????}??
- } ?