遇到这样一个问题:有两个线程同时操作一套数据。线程1会不断生产出新的数据。线程2会把最新的数据进行汇报。线程1可能会非常频繁非常快的更新数据,而线程2会以固定频率将最新数据进行汇报(可以想象成把数据显示在界面上)。
优化需要注意的问题:
1. 尽量不要对线程1的执行进行干扰,线程1运行得越快越好。
2. 线程2应该检查数据有没有被更新,如果线程1没有更新数据,线程2不会无限制地继续汇报数据。
3. 最新的数据必须被汇报。
更新:
看评论发现应该是我没有解释清楚问题,网友提出的问题都间接折射着一个疑惑“为什么要用线程2,线程1更新完数据直接汇报不行吗?”
这样解释我当前遇到的问题吧:
更新数据的线程(也就是本文所说的线程1)是一直在运行的,不是更新一次运行一次,而是在运行后不断地更新数据,更新频率是不稳定的,可能会非常快的更新数据(比如1秒10万次),也可能由于网络问题造成内部逻辑在等待,因而会比较慢的更新数据(比如10分钟才更新一次)。
OK,之所以引入线程2是因为数据的更新需要显示在界面上,而如果线程1直接去更新界面,超频繁的调用BeginInvoke会严重影响界面的相应性(界面会卡死)。所以引入线程2,它本质上就是一个Timer,会以一定的频率去把线程1产生的最新结果反馈给界面。
如果用lock的话,线程2肯定无所谓,但是线程1会有不小的损耗,想想如果线程1每秒更新10万次,每次都要获取lock然后释放lock显然会令人不爽。
另外还有一种想法也是不用线程2,让线程1去更新界面,只不过线程1每次更新数据后都会判断与上次更新的时间间隔,超过xxx时间后才会去更新界面。这样做,首先当然会影响线程1的性能。其次会有一个致命问题:
假设这里限制每隔500ms更新一次界面。如果线程1在一个时间段更新了数据A,并更新了界面。接着数据立即被更新成B,但是这时还没有超过500ms,于是线程1不更新界面。接着如果线程1遇到网络问题,长时间不更新数据,那么已经最新的数据B永远不会被更新到界面上。这也是上面需要注意的问题3的意义。
所以无论尝试何种方法,我个人认为必须要引入另一个独立的线程做Timer。而本文尝试解决的问题就是在引入线程2后,怎样既确保程序正确工作,又不用给线程1加入多余的性能损耗。
因此某系方案可能会被立即否定。比如加入lock,lock确实是很万能的,但是注意线程1可能会非常频繁得更新数据,加上lock会大大影响线程1的性能。
还比如,我们可以确保最新数据的写入和读取是具备原子性的。然后线程1不断更新数据,线程2在指定时间间隔内读取最新数据就可以了。但显然这种方案违背了上述要求2所要求的的“线程2不会无限制地汇报没有更新的数据”。
(关于原子性,以前写过一篇文章:.NET(C#):再议值类型 - 原子性。这里就不重复说明了。)
我是这样尝试优化这个问题的,首先,这个问题应该包含两个数据,一个是数据本身,另一个是数据是否被更新的flag。由于我们不用lock,那么首先得保证这两个数据单独写入的原子性。
接着模拟一个测试环境,比如假设我们的线程1会不断地把当前时间作为数据进行更新。
最终,拟定这样声明我们所需要的数据字段:
//是否更新的Flag
//注意bool本身读写也是具有原子性的,这里使用int是因为Interlocked类型中没有bool的相关重载。
int isRefreshed;
//使用DateTime的二进制形式作为最新的数据,之后会使用Interlocked类型来确保原子性读写。
long latestData;
可以看到,为了保证每个数据的原子性操作,我们的数据声明都看起来有点不正常哈哈。还有我曾将在.NET(C#):再议值类型 - 原子性这篇文章中提到过使用装箱和拆箱操作可以立即把值类型的读写操作原子化,但是注意线程1会频繁更新数据,这里如果这样做的话,会大大浪费性能和空间的。因为每次装箱都会在堆中申请一片新的空间。
OK,数据准备好后,来看看两个线程是怎样具体执行的。我们为线程1和线程2的执行方法分别起名字叫:Update和Collect方法。这两个方法代码如下:
//+ using System.Threading;
void Update()
{
//更新latestData
//这里要求latestData的赋值操作必须具备原子性,因为此时另一个线程可能在读取latestData
Interlocked.Exchange(ref latestData, DateTime.Now.ToBinary());
isRefreshed = 1;
}
void Collect()
{
//返回isRefreshed,如果isRefreshed为1,isRefreshed现在为0。
var locIsRefreshed = Interlocked.CompareExchange(ref isRefreshed, 0, 1);
if (locIsRefreshed != 0) //判断如果更新
{
var data = Interlocked.CompareExchange(ref latestData, 0, 0);
Console.WriteLine(DateTime.FromBinary(data));
}
}
注意:
上述代码均在资源访问处使用了Interlocked类型,Interlocked类型会自动加入内存屏障(Memory Barrier),在无锁(Lock-Free)编程中,正确加入内存屏障是至关重要的。其作用是会禁止任何可能会打乱代码执行顺序的优化,这种优化可能来自编译器,JIT甚至是CPU本身。因为这种优化往往是针对单线程执行态,所以在这种多线程资源共享且不用lock的环境下,加入内存屏障是非常重要的。加入内存屏障有多种方式,Thread类型的MemoryBarrier,VolatileRead和VolatileWrite方法,还有C#的volatile关键字,以及Interlocked类型本身都会提供内存屏障。
OK,有了内存屏障,我们不用担心代码顺序会不会颠倒,然后代码逻辑就可以确定了,我们可以把每个方法分为两个部分,因此两个数据本身的读取和写入已经可以保证是原子性的了。
如下图:
可以看到每个方法的每个块都和另一个方法的另一个块依赖同一个数据。注意上图的绿线,它代表一种依赖关系,线程1是先设置数据(latestData字段),后设置flag(isRefreshed字段)。而线程2会直接读取当前的flag,然后根据flag,决定是否汇报当前数据,而线程2执行方法的第二部分完全依赖于第一部分的结果。因为如果两个方法交错执行的话,此时数据肯定已经被更新(也就是说线程1的第一部分肯定已经执行)。所以当两个方法交错后,只会发生一种情况,就是数据更新但是flag没有更新。
接下来分析下这种情况下线程2是否能将最新的数据进行汇报:
如果线程2获取的flag为0(说明之前的数据已经被更新),被更新的数据将不会被汇报。但是注意由于方法交错,且线程2执行方法的第二部分只依赖于第一部分,所以最终线程1会设置isRefreshed为1(线程1执行方法的第二部分会执行)。因此即便是被设置的最新数据没有被更新,由于flag为1,下一次线程2还是会把最新的数据更新的。
另一种情况,如果获取的flag为1,则被更新的数据会被立即汇报,但是上面讲过,由于线程1执行方法的第二部分会在交错中执行,在下一次线程2方法执行中,这个数据可能会再次被汇报一次,不过可以肯定的是绝对不会出现连续三次汇报的情况,因为线程2执行方法的第一部分中的CompareExchange方法会把isRefreshed字段清0。另外,如果线程2的下一次运行又发生了和线程1的执行方法交错,最新数据会再次被覆盖,最终也只会被汇报一次。所以不管怎么样,结果还是我们想要的。确保最新的数据一定会被汇报,且没有用到任何lock。
随便写点测试代码运行下两个线程:
public void Run()
{
ThreadPool.QueueUserWorkItem(state =>
{
while (true)
{
Thread.Sleep(300);
Update();
}
});
_timer = new Timer((state) => Collect(), null, 0, 1000);
}
//Timer字段和销毁方法
Timer _timer;
public void Dispose()
{
if (_timer != null)
_timer.Dispose();
}
代码中线程1的更新间隔是300ms,线程2是1000ms。而如果把线程1的更新间隔改成1000ms,线程2的改成300ms。
运行起来结果都类似这样:
源代码下载
下载页面
注意:链接是微软SkyDrive页面,下载时请用浏览器直接下载,用某些下载工具可能无法下载
源代码环境:Microsoft Visual Studio Express 2013 RC for Windows Desktop