Ruby - 多线程

传统程序只有一个执行线程组成程序的语句或指令按顺序执行,直到程序终止。

多线程程序有多个执行线程。 在每个线程中,语句按顺序执行,但线程本身可以在多核 CPU 上并行执行。 通常在单个 CPU 机器上,多个线程实际上并不是并行执行的,而是通过交错执行线程来模拟并行性。

Ruby 使用 Thread 类可以轻松编写多线程程序。 Ruby 线程是一种在代码中实现并发的轻量级且高效的方法。


创建 Ruby 线程

要启动一个新线程,只需将一个块与对 Thread.new 的调用相关联。 将创建一个新线程来执行块中的代码,原线程将立即从 Thread.new 返回并继续执行下一条语句 −

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

示例

这是一个例子,它展示了我们如何使用多线程 Ruby 程序。

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

这将产生以下结果 −

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

线程生命周期

使用 Thread.new 创建一个新线程。 您还可以使用同义词 Thread.startThread.fork

创建线程后无需启动线程,当 CPU 资源可用时它会自动开始运行。

Thread 类定义了许多在线程运行时查询和操作线程的方法。 线程运行与调用 Thread.new 相关联的块中的代码,然后停止运行。

该块中最后一个表达式的值就是线程的值,可以通过调用Thread对象的value方法获得。 如果线程已运行完成,则该值立即返回线程的值。 否则,value 方法会阻塞并且在线程完成之前不会返回。

类方法Thread.current返回代表当前线程的Thread对象。 这允许线程自己操作。 类方法 Thread.main 返回代表主线程的 Thread 对象。 这是 Ruby 程序启动时开始的初始执行线程。

您可以通过调用特定线程的 Thread.join 方法来等待特定线程完成。 调用线程将阻塞,直到给定线程完成。


线程和异常

如果在主线程中引发异常,并且没有在任何地方处理,Ruby 解释器会打印一条消息并退出。 在除主线程之外的线程中,未处理的异常会导致线程停止运行。

如果线程 t 因为未处理的异常而退出,而另一个线程 s 调用 t.join 或 t.value,则 t 中发生的异常在线程 s 中引发。

如果 Thread.abort_on_exceptionfalse(默认情况),未处理的异常只会杀死当前线程,其余线程继续运行。

如果您希望任何线程中的任何未处理异常导致解释器退出,请将类方法 Thread.abort_on_exception 设置为 true

t = Thread.new { ... }
t.abort_on_exception = true

线程变量

线程通常可以访问创建线程时范围内的任何变量。 线程块的局部变量对线程来说是局部的,并且不共享。

Thread 类具有一个特殊的功能,允许通过名称创建和访问线程局部变量。 您只需将线程对象视为 Hash,使用 []= 写入元素并使用 [] 读取它们。

在这个例子中,每个线程用 mycount 键将变量 count 的当前值记录在一个线程局部变量中。

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

This produces the following result −

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主线程等待子线程完成,然后打印出每个子线程捕获的count的值。


线程优先级

影响线程调度的第一个因素是线程优先级:高优先级线程在低优先级线程之前被调度。 更准确地说,只有在没有更高优先级的线程等待运行时,线程才会获得 CPU 时间。

您可以使用 priority =priority 设置和查询 Ruby 线程对象的优先级。 新创建的线程以与创建它的线程相同的优先级启动。 主线程从优先级 0 开始。

没有办法在线程开始运行之前设置它的优先级。 但是,线程可以在执行的第一个操作时提高或降低自己的优先级。


线程排除

如果两个线程共享对相同数据的访问,并且至少有一个线程修改了该数据,则必须特别注意确保没有线程可以看到处于不一致状态的数据。 这称为线程排除

Mutex 是一个实现简单信号量锁的类,用于对某些共享资源进行互斥访问。 也就是说,在给定时间只有一个线程可以持有锁。 其他线程可能会选择排队等待锁变得可用,或者可能只是选择立即获得指示锁不可用的错误。

通过将所有对共享数据的访问置于 mutex 的控制之下,我们确保了一致性和原子操作。 让我们尝试示例,第一个没有mutax,第二个有mutax −

示例没有 Mutax

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果 −

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果 −

count1 :  696591
count2 :  696591
difference : 0

处理死锁

当我们开始使用 Mutex 对象进行线程排除时,我们必须小心避免 deadlock。 死锁是当所有线程都在等待获取另一个线程持有的资源时发生的情况。 因为所有线程都被阻塞了,所以它们不能释放它们持有的锁。 而且因为它们不能释放锁,所以没有其他线程可以获取这些锁。

这就是条件变量发挥作用的地方。 条件变量只是一个与资源相关联的信号量,用于特定互斥体的保护。 当您需要不可用的资源时,您需要等待条件变量。 该操作会释放相应 mutex 上的锁定。 当一些其他线程发出资源可用的信号时,原始线程退出等待并同时重新获得对关键区域的锁定。

示例

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

这将产生以下结果 −

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

线程状态

有五种可能的返回值对应于下表所示的五种可能状态。 status 方法返回线程的状态。

线程状态 返回值
Runnable run
Sleeping Sleeping
Aborting aborting
Terminated normally false
Terminated with exception nil

线程类方法

以下方法由Thread 类提供,它们适用于程序中所有可用的线程。 这些方法将被调用为使用 Thread 类名,如下所示 −

Thread.abort_on_exception = true

线程实例方法

这些方法适用于线程的实例。 这些方法将被调用为使用 Thread 的实例,如下所示 −

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join