线程协作

最近,碰到一个问题,如下:

有四个线程 t1,t2,t3,t4。t1 只能输出1,t2 只能输出2,t3 只能输出3,t4 只能输出4。要求输出41234123…

这个问题考察什么?起初我的大脑如一团白雾。直到我重温《操作系统的概念》,看到两个字:协作。大脑才如一溪流水。原来是考察线程协作问题。

1

什么是线程协作?指多个线程之间相互协作,共同完成任务。比如一个线程负责准备数据,另一个线程负责发送数据。发送线程在发送数据前,必须等待准备线程准备好数据。协作涉及到执行顺序,共享数据,以及消息通信。

与协作对立的是独立。独立指线程独自运行,不与其它线程协作。比如,你的 Word 软件线程,显然不与其他线程协作,一个线程单独的完成你的文本编辑任务。同样,在很多桌面软件中,线程相互独立。

相比相互独立的线程,相互协作的线程重要的多。大量服务器软件,运行在多核处理器中,他们内部的线程相互协作,共同完成任务。协作线程,通常在服务器软件中;而独立线程,通常在桌面软件中。

回到题目中,怎么样才能输出41234123…?线程4、线程1、线程2、线程3依次输出,并且一直轮回下去。怎么才能保证四个线程按顺序输出呢?显然,需要 4 个线程协作,按照既定执行顺序去执行。这是一个典型的协作问题。

2

协作的核心是通信。什么是通信?在线程之间,通信指线程间的信息交流与传递。

协作问题都是通信问题。 在我们的问题中,线程协作,其实就是通信问题。但是,很多时候,我们并没有意识到通信问题的存在。其原因有两点:一,没有意识到,这道题考察协作问题;二,没有意识到,协作问题就是通信问题。

通信模式决定线程协作模式。通信模式决定了是良好协作,还是充满冲突。在共享内存通信模式中,冲突不可避免,因为多个线程竞争同一个数据。你可能认为,在消息通信的通信模式中,可以避免冲突。但事实上,仍然有许多冲突不可避免,比如多个消费者竞争同一条消息。

好的通信模式使线程间良好协作。比如,共享内存的通信模式,解决了数据库的共享数据问题。在多线程环境下,总会有多个线程同时竞争数据。通过共享内存的通信模式,得以解决。共享内存是一种重要的通信模式,解决了许多通信问题,但是也有许多问题无法解决。

3

通信模式有两种:一种是共享内存,另一种是消息通信。

共享内存,指多个线程通过共享内存空间通信。 在共享内存模式中,有三要素:多个线程、共享的内存区域和锁。常用数据库 MySQL,多个线程通过共享内存通信。你可能认为,MySQL 的线程之间没有通信,而且也从未听说过他们之间有通信。但事实上,线程之间一直在通信。比如两条线程同时修改一条数据,先拿到锁的线程在修改数据,没拿到锁的线程在等待。锁是线程间通信的工具,通信一直存在。

消息通信,指多个线程通过消息通信的方式通信。 在消息通信模式中,有四要素:发送者、接受者、通道、消息。发送者把消息发送到通道,接受者等待通道的消息。其中,发送者和监听者均是线程。在 Go 语言中,就是采用这一套通信模式。

哪种模式更好?坦白说,只能用一句话搪塞:“没有银弹。”也就是说,没有最优解,只有满意解。这两种模式,都有自己适应场景,并在自己场景内达到满意解。鼓吹某种模式更好,是一种偏见。如果你不信,那么请思考:数据库能通过消息通信模式完成任务吗?

4

我们一直在讨论线程协作,实际上线程是小进程,线程也是大协程。进程和协程,通信方式一样。在多个进程之间,也是同样的通信模式:共享内存和消息通信。

共享内存的典型案例是分布式锁。做电商的同学,经常会碰到秒杀问题,很多基于分布式锁实现。在秒杀问题中,库存是固定数字,多个订单处理进程处理订单,要求不能出现库存是负数。其方式是通过 Redis 的单线程,实现分布式锁。多个订单处理进程,通过分布式锁,实现通信。从而,实现库存不能为负数的业务功能。

image.png

这个案例中,需要注意:多个订单进程看起来单独运行的,单实际上他们是紧密协作的。每一个订单进程都需要对库存负责,绝对不能出现超出库存的情况。

消息通信的典型案例是消息队列。消息队列是大型分布式系统的重要组件,其功能之一是解耦。既然解耦,那么必定有耦合。耦合有很多种,那么多个进程中,是一种什么样的耦合关系呢?我认为可以称作:通信耦合。没有消息队列之前,各个进程之间的通信是直接而耦合的;有了消息队列以后,各个进程之间的通信是间接而解耦的。

进程协作有一个别称:进程同步。人们喜欢用「同步」描述进程之间的合作。然而,我喜欢用「协作」描述进程之间的合作。协作这个词,更能代表线程之间的现象和规律。而且,协作方式,不只是同步,还有异步。比如在 Erlang 语言中,进程协作的方式是异步的。另外,通过消息队列的通信,实际上也是异步的。

5

而且 Go 同时支持两种通信模式。不仅有共享内存,而且有消息通信。我们试着用 Go 解决我们的问题。

第一种,通过共享内存模式。共享内存模式三要素:多个线程、共享的内存区域、锁。核心操作是加锁和解锁,以及加锁解锁之间的原子操作。在代码中如何体现?

// 共享内存协作方式
func TestShareMemory(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(4) //设置等待的线程数量

	global := expvar.Int{}
	global.Set(4)
	cond := sync.NewCond(&sync.Mutex{})

	//t1
	go func() {
		for {
			cond.L.Lock()
			if global.Value() == 1 {
				fmt.Println(1)
				global.Set(2)
			}
			cond.L.Unlock()
		}
	}()

	//t2
	go func() {
		for {
			cond.L.Lock()
			if global.Value() == 2 {
				fmt.Println(2)
				global.Set(3)
			}
			cond.L.Unlock()
		}
	}()

	//t3
	go func() {
		for {
			cond.L.Lock()
			if global.Value() == 3 {
				fmt.Println(3)
				global.Set(4)
			}
			cond.L.Unlock()
		}
	}()

	//t4
	go func() {
		for {
			cond.L.Lock()
			if global.Value() == 4 {
				fmt.Println(4)
				global.Set(1)
			}
			cond.L.Unlock()
		}
	}()

	wg.Wait()
}

复制代码

第二种,通过消息通信模式。消息通信模式四要素:发送者、接受者、通道、消息。核心操作是发送和接收。在代码中如何体现?

// 进程通信协作方式
func TestProcessCommunication(t *testing.T) {
	wg := sync.WaitGroup{}
	wg.Add(4)

	ch4_1 := make(chan int, 4)
	ch1_2 := make(chan int, 4)
	ch2_3 := make(chan int, 4)
	ch3_4 := make(chan int, 4)
	
	ch4_1 <- 4
	
	// t4
	go func() {
		for {
			v := <-ch4_1
			fmt.Println(v)
			ch1_2 <- 1
		}
	}()
	
	// t1
	go func() {
		for {
			v := <-ch1_2
			fmt.Println(v)
			ch2_3 <- 2
		}
	}()
	
	// t2
	go func() {
		for {
			v := <-ch2_3
			fmt.Println(v)
			ch3_4 <- 3
		}
	}()
	
	// t3
	go func() {
		for {
			v := <-ch3_4
			fmt.Println(v)
			ch4_1 <- 4
		}
	}()
	
	wg.Wait()

}

原创文章,作者:睿达君,如若转载,请注明出处:http://zrrd.net.cn/1654.html

发表评论

登录后才能评论
咨询电话
联系电话:0451-81320577

地址:哈尔滨市松北区中小企业总部基地13F

微信咨询
微信咨询
QQ咨询
分享本页
返回顶部