Mostafa Asgari

tunny: A goroutine pool for Go

Published on Jul 15, 2021

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchronous API.

A fixed goroutine pool is helpful when you have work coming from an arbitrary number of asynchronous sources, but a limited capacity for parallel processing. For example, when processing jobs from HTTP requests that are CPU heavy you can create a pool with a size that matches your CPU count.

In this tutorial, we will take a visual tour of Tunny's implementation details and see how it works under the hood. First, read the usage example to get an idea of how the library is used, then come back here to dig into the details. Sounds good? Let's start.

What happens when we set the pool size?

Basically, we want a pool of goroutines that take a job from a queue, process it, and return to the pool when the job is finished. If all the goroutines are busy, new jobs submitted to the pool block until a goroutine becomes free and picks one up. In Tunny these goroutines are called workers, and they are created in the Pool.SetSize method. If the pool size is N, then N workers run concurrently:

func (p *Pool) SetSize(n int) {
    // Code omitted for brevity (lWorkers holds the current number of workers)

    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }
}
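
To make the growth loop concrete, here is a minimal sketch of a resizable pool built only on the standard library. The names miniPool, setSize and size are hypothetical and are not Tunny's API. The shrink branch is my own assumption about what a complete version needs, since the excerpt above only shows growth.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPool is a hypothetical stand-in for a pool. It only tracks one stop
// channel per running worker.
type miniPool struct {
	mu      sync.Mutex
	workers []chan struct{}
	wg      sync.WaitGroup
}

// setSize grows or shrinks the pool to exactly n workers.
func (p *miniPool) setSize(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n < 0 {
		n = 0
	}

	// Grow: start one goroutine per missing worker.
	for i := len(p.workers); i < n; i++ {
		stop := make(chan struct{})
		p.workers = append(p.workers, stop)
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			<-stop // a real worker would loop here, taking jobs until stopped
		}()
	}

	// Shrink (assumption, not shown in the excerpt): stop surplus workers.
	for i := n; i < len(p.workers); i++ {
		close(p.workers[i])
	}
	p.workers = p.workers[:n]
}

// size reports how many workers the pool currently tracks.
func (p *miniPool) size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.workers)
}

func main() {
	p := &miniPool{}
	p.setSize(3)
	fmt.Println(p.size()) // 3
	p.setSize(5)
	fmt.Println(p.size()) // 5
	p.setSize(0)
	p.wg.Wait() // every worker goroutine has exited
	fmt.Println(p.size()) // 0
}
```

Starting the loop at the current length means calling setSize twice with the same value starts no extra goroutines.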

newWorkerWrapper runs each worker in a separate goroutine:

go w.run()

When it starts each worker, newWorkerWrapper passes it a channel called reqChan:

reqChan chan workRequest

When these workers are free, they send a request to this channel, saying:

Hey, I'm free and I can process a job from the pool

Because this channel is unbuffered, a send completes only when someone is reading from the other end, so only one worker can hand over its request at any given time:

reqChan: make(chan workRequest)

and the other workers have to wait for their turn. Imagine the pool size is 3; the pictures below show how it works:

1.png

2.png
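
The behaviour of the unbuffered channel can be tried in isolation. The sketch below is not Tunny's code: announce, free and done are hypothetical names, and each worker sends a plain int ID instead of a workRequest. It assumes reads is never larger than n, otherwise the receive loop would wait forever.

```go
package main

import "fmt"

// announce starts n hypothetical workers that each try to send their ID on an
// unbuffered channel, performs `reads` receives, and returns the IDs it got.
// It assumes reads <= n.
func announce(n, reads int) []int {
	free := make(chan int)      // unbuffered: a send blocks until a receive
	done := make(chan struct{}) // closed to release workers that were not picked

	for id := 1; id <= n; id++ {
		go func(id int) {
			select {
			case free <- id: // "I'm free": completes only when someone receives
			case <-done: // nobody asked for this worker
			}
		}(id)
	}

	ids := make([]int, 0, reads)
	for i := 0; i < reads; i++ {
		ids = append(ids, <-free) // each receive is paired with exactly one worker
	}
	close(done)
	return ids
}

func main() {
	// Two distinct IDs out of 1..3; which ones depends on the scheduler.
	fmt.Println("workers picked:", announce(3, 2))
}
```

Each receive unblocks exactly one sender, which is why a single read from the channel never claims more than one worker.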

What happens when a job is submitted to the pool?

When someone submits a job to the pool, the Process method first reads a workRequest from reqChan:

// this code is part of --> (p *Pool) Process method
request, open := <-p.reqChan

3.png

It then communicates with the worker through the workRequest:

type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan<- interface{}

    // retChan is used to read the result from this worker.
    retChan <-chan interface{}
}

It passes the input/payload data via jobChan to the worker and waits until the result comes back via retChan:

request.jobChan <- payload
result, open = <-request.retChan
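
Putting both sides together, a full round trip might look like the sketch below. It reuses the workRequest shape from above, but worker, process, stop and errClosed are hypothetical names, and returning an error for a closed channel is my own simplification rather than a statement about how Tunny reacts.

```go
package main

import (
	"errors"
	"fmt"
)

// workRequest mirrors the struct shown above: one channel per direction.
type workRequest struct {
	jobChan chan<- interface{}
	retChan <-chan interface{}
}

// errClosed is a hypothetical error; it is not part of Tunny's API.
var errClosed = errors.New("pool is closed")

// worker announces itself on reqChan and then serves the one job it is handed.
func worker(reqChan chan<- workRequest, stop <-chan struct{}, fn func(interface{}) interface{}) {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	for {
		select {
		case reqChan <- workRequest{jobChan: jobChan, retChan: retChan}:
			retChan <- fn(<-jobChan) // receive the payload, send the result back
		case <-stop:
			return
		}
	}
}

// process is the caller's side: claim a free worker, send the payload, and
// wait for the result.
func process(reqChan <-chan workRequest, payload interface{}) (interface{}, error) {
	request, open := <-reqChan
	if !open {
		return nil, errClosed
	}
	request.jobChan <- payload
	result, open := <-request.retChan
	if !open {
		return nil, errClosed
	}
	return result, nil
}

func main() {
	reqChan := make(chan workRequest)
	stop := make(chan struct{})
	double := func(v interface{}) interface{} { return v.(int) * 2 }
	go worker(reqChan, stop, double)

	result, err := process(reqChan, 21)
	fmt.Println(result, err) // 42 <nil>
	close(stop)
}
```

The caller never needs to know which worker it got. The two channels inside the request are the only link between them.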

While worker 3 is busy processing the job, the other two workers keep trying to send their workRequest to the channel, so the next job can be picked up right away:

4.png

This process repeats for the other jobs submitted to the pool:

5.png

6.png

If all the workers are busy and a new job arrives, it blocks, because it tries to read from reqChan while no worker is offering a request, so it has to wait:

7.png

Eventually, one worker finishes its job, announces that it is ready again, and picks up the blocked job:

8.png
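
To check that this blocking really caps the amount of parallel work, the following sketch pushes more callers than workers through the same handshake and records the peak number of jobs running at once. runJobs and its local request type are hypothetical, and the 5 ms sleep merely simulates work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runJobs pushes `jobs` concurrent callers through `size` workers and returns
// the highest number of jobs that were ever running at the same time.
func runJobs(size, jobs int) int64 {
	type request struct{ job, done chan struct{} }
	reqChan := make(chan request) // unbuffered, as in the article
	stop := make(chan struct{})
	var running, peak int64

	for i := 0; i < size; i++ {
		go func() {
			req := request{job: make(chan struct{}), done: make(chan struct{})}
			for {
				select {
				case reqChan <- req: // announce: completes when a caller reads
				case <-stop:
					return
				}
				<-req.job
				now := atomic.AddInt64(&running, 1)
				for { // raise peak to `now` if `now` is a new maximum
					old := atomic.LoadInt64(&peak)
					if now <= old || atomic.CompareAndSwapInt64(&peak, old, now) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond) // simulated work
				atomic.AddInt64(&running, -1)
				req.done <- struct{}{}
			}
		}()
	}

	var wg sync.WaitGroup
	for j := 0; j < jobs; j++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			req := <-reqChan      // blocks while every worker is busy
			req.job <- struct{}{} // hand over the job
			<-req.done            // wait for the worker to finish
		}()
	}
	wg.Wait()
	close(stop)
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(runJobs(3, 10) <= 3) // true
}
```

However many callers arrive, the peak cannot exceed the number of workers, because a job only starts after a caller has claimed a worker's request.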

Last word

For me, the interesting part was the way workers voluntarily declare "I'm free and I can pick up a job". The project is popular and I was curious about how it works, so once I understood it, I wanted to share it with you.

 