Go Patterns: Elegant Parallel Queues

Filed under: Go, Software Engineering, Technology

Processing items in a queue in parallel is one of the trickier subjects in programming. You don’t want to overload your systems, but you also want to maximize speed. Go has some unique tools to deal with it.

Programming is relatively simple until it comes to running code in parallel. Once things go parallel, they get complicated: scheduling, race conditions, and locks are just some of the concepts that can break a CS student’s brain during exam season.

For this post let’s assume we need to process a large number of items in parallel. For example, we might need a service that resizes images for a blog. Let’s assume we want to resize the images as fast as possible, but only use a maximum of 8 cores. In other words, we want to resize at most 8 images in parallel.

In this post we’ll be using goroutines, the lightweight threads managed by the Go runtime. Their name is a play on coroutines: they combine aspects of threads and coroutines.

You can think of a goroutine as a function that’s running in the background. As a basic example, we launch the work() function in the background using the go keyword and pass it a sync.WaitGroup so the main goroutine can wait for all work() calls to finish.

package main

import (
    "sync"
    "time"
)

func work(wg *sync.WaitGroup) {
    defer wg.Done() // signal completion even if work panics
    time.Sleep(10 * time.Second)
}

func main() {
    wg := &sync.WaitGroup{}
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go work(wg)
    }
    wg.Wait()
}

For our queue problem the easiest solution would be to split the workload into 8 pieces and have 8 goroutines process them in parallel, similar to the example above. The problem with this approach is that one goroutine might finish early while others are still busy, leaving cores idle and costing performance. Alternatively, we could fiddle around with locks, taking elements out of a shared list one at a time.

However, with Go we can implement not one, but two more elegant solutions using channels. Channels in Go are a way to send items from one goroutine to another, similar to a queue. However, unlike a typical queue, channels don’t have unlimited capacity. By default, a channel has zero capacity. If you write into a channel that has no capacity available, your code will block until someone reads from it.

Solution 1: Items through channels

The first solution uses channels as they are intended: to transport items across goroutine boundaries. We launch 8 goroutines as before, then feed the items that need processing to them through a channel. Once all items have been fed in, we close the channel; the receive in work() then reports ok == false and the goroutines exit.

package main

import (
    "fmt"
    "sync"
    "time"
)

func work(itemChannel <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        if item, ok := <-itemChannel; ok {
            fmt.Printf("Processing item %d...\n", item)
            time.Sleep(10 * time.Second)
            fmt.Printf("Item %d done.\n", item)
        } else {
            // The channel is closed and drained: stop working.
            return
        }
    }
}

func main() {
    items := []int{
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
    }

    wg := &sync.WaitGroup{}
    itemChannel := make(chan int)
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go work(itemChannel, wg)
    }

    for _, item := range items {
        itemChannel <- item
    }
    close(itemChannel)
    wg.Wait()
}
Multiple queues

You can even handle multiple queues using the select block. A select blocks until one of its channels is ready; if several are ready at the same time, it picks one of them at random.

var item int
var ok bool
select {
case item, ok = <-chan1:
    // channel 1 returned an item or closed
case item, ok = <-chan2:
    // channel 2 returned an item or closed
}
if !ok {
    // Channel is closed.
} else {
    // Handle item.
}

Solution 2: Using channels as locks

The second solution is a bit less straightforward. Here we launch as many goroutines as we have items: one goroutine per item. Without any locking mechanism, all items would be processed at once, limited only by your processor and the GOMAXPROCS setting. Instead, we create a buffered channel with a capacity of 8, which acts as a counting semaphore: up to 8 goroutines can write into it without anyone reading, and a ninth write blocks until a slot frees up. We use this in our work function to acquire a lock at the beginning and release it at the end. The value we write into this channel doesn’t matter, so we use an empty struct.

package main

import (
    "fmt"
    "sync"
    "time"
)

func work(item int, lock chan struct{}, wg *sync.WaitGroup) {
    lock <- struct{}{} // acquire a slot; blocks while 8 are taken
    fmt.Printf("Processing item %d...\n", item)
    time.Sleep(10 * time.Second)
    fmt.Printf("Item %d done.\n", item)
    <-lock // release the slot
    wg.Done()
}

func main() {
    items := []int{
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
    }

    wg := &sync.WaitGroup{}
    lock := make(chan struct{}, 8)
    for _, item := range items {
        wg.Add(1)
        go work(item, lock, wg)
    }
    wg.Wait()
}

As you can see, this code is somewhat shorter than the previous example, but a bit harder to read: the channel-as-lock trick isn’t intuitive at first. It also spawns one goroutine per item, which is fine for small queues but adds memory overhead when there are millions of items.

Summary

As you can see, channels are a versatile tool for managing parallel processes. Goroutines and channels are often touted as the flagship features of Go. However, my experience is that in daily work I run into them rather rarely. Still, they are useful tools to have in your arsenal and well worth practicing.