C# 多线程 02 线程同步
🏷️ 《C# 多线程》
实现线程同步的方法
如果无须共享对象,那么就无须进行线程同步
只是用原子操作
内核模式(kernel-mode)
将等待的线程置于阻塞状态。
当线程处于阻塞状态时,只会占用尽可能少的 CPU 时间。
然而,这意味着将引入至少一次所谓的上下文切换(context switch)。
上下文切换是指操作系统的线程调度器。该调度器会保存等待的线程的状态,并切换到另一个线程,依次恢复等待的线程的状态。这需要消耗相当多的资源。
用户模式(user-mode)
不将线程切换到阻塞状态。
该模式非常轻量,速度很快,但如果线程需要等待较长时间则会浪费大量的 CPU 时间。
混合模式(hybrid)
混合模式会先尝试使用用户模式等待,
如果线程等待了足够长的时间,则会切换到阻塞状态以节省 CPU 资源。
执行基本的原子操作
借助于Interlocked
类,无需锁定任何对象即可获得正确的结果。
private int _count;
public int Count => _count;
public override void Decrement()
{
Interlocked.Decrement(ref _count);
}
public override void Increment()
{
Interlocked.Increment(ref _count);
}
2
3
4
5
6
7
8
9
10
11
12
13
使用 Mutex 类
onst string MutexName = "CSharp Threading Cookbook";
// Mutex 是一种原始的同步方式,其只对一个线程授予对共享资源的独占访问。
// 具名的互斥量是全局的操作系统对象,务必正确关闭互斥量。
// 最好是使用 using 代码快来包裹互斥对象。
using (var m = new Mutex(false, MutexName))
{
if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
{
Console.WriteLine("Second instance is running");
Console.ReadLine();
} else
{
Console.WriteLine("Running");
Console.ReadLine();
m.ReleaseMutex();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
使用 SemaphoreSlim 类
SemaphoreSlim
类限制了同时访问同一个资源的线程数量。
// 构造函数中指定允许的并发线程数量
// 这里指定了并发数为 4 个线程
// 当有 4 个线程获取了资源后,其它的线程需要等待
static SemaphoreSlim _samphore = new SemaphoreSlim(4);
static void Main(string[] args)
{
for (int i = 0; i < 6; i++)
{
string threadName = "Thread " + i;
int secondsToWait = 2 + 2 * i;
var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
t.Start();
}
Console.ReadLine();
}
static void AccessDatabase(String name, int secondes)
{
Console.WriteLine($"{name} wait to access a database");
// 调用 Wait 方法获取资源,当超过最大指定并发数量时,则需要等待其它线程释放资源
_samphore.Wait();
Console.WriteLine($"{name} was granted an access to a database");
Thread.Sleep(TimeSpan.FromSeconds(secondes));
Console.WriteLine($"{name} is completed");
// 调用 Release 方法释放资源
_samphore.Release();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
这里使用了混合模式,其允许在等待时间很短的情况下无需使用上下文切换。
还有一个叫Semaphore
的SemaphoreSlim
类的老版本。该版本使用纯粹的内核时间(kernel-time)方法。
一般没必要使用它,除非是非常重要的场景。SemaphoreSlim
并不适用 Windows 内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用Semaphore
.
使用 AutoResetEvent 类
static void Main(string[] args)
{
var t = new Thread(() => Process(10));
t.Start();
Console.WriteLine("Waiting for another thread to complete work");
_workerEvent.WaitOne();
Console.WriteLine("First operation is completed!");
Console.WriteLine("Performing an operation on a main thread");
Thread.Sleep(TimeSpan.FromSeconds(5));
_mainEvent.Set();
Console.WriteLine("Now running the second operation on a second thread");
_workerEvent.WaitOne();
Console.WriteLine("Second operation is completed");
Console.ReadLine();
}
// 参数为 false,定义了初始状态为 unsignaled
// 这意味着任何线程调用这个对象的 WaitOne 方法将会被阻塞,直到调用了 Set 方法
private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);
// 如果参数为 true,则初始状态为 signaled,如果线程调用 WaitOne 方法则会被立即处理,然后事件状态自动变为 unsignaled,
// 所以需要对该实例调用一次 Set 方法,以便让其他的线程对该实例调用 WaitOne 方法从而继续执行。
static void Process(int seconds)
{
Console.WriteLine("Starting a long running work...");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine("Work is done!");
_workerEvent.Set();
Console.WriteLine("Waiting for a main thread to complete its work");
_mainEvent.WaitOne();
Console.WriteLine("Starting second operation...");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine("Work is done!");
_workerEvent.Set();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
执行结果
Waiting for another thread to complete work
Starting a long running work...
Work is done!
Waiting for a main thread to complete its work
First operation is completed!
Performing an operation on a main thread
Now running the second operation on a second thread
Starting second operation...
Work is done!
Second operation is completed
使用 ManualResetEventSlim 类
static void Main(string[] args)
{
var t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
var t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
var t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
t1.Start();
t2.Start();
t3.Start();
Thread.Sleep(TimeSpan.FromSeconds(6));
Console.WriteLine("门打开啦!");
_mainEvent.Set();
Thread.Sleep(TimeSpan.FromSeconds(2));
_mainEvent.Reset();
Console.WriteLine("门关上啦!");
Thread.Sleep(TimeSpan.FromSeconds(10));
Console.WriteLine("门又打开啦!");
_mainEvent.Set();
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("门又关上啦!");
_mainEvent.Reset();
Console.ReadLine();
}
static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);
static void TravelThroughGates(string threadName, int seconds)
{
Console.WriteLine($"{threadName} 睡眠 {seconds} 秒");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{threadName} 等待门打开!");
_mainEvent.Wait();
Console.WriteLine($"{threadName} 已进门!");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
输出结果
Thread 1 睡眠 5 秒
Thread 2 睡眠 6 秒
Thread 3 睡眠 12 秒
Thread 1 等待门打开!
门打开啦!
Thread 1 已进门!
Thread 2 等待门打开!
Thread 2 已进门!
门关上啦!
Thread 3 等待门打开!
门又打开啦!
Thread 3 已进门!
门又关上啦!
ManualResetEventSlim
是ManualResetEvent
的混合版本,一直保持大门敞开直到手动调用 Reset 方法。
使用 CountDownEvent 类
static void Main(string[] args)
{
Console.WriteLine("开始两个操作");
var t1 = new Thread(() => PerformOperation("操作 1 已经完成", 4));
var t2 = new Thread(() => PerformOperation("操作 2 已经完成", 8));
t1.Start();
t2.Start();
// 直到_countdown 的计数变为 0 才会继续执行
_countdown.Wait();
Console.WriteLine("两个操作都已经完成");
_countdown.Dispose();
Console.ReadLine();
}
static CountdownEvent _countdown = new CountdownEvent(2);
static void PerformOperation(string message, int seconds)
{
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine(message);
// _countdown 的计数减 1
_countdown.Signal();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
打印结果
开始两个操作
操作 1 已经完成
操作 2 已经完成
两个操作都已经完成
使用 Barrier 类
// Barrier 类用于组织多个线程及时在某个时刻碰面
// 其提供一个回调函数,每次线程调用了 SignalAndWait 方法后该回调函数会被执行
static Barrier _barrier = new Barrier(2, b => Console.WriteLine($"结束阶段 {b.CurrentPhaseNumber + 1}"));
static void Main(string[] args)
{
var t1 = new Thread(() => PlayMusic("吉他手", "弹一段 Solo", 5));
var t2 = new Thread(() => PlayMusic("歌手", "唱歌", 2));
t1.Start();
t2.Start();
Console.ReadLine();
}
static void PlayMusic(string name, string message, int seconds)
{
for (int i = 1; i < 3; i++)
{
Console.WriteLine("----------------------------------------");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{name} 开始 {message}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
Console.WriteLine($"{name} 完成了 {message}");
_barrier.SignalAndWait();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
打印结果
----------------------------------------
----------------------------------------
歌手 开始 唱歌
歌手 完成了 唱歌
吉他手 开始 弹一段Solo
吉他手 完成了 弹一段Solo
结束阶段 1
----------------------------------------
----------------------------------------
歌手 开始 唱歌
歌手 完成了 唱歌
吉他手 开始 弹一段Solo
吉他手 完成了 弹一段Solo
结束阶段 2
2
3
4
5
6
7
8
9
10
11
12
13
14
使用 ReaderWriterLockSlim 类
// ReaderWriterLockSlim 代表一个管理资源访问的锁,允许多个线程同时读取,以及独占写
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static Dictionary<int, int> _items = new Dictionary<int, int>();
static void Main(string[] args)
{
new Thread(Read) { IsBackground = true, Name = "Read Thread 1"}.Start();
new Thread(Read) { IsBackground = true, Name = "Read Thread 2" }.Start();
new Thread(Read) { IsBackground = true, Name = "Read Thread 3" }.Start();
new Thread(() => Write("Write Thread 1")) { IsBackground = true }.Start();
new Thread(() => Write("Write Thread 2")) { IsBackground = true }.Start();
// 30s 后主线程结束
Thread.Sleep(TimeSpan.FromSeconds(30));
}
static void Read()
{
Console.WriteLine("读取字典中的内容");
while (true)
{
try
{
// 获取读锁(允许多个线程同时获取读锁)
_rw.EnterReadLock();
foreach (var key in _items)
{
// Console.WriteLine($"线程 {Thread.CurrentThread.Name} : {key}");
Thread.Sleep(TimeSpan.FromSeconds(0.1));
}
}
finally
{
// 在 finally 中释放锁,确保锁最终会被释放
_rw.ExitReadLock();
}
}
}
static void Write(string threadName)
{
while (true)
{
try
{
int newKey = new Random().Next(250);
// 获取可升级读锁
_rw.EnterUpgradeableReadLock();
if (!_items.ContainsKey(newKey))
{
try
{
// 等待所有的读锁释放后获取写锁,此时所有的获取读锁操作会被阻塞
_rw.EnterWriteLock();
_items[newKey] = 1;
Console.WriteLine($"新 Key {newKey} 被 {threadName} 加入字典");
}
finally
{
// 在 finally 中释放锁,确保锁最终会被释放
_rw.ExitWriteLock();
}
}
Thread.Sleep(TimeSpan.FromSeconds(0.1));
}
finally
{
// 在 finally 中释放锁,确保锁最终会被释放
_rw.ExitUpgradeableReadLock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
打印结果
读取字典中的内容
读取字典中的内容
读取字典中的内容
新 Key 64 被 Write Thread 1 加入字典
新 Key 230 被 Write Thread 1 加入字典
新 Key 243 被 Write Thread 1 加入字典
新 Key 125 被 Write Thread 1 加入字典
新 Key 152 被 Write Thread 2 加入字典
新 Key 165 被 Write Thread 1 加入字典
新 Key 114 被 Write Thread 1 加入字典
新 Key 183 被 Write Thread 1 加入字典
新 Key 119 被 Write Thread 1 加入字典
新 Key 171 被 Write Thread 1 加入字典
新 Key 30 被 Write Thread 1 加入字典
新 Key 61 被 Write Thread 2 加入字典
新 Key 87 被 Write Thread 2 加入字典
新 Key 195 被 Write Thread 2 加入字典
新 Key 40 被 Write Thread 2 加入字典
新 Key 202 被 Write Thread 2 加入字典
新 Key 59 被 Write Thread 2 加入字典
新 Key 104 被 Write Thread 2 加入字典
新 Key 15 被 Write Thread 2 加入字典
新 Key 32 被 Write Thread 2 加入字典
新 Key 206 被 Write Thread 2 加入字典
新 Key 147 被 Write Thread 2 加入字典
新 Key 215 被 Write Thread 1 加入字典
新 Key 38 被 Write Thread 1 加入字典
// volatile 关键字指出一个字段可能会被同时执行的多个线程修改。
// 声明为 volatile 的字段不会被编译器和处理器优化为只能被单个线程访问。
// 这确保了该字段总是最新的值
static volatile bool _isCompleted = false;
static void Main(string[] args)
{
var t1 = new Thread(UserModeWait);
var t2 = new Thread(HybirdSpinWait);
Console.WriteLine("执行用户模式等待");
t1.Start();
Thread.Sleep(20);
// Thread.Sleep(TimeSpan.FromSeconds(20));
_isCompleted = true;
Thread.Sleep(TimeSpan.FromSeconds(1));
_isCompleted = false;
Console.WriteLine("执行混合模式等待");
t2.Start();
Thread.Sleep(5);
// Thread.Sleep(TimeSpan.FromSeconds(20));
_isCompleted = true;
Console.ReadLine();
}
static void UserModeWait()
{
while (!_isCompleted)
{
// 这里会一直消耗 CPU 时间
Console.Write(".");
}
Console.WriteLine();
Console.WriteLine("等待已完成");
}
static void HybirdSpinWait()
{
// 使用 SpinWait 来使线程等待
//
var w = new SpinWait();
while (!_isCompleted)
{
w.SpinOnce();
// 获取是否确保下次调用 System.Threading.SpinWait.SpinOnce 会产生处理器,同时触发强制的上下文切换。
// false : 用户模式 不会发生上下文切换 但会浪费 CPU 时间
// true : 内核模式 会发生上下文切换 但会节省 CPU 时间
Console.WriteLine(w.NextSpinWillYield);
}
Console.WriteLine("等待已完成");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
打印结果
执行用户模式等待
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
............................................
等待已完成
执行混合模式等待
False
False
False
False
False
False
False
False
False
False
True
True
True
True
True
True
True
True
True
True
True
True
True
等待已完成
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
看上面的数据结果没啥差别。我们将持续改为 20s 后执行,看一下 CPU 的负载。
效果很明显,使用 SpinWati 的版本明显 CPU 负载很低。