消息队列对大多数人应该比较陌生。但是要提到MQ听说过的人会多很多。MQ就是英文单词"Message queue"的缩写,翻译成中文就是消息队列(我英语差,翻译错了请告知)。
PS:话说国人熟悉MQ比消息队列多,是不是因为国人的外语水平高于国语水平好几个数量级
1、看一下度娘怎么解释消息队列
参考链接:消息队列_百度百科
度娘解释消息队列是在两台计算机间传输的,套句很时髦的说法就是用来做分布式传输的,是个很高大上的东西
2、我的看法稍有不同
我更追溯到“消息队列”的字面“本源”的意思。我认为消息队列就是消息的管理容器工具
消息队列可以在“两台计算机间传输”,也可以同一台计算机不同进行进程间传输,甚至是同一进程内“传输”
3、消息队列使用主要场景
我认为主要有两种,一种是排队先进先出(也有加优先级的),另一种是消息发布订阅模式,当然两种方式“复合”使用也是可以的
4、消息队列主要解决什么问题
我们写的程序偶尔出现一些“灵异”问题。除了一般的业务逻辑bug外,主要就是折腾服务器了。比如,web服务器cpu满载、数据库cpu满载、内存满载、磁盘IO满载、网络带宽满载等等
我总结为两点,计算密集型问题(cpu满载)和资源密集型问题(内存、磁盘、网络)
我们要优化程序需要知道到底是哪种问题,针对不同问题进行不同的优化,优化一般无非“开源”和“节流”两种手段。
“开源”:增加计算能力(含增加cpu和服务器)和增加资源
“节流”:减少“多余”的逻辑和资源消耗
现实中的情况很复杂,有的时候很简单的逻辑(但资源耗费严重)也能导致cpu满载,我们认为程序是在“等”资源,其实它在“等”资源的时候依然吞噬了大量的“cpu”,所以把计算和资源消耗“拆分”开很多情况下更加有效
增加了cpu如何用得上,如果我们的程序是单线程,就算增加到256个cpu对性能改善也用处不大,另外增加了服务我们的逻辑是否还能完整,不同服务器的程序如何协同工作,那就是“消息队列”隆重登场的时候了。
前面太抽象,直接上例子了
一、消息订阅模式
1、消息订阅代码
public static void Test() { Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" }, new Soldier { Name = "士兵2" }, new Soldier { Name = "士兵3" } }; Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() }; SubscribeChannel<int> channel = new SubscribeChannel<int>() { MaxDegreeOfParallelism = 3 }; channel.Init(); channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new SubscribeConsume<int>() { Instance = item }); } while (producer.Run()) { } }
解读一下:
A:定义了一个"队伍"(士兵(Soldier)数组,等待接受命令(订阅长官的命令))
B:定义了一个“长官”(命令的生产(发布)者)
C:定义了一个消息订阅频道(SubscribeChannel)并把生产者和消费者都添加进去
这里面要重点说一下频道的重要性,大家“通信”必须在相同频道才可以相互沟通。不在一个频道,人民解放军是不能听美国将军的命令的
另一个方面也说明可以通过增加不同频道来建立多个消息队列,频道独立于生产者和消费者,同一个生产者或者消费者也可能同时为多个消息队列服务,Very Good!
D:不停的等待“长官”发布命令直至“长官”自己终止
2、F5
3、“指令”什么鬼,再看一些代码
class="code_img_closed" src="/Upload/Images/2016052305/0015B68B3C38AA5B.gif" alt="" />logs_code_hide('5b783cfd-3f6d-4de0-9888-6f56811e162c',event)" src="/Upload/Images/2016052305/2B1B950FA3DF188F.gif" alt="" />/// <summary> /// 命令操作 /// </summary> public class DirectiveAction : IResultInstance<int> { /// <summary> /// /// </summary> /// <param name="result"></param> /// <returns></returns> public bool Run(ref int result) { System.Threading.Thread.Sleep(2000); Console.Write(string.Concat(string.Concat(DateTime.Now.ToString("hh:mm:ss.fff"), " 请输入指令:"))); string str = Console.ReadLine(); Console.WriteLine(string.Concat("I say ", str)); if (string.Equals(str, "Exit", StringComparison.CurrentCultureIgnoreCase)) return false; result = IntConverter.Instance.Get(str); return true; } public void OnSuccess() { } public void OnFail() { } public void OnException(Exception ex) { } }命令操作
/// <summary> /// 士兵 /// </summary> public class Soldier : IArgInstance<int, bool> { /// <summary> /// 士兵名字 /// </summary> public string Name { get; set; } /// <summary> /// /// </summary> /// <param name="arg"></param> /// <param name="result"></param> /// <returns></returns> public bool Run(int arg, ref bool result) { Directive directive = (Directive)arg; string msg = null; switch (directive) { case Directive.LookForward: msg = "看前看"; break; case Directive.Left: msg = "向左转"; break; case Directive.Right: msg = "向右转"; break; case Directive.Behind: msg = "向后转"; break; case Directive.Attention: msg = "立正"; break; case Directive.Ease: msg = "稍息"; break; case Directive.EyeRight: msg = "向右看齐"; break; case Directive.EyeLeft: msg = "向左看齐"; break; } Console.WriteLine(string.Concat(DateTime.Now.ToString("hh:mm:ss.fff"), " ", Name, msg ?? string.Concat("未知指令(", arg.ToString(),")"))); result = !string.IsNullOrWhiteSpace(msg); return true; } /// <summary> /// /// </summary> /// <param name="arg"></param> public void OnSuccess(int arg) { } /// <summary> /// /// </summary> /// <param name="arg"></param> public void OnFail(int arg) { Console.WriteLine(string.Concat("指令(", arg, ")错误")); } /// <summary> /// /// </summary> /// <param name="arg"></param> /// <param name="ex"></param> public void OnException(int arg, Exception ex) { Console.WriteLine(string.Concat("指令(", arg, ")异常,", ex.Message)); } }士兵(执行命令)
/// <summary> /// 指令类型 /// </summary> public enum Directive { LookForward = 0,//看前看 Left = 1,//向左转 Right = 2,//向右转 Behind = 3,//向后转 Attention = 4,//立正 Ease = 5,//稍息 EyeRight = 6,//向右看齐 EyeLeft = 7//向左看齐 }
解释一下:
A:命令是枚举(也就是int数字)
B:对于生产者就是接受控制台输入的行文本,如果为“Exit”就结束,否则转化为int
C:消费者就是接受到一个int值转化为命令枚举(Directive),执行
4、输入几条命令玩玩
Begin 11:54:22.164 请输入指令:1 I say 1 12:04:23.475 士兵1向左转 12:04:23.475 士兵3向左转 12:04:23.475 士兵2向左转 12:04:25.491 请输入指令:2 I say 2 12:04:27.562 士兵2向右转 12:04:27.562 士兵1向右转 12:04:27.593 士兵3向右转 12:04:29.608 请输入指令:3 I say 3 12:04:31.343 士兵1向后转 12:04:31.343 士兵2向后转 12:04:31.343 士兵3向后转 12:04:33.359 请输入指令:0 I say 0 12:04:35.859 士兵1看前看 12:04:35.859 士兵2看前看 12:04:35.859 士兵3看前看 12:04:37.874 请输入指令:9 I say 9 12:04:45.983 士兵1未知指令(9) 12:04:45.983 士兵3未知指令(9) 12:04:45.983 士兵2未知指令(9) 12:04:47.999 请输入指令:2 I say 2 12:04:55.671 士兵1向右转 12:04:55.671 士兵2向右转 12:04:55.671 士兵3向右转 12:04:57.686 请输入指令:Exit I say Exit End
是不是很酷很好玩
A:由于使用的是并行计算(Parallel.ForEach),三个士兵的执行顺序是随机的,这样能更好的利用多cpu资源
B:呵呵,想不到使用电脑练兵这么简单,韩信要是能看到这一幕也只能“甘拜下风”了
二、排队模式
1、排队模式代码和订阅模式稍微不同
Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" } , new Soldier { Name = "士兵2" } , new Soldier { Name = "士兵3" } , new Soldier { Name = "士兵4" } , new Soldier { Name = "士兵5" } , new Soldier { Name = "士兵6" } , new Soldier { Name = "士兵7" } , new Soldier { Name = "士兵8" } , new Soldier { Name = "士兵9" } };soldiers
private static void Test1(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>(); Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
也解读一下:
A:这次队伍壮大了(哈哈,由于前面练兵成效不错,部队迅速扩充了三倍),所以单独出来以便复用
B:这次先定义了频道,而且这次的频道是队列频道(QueueChannel)
C:这里的消费者稍有不同,增加了一个Timer属性,是定时触发器(每1000毫秒触发一次)
注:现实项目中,在排队模式Timer属性并不是必须的,只是这里演示需要增加一个触发器。因为控制台输入我用来控制生产者了,消费者只能让触发器来控制了
D:另外这里新加一个(channel.Start())用来启动自己的调度和所有生产者和消费者的调度
这里只有消费者有调度,这行代码对频道和生产者没有影响
E:这个例子里面的生产者(DirectiveAction)和消费者(QueueConsumeService)是直接服用前面例子的
2、和前面例子一样,先输入几个命令玩玩,看看结果
Begin 12:43:04.369 请输入指令:1 I say 1 12:43:09.438 士兵9向左转 12:43:10.642 请输入指令:3 I say 3 12:43:29.751 士兵7向后转 12:43:30.861 请输入指令:8 I say 8 12:43:42.929 士兵4未知指令(8) 12:43:44.898 请输入指令:Exit I say Exit End
生产者生产一个命令,9个消费者随机一个抢到执行
其实就是9个消费者“线程”等待命令执行,大大扩充了消费的效率
当然生产者也可以是多个,也同时生产,这样多多的效率是不是比我们一般同步执行效果高不少啊
PS:但是这里我想到了一个问题,9个消费者9个独立的“线程”,效率确实高,但是如果cpu满载了,别说9个“线程”都不能好好的工作(运行),如果再开多个生产者,大家一起来竞争cpu资源
2.1 cpu资源可能出现恶意竞争,cpu大量浪费
其实线程间不断切换的成本也是挺高的,也是要考虑的因素之一
2.2 cpu资源分配不周
如果大量消费者使用cpu,生产者可能拿到的cpu不够,生产的"消息"不够消费者使用
如果大量生产者使用cpu,导致生产堆积,消费者消费不过来也是很糟糕,整体的执行速度还是很慢(甚至还不如单线程)
3、我们看一个优化这个问题的例子代码
private static void Test2(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
和前面代码区别很小,只是给队列频道(QueueChannel)加了一个时间调度(Timer),另外增加了一个属性BufferAlert(值为10,表示如果队列的长度超过这个值就报警(可能触发减少生产者增加消费者策略))
很明显,现在除了生产者和消费要消耗cpu外,队列频道也要消耗cpu;有人说你这样能更快鬼才信
哈哈,这里面“水”就深了。队列频道的调度不是白加的,会定时检查队列的情况
如果队列在增加,它会尝试减少生产者增加消费者,甚至停掉所有生产者
如果队列在较少,它会尝试减少消费者增加生产者,甚至停掉所有消费者
什么个意思呢,cpu也是资源,优化需要从开源和节流两方面下手,你一味增加逻辑线程物理线程不够用了只有害处没有好处;这个时候停掉一些其他的逻辑线程来释放物理线程就比增加逻辑线程更有效
4、继续看测试结果
01:09:44.553 请输入指令:1 I say 1 01:09:46.631 士兵7向左转 01:09:47.662 请输入指令:4 I say 4 01:09:48.647 士兵3立正 01:09:50.100 请输入指令:2 I say 2 01:10:58.621 士兵9向右转 01:11:00.277 请输入指令:1 I say 1 01:11:04.701 士兵9向左转 01:11:05.967 请输入指令:9 I say 9 01:11:14.842 士兵9未知指令(9)
A:前两条还是9个消费者“抢着”执行
B:后面三条就都是最后一个消费(士兵9)一个执行了,另外8个消费者被“优化”掉了
有的人可能看不明白了,既然9个消费者你要“优化”掉8个,你还不如只定义一个消费者
哈哈别急,另外8个消费者还在呢,随时待命呢,我们来个模拟短暂并发
5、先看看模拟代码
public class DirectiveProduce : Produce<int> { public override object SendMessage(Data.IEntityAdd<int> channel, int message) { if (message < 10) return base.SendMessage(channel, message); for (int i = message; i > 0; i--) { base.SendMessage(channel, i % 8); } return message; } }
我们对生产者做了改动,如果传了的消息>=10,我们就把这个消息拆分为n份来发送,这样必然导致消息堆积
6、检测一下消息堆积后的效果
01:20:48.089 请输入指令:2 I say 2 01:23:19.149 士兵9向右转 01:23:20.899 请输入指令:4 I say 4 01:23:21.180 士兵9立正 01:23:22.914 请输入指令:19 I say 19 01:23:36.337 士兵9向后转 01:23:37.353 士兵9向右转 01:23:38.291 请输入指令:01:23:38.353 士兵9向左转 01:23:38.431 士兵6向左看齐 01:23:38.431 士兵7稍息 01:23:38.431 士兵2向右转 01:23:38.431 士兵8看前看 01:23:38.431 士兵4向右看齐 01:23:38.431 士兵3立正 01:23:38.431 士兵1向左转 01:23:38.431 士兵5向后转 01:23:39.369 士兵9看前看 01:23:39.447 士兵4向左看齐 01:23:39.447 士兵1向右看齐 01:23:39.447 士兵5向右转 01:23:39.447 士兵7稍息 01:23:39.447 士兵8立正 01:23:39.447 士兵6向后转 01:23:39.447 士兵3向左转 5 I say 5 01:25:47.964 士兵9稍息 01:25:49.230 请输入指令:6 I say 6 01:25:49.980 士兵9向右看齐
A:程序启动我故意等了一会再输入,就是等把消费者线程优化后再测试
B:前面两个指令是“士兵9”执行的毫无疑问
C:后面我批量模拟了19个指令,其他消费者陆续都出来了
消费者的周期是1秒,19个指令9个消费者全开启了,3秒就全部执行完了,效果是不是杠杠的
D:后来我又等了一会,再输入两个指令又都只有“士兵9”在执行,也就是说执行批处理后,其他8个消费者又陆续被“优化”掉了
这个是优化消费者,如果是生产者也是时间调取也会被优化,效果是一样,这里就不举例了
PS:这里我还是发现一个问题,时间调度(Timer)的问题,如果时间调度周期设置太短,执行任务是更加及时,但是cpu耗费太多,设置太长效率又太低,而并发总是在不经意发生,虽然可以动态增加生产者和消费者,但是生产者和消费的线程还是可能浪费严重。
这个我深有感触,我原来开发消息队列初期这个周期是写死,有的业务任务是每天甚至是每周、每月执行一次,周期设太小浪费cpu;有的业务偶发每秒10多个甚至几十个,每秒一个调度都远不够用
为了这个问题,我修改了多次调度周期的值,都无法满足各种需求,而且消息队列还时常成为cpu和内存的杀手,偶发cpu满载、内存泄露、进程假死,一段悲催的时光
后来,我”发明“了一个东西,我给它取名为”双向变速齿轮“,这个是我加班坐滚梯去餐厅吃饭时想到的。你看着滚梯在慢速运行,你一踩上去电梯就加速了,你下来后不久,电梯又变慢了。我就尝试在自己的消息队列中实现这样的逻辑。等这个逻辑成熟了,每种任务都基本上相安无事了,结果是So Perfect。
7、变速齿轮消息队列例子代码
private static void Test3(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); NumIncreaser increaser = new NumIncreaser { Min = 2000, Max = 60000 }; NumLower lower = new NumLower { Min = 5, Max = 1000 }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.AutoJobTimer { Interval = 1000, Increaser = increaser, Lower = lower } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
这个例子和前面例子区别很小,就是把JobTimer换成AutoJobTimer,AutoJobTimer有两个属性,Increaser是周期变长(减速齿轮),Lower是周期变短(加速齿轮)
Increaser是在2秒和1分钟之间变动
Lower是在5毫秒到1秒钟之间变动
也就是说,这样设置把消费者实际(一般情况下轮训)的周期从1秒拉长为1分钟,节省了98%的轮训cpu
但是却在高并发的情况下加速99%
字面上算起来就是这样,具体效果我来测试一下
8、变速齿轮运行结果
A:以上例子是并发1000个消息,由于控制台一页已经显示不下了,直好把最前和最后各截图一张
B:前后3秒1000个任务执行完毕,对比前面没加变速齿轮的是3秒执行了19个任务,初算下来加速98%(这个测试会受到采集数据的精度及每次测试的偶发情况稍后不同)
以上都是进程内的消息队列测试,分布式的消息队列更高大上,能不能也支持很好呢?
当然可以,前面展示的很清楚,消息队列由生产者、消费者、消息频道(排队频道和订阅频道)三大块组成,且每一块都相互独立,要接入分布式按此模式即可,很多情况下只需要定义其中一块甚至只是其中一块的一部分
三、MSMQ(微软消息队列)的例子
1、先看生产者测试代码
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Test1(queue); } private static void Test1(IEntityQueue<int> queue) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); channel.Init(); channel.Start(); while (producer.Run()) { } }
注:以上代码和前面的代码还是很相似,区别如下:
A:这个代码中只有频道和生产者,消息频道和生产者的类和以前的一样(使用MSMQ不需要定义新的消息频道和生产者)
B:消息频道指定了使用MSMQ的队列(封装MSMQ实现框架的IEntityQueue接口即可)
面向接口可扩展框架嘛,认得是接口并不是具体实现
2、再看消费者测试代码
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" } , new Soldier { Name = "士兵2" } , new Soldier { Name = "士兵3" } , new Soldier { Name = "士兵4" } , new Soldier { Name = "士兵5" } , new Soldier { Name = "士兵6" } , new Soldier { Name = "士兵7" } , new Soldier { Name = "士兵8" } , new Soldier { Name = "士兵9" } }; Test1(queue, soldiers); }
private static void Test1(IEntityQueue<int> queue, Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); }
以上生产者和消费在不同的控制台程序中测试,模拟两台计算机
A:这个代码中只有频道和消费者,消息频道和生产者的类和以前的一样(使用MSMQ不需要定义新的消息频道和生产者)
消息频道定义和生产者完全一样,其实是两台计算机(或者两个进程),写程序的时候就像是一台一样
B:消息频道指定了使用MSMQ的队列(封装MSMQ实现框架的IEntityQueue接口即可)
3、分别执行结果如下
生产者和消费者都在随时待命
4、输入几个命令玩玩
生产者和消费者的执行时间差在2秒左右,含我操作输入时间,算我输入1秒左右,生产者和消费者时间差在1秒左右
注:不要纠结生产者的第一个提示信息时间,因第一个提示信息后我并没有立即输入,等了一会导致较大时间差
5、为了说明真的是走的MSMQ,我们先把消费者关掉,看系统的消息队列列表
注:以上是我又生产的4个消息
注:以上是MSMQ消息队列截图
注:以上是最后一条消息的正文截图
可以看到,这里使用的xml序列化的方式,有人可能会说,xml不好,性能不好等云云,其实对于使用哪种序列化也是面向接口的,完全可以配置
5、用Json发个消息的代码
public static void TestJson() { Fang.Serialization.DataContractJson.Provider provider0 = new Fang.Serialization.DataContractJson.Provider(); Fang.Serialization.Formater<QueueTest, System.IO.Stream> formater = new Serialization.Formater<QueueTest, System.IO.Stream>() { Provider = provider0 }; Fang.MSMQ.Queue<QueueTest> queue = new Fang.MSMQ.Queue<QueueTest>() { Name = "QueueTest", Serializer = formater, Deserializer = formater }; queue.Init(); QueueChannel<QueueTest> channel = new QueueChannel<QueueTest>() { QueueBuffer = queue, }; Produce<QueueTest> producer = new Produce<QueueTest>() { }; channel.AddProducer(producer); channel.Init(); channel.Start(); producer.SendMessage(new QueueTest { Name = "Test1" }); }
在新的消息队列中使用Json序列化的方式发的
6、MSMQ消息队列非常简单,公布源代码给大家看一下(还没完全写完,很多功能待完善)
/// <summary> /// MSMQ队列 /// </summary> /// <typeparam name="TEntity"></typeparam> public class Queue<TEntity> : Formatter<TEntity>, IEntityQueue<TEntity>, IUnitInit { #region 定义 /// <summary> /// /// </summary> public Queue() { Host = "."; Path = "Private$"; Name = "Default"; BufferSize = Int16.MaxValue; if (ReceiveTimeOut < 1) ReceiveTimeOut = 1000D; _receiveTime = TimeSpan.FromMilliseconds(ReceiveTimeOut); } private MessageQueue _provider; /// <summary> /// 队列功能支持原始对象 /// </summary> public MessageQueue Provider { get { return _provider; } } private TimeSpan _receiveTime; /// <summary> /// /// </summary> public string Path { get; set; } /// <summary> /// /// </summary> public string Name { get; set; } /// <summary> /// 主机 /// </summary> public string Host { get; set; } /// <summary> /// 单位(Millisecond) /// </summary> public double ReceiveTimeOut { get; set; } #endregion #region IEntityQueue<TEntity> /// <summary> /// /// </summary> /// <param name="entity"></param> /// <returns></returns> public bool TryDequeue(ref TEntity entity) { if (_provider == null) return false; Message message = null; try { message = _provider.Receive(_receiveTime); } catch { return false; } if (message == null) return false; return Transform.TryConvert<object, TEntity>(message.Body, ref entity); } /// <summary> /// /// </summary> /// <returns></returns> public TEntity Dequeue() { TEntity entity = default(TEntity); TryDequeue(ref entity); return entity; } /// <summary> /// /// </summary> /// <param name="entity"></param> /// <returns></returns> public bool Enqueue(TEntity entity) { if (entity == null || _provider == null) return false; Message message = new Message(); message.Body = entity; message.Formatter = this; try { _provider.Send(message); } catch { return false; } return true; } /// <summary> /// /// </summary> public int BufferSize { get; set; } /// <summary> /// /// </summary> /// <returns></returns> public int Count() { if (_provider == null) return 0; return 100; } #endregion #region IUnitInit /// <summary> /// 初始化 /// </summary> public void Init() { string fullName = string.Concat(Host, "\\", Path, "\\", Name); if (!MessageQueue.Exists(fullName)) { MessageQueue.Create(fullName); } _provider = new MessageQueue(fullName); _provider.Formatter = this; } #endregion /// <summary> /// 清空 /// </summary> public void Clear() { if (_provider == null) return; _provider.Purge(); } //public static string CreatePath(string host,string path) //{ //} }Queue
/// <summary> /// MSMQ格式化(序列化和反序列化) /// </summary> public class Formatter<TEntity> : IMessageFormatter, ICloneable { /// <summary> /// /// </summary> public Formatter() { Capacity = 1024; } #region 配置 /// <summary> /// /// </summary> public int Capacity { get; set; } /// <summary> /// 序列化工具 /// </summary> public ISerialize<TEntity, Stream> Serializer { get; set; } /// <summary> /// 反序列化工具 /// </summary> public IDeserialize<Stream, TEntity> Deserializer { get; set; } #endregion #region IMessageFormatter /// <summary> /// /// </summary> /// <param name="message"></param> /// <returns></returns> public bool CanRead(Message message) { return message.BodyStream != null && message.BodyStream.Length > 0; } /// <summary> /// /// </summary> /// <param name="message"></param> /// <returns></returns> public object Read(Message message) { if (message == null) return null; IDeserialize<Stream, TEntity> instance = Deserializer ?? GlobalServices.Instance.CreateDeserializer<Stream, TEntity>(); TEntity entity = default(TEntity); try { Transform.TryDeserialize<Stream, TEntity>(instance, message.BodyStream, ref entity); } catch { } return entity; } /// <summary> /// /// </summary> /// <param name="message"></param> /// <param name="obj"></param> public void Write(Message message, object obj) { TEntity entity = default(TEntity); if (!Transform.TryConvert<object, TEntity>(obj, ref entity)) return; Stream stream = new System.IO.MemoryStream(Capacity); ISerialize<TEntity, Stream> instance = Serializer ?? GlobalServices.Instance.CreateSerializer<TEntity, Stream>(); try { Transform.TrySerialize<TEntity, Stream>(instance, entity, ref stream); } catch { } message.BodyStream = stream; } #region ICloneable /// <summary> /// /// </summary> /// <returns></returns> public object Clone() { return this; } #endregion #endregion }MSMQ格式化
消息队列队列这个组件我开发测试了比较长时间,也是项目开发关键。以后还要整合为框架的基础服务,这样用起来就更加方便和简单。
分布式消息队列一直都是高大上的东西,很多分布式架构都是以分布式消息队列为基石来构架。
我有一句话经常说,”好钢要用在刀刃上,好东西不能滥用“。分布式消息队列也是,我们用分布式任务解决计算密集型问题还是资源密集型问题。如果都不是,可不可以尝试更多的内存(进程内)消息队列,说不定惊喜多多啊。
我也力争让分布式消息队列和内存(进程内)消息队列编码上几乎没有区别,这样”两种“消息队列叶可以很方便的切换。
网上有很多优秀开源消息队列,用法各异,我打算封装为一致的api(只是配置不一样),来把消息队列发扬广大。