Las Clases Queuey SizedQueue

  1. The standard Thread library defines the Queue and SizedQueue data structures specifically for concurrent programming.

  2. They implement thread-safe FIFO queues and are intended for a producer/consumer model of programming.

  3. Under this model, one thread produces values of some sort and places them on a queue with the enq (enqueue) method or its synonym push.

  4. Another thread consumes these values, removing them from the queue with the deq (dequeue) method as needed. (The pop and shift methods are synonyms for deq.)

  5. The key features of Queue that make it suitable for concurrent programming is that the deq method blocks if the queue is empty and waits until the producer thread adds a value to the queue.

  6. The Queue and SizedQueue classes implement the same basic API, but the SizedQueue variant has a maximum size.
  7. If the queue is already at its maximum size, then the method for adding a value to the queue will block until the consumer thread removes a value from the queue.

Example: A Threaded Map-Reduce

  1. We now define a method that combines a concurrent map with a concurrent inject.

  2. It creates a thread for each element of the enumerable collection and uses that thread to apply a mapping Proc.

  3. The value returned by that Proc is enqueued on a Queue object.

  4. One final thread acts as a consumer; it removes values from the queue and passes them to the injection Proc as they become available.

Ejemplo de uso:

if $0 == __FILE__
  a = if ARGV.empty? 
      else &:to_f 
  mapper = lambda {|x| x*x }               # Compute squares
  injector = lambda {|total,x| total+x }   # Compute sum
  result = a.conject(0, mapper, injector)  # => 10
  puts result


[~/ruby/threads(master)]$ cat conject.rb 
require 'thread'

module Enumerable
  # Concurrent inject: expects an initial value and two Procs
  def conject(initial, mapper, injector)
    # Use a Queue to pass values from mapping threads to injector thread
    q =   
    count = 0                 # How many items?
    each do |item|            # For each item do           # Create a new thread
        q.enq(mapper[item])   # Map and enqueue mapped value
      count += 1              # Count items

    t = do         # Create injector thread
      x = initial             # Start with specified initial value
      while(count > 0)        # Loop once for each item
        x = injector[x,q.deq] # Dequeue value and inject
        count -= 1            # Count down
      x                       # Thread value is injected value

    t.value   # Wait for injector thread and return its value

Casiano Rodriguez León 2015-01-07