A worker pool is a concurrency pattern in which a fixed number of goroutines (workers) process a stream of jobs. When there are many tasks to handle, this approach helps limit resource usage and improve throughput.
What are Worker Pools in Go?
Concept: A worker pool is a collection of “threads” (goroutines in Go) that wait for tasks and process the ones assigned to them.
Go’s Approach: Instead of using conventional operating system threads as workers, worker pools in Go use goroutines. One significant difference is that OS threads are relatively expensive to create and destroy, whereas goroutines are lightweight and can simply exit (“die”) after completing a task.
Implementation: In Go, channels, usually buffered ones, are the main tool used to build worker pools. The number of worker goroutines limits how many jobs run concurrently, while the capacity of the buffered channels limits how many jobs or results can be queued before a sender blocks.
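The idea described above can be sketched in a few lines. This is a minimal illustration, not the article's workerPool.go program: the helper name squareWithPool and its parameters are assumptions made for this example. A fixed number of goroutines drain a buffered jobs channel and write to a buffered results channel.

```go
package main

import (
	"fmt"
	"sync"
)

// squareWithPool is a hypothetical helper: it squares every input using
// exactly nWorkers goroutines and returns a map from input to square.
func squareWithPool(inputs []int, nWorkers int) map[int]int {
	type result struct{ in, out int }

	jobs := make(chan int, len(inputs))       // buffered: holds queued jobs
	results := make(chan result, len(inputs)) // buffered: holds finished results

	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs { // ends once jobs is closed and drained
				results <- result{n, n * n}
			}
		}()
	}

	for _, n := range inputs {
		jobs <- n
	}
	close(jobs) // tell the workers that no more jobs are coming

	wg.Wait()      // every worker has exited
	close(results) // safe: nobody sends on results any more

	out := make(map[int]int, len(inputs))
	for r := range results {
		out[r.in] = r.out
	}
	return out
}

func main() {
	squares := squareWithPool([]int{1, 2, 3, 4, 5}, 3)
	fmt.Println(squares[4]) // prints 16
}
```

Because both channels are sized to hold every job and every result, the function can wait for the workers before reading any results. With smaller buffers the results would have to be read while the workers are still running.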
Benefits of Worker Pools
Resource Management: Worker pools save CPU and memory by preventing the system from being overloaded with too many concurrently running goroutines.
Concurrency Control: They offer a structured way to manage concurrency, which helps tasks complete efficiently and avoids the system instability that unchecked goroutine creation can cause. Passing jobs and results over channels also reduces the need for shared state, which lowers the risk of race conditions.
Template for Complex Tasks: Although the examples here are simple, the basic design of a worker pool can serve as a template for much more complex tasks, such as handling several client connections in a server process.
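To make the resource-management benefit concrete, the following sketch measures how many jobs are ever in progress at the same moment. The function name peakConcurrency and the one-millisecond simulated work are assumptions chosen for illustration. However many jobs are queued, the observed peak should never exceed the number of workers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs nJobs short jobs on nWorkers goroutines and reports
// the highest number of jobs that were in progress at the same time.
func peakConcurrency(nJobs, nWorkers int) int64 {
	jobs := make(chan int, nJobs)
	for i := 0; i < nJobs; i++ {
		jobs <- i
	}
	close(jobs)

	var running, peak int64
	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				cur := atomic.AddInt64(&running, 1)
				// Record a new maximum if this is the busiest moment so far.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				time.Sleep(time.Millisecond) // simulated work
				atomic.AddInt64(&running, -1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	// 20 jobs, but at most 3 are ever processed concurrently.
	fmt.Println(peakConcurrency(20, 3) <= 3) // prints true
}
```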
Code Example
The workerPool.go program demonstrates a basic worker pool in which a fixed set of worker goroutines processes integers and outputs their squares. The same technique can be used to build server processes that accept and serve several clients at once.
Data Structures for Jobs and Results
package main
import (
"fmt"
"os"
"strconv"
"sync"
"time"
)
type Client struct {
id int
integer int
}
type Data struct {
job Client
square int
}
// The main function sets up and manages the worker pool.
// It takes the number of jobs as a command-line argument.
func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: go run <filename.go> <number_of_jobs>")
return
}
nJobs, err := strconv.Atoi(os.Args[1])
if err != nil {
fmt.Println("Error: Invalid number of jobs argument. Must be an integer.")
return
}
fmt.Printf("Starting job processing for %d jobs...\n", nJobs)
jobsChannel := make(chan Client, nJobs)
resultsChannel := make(chan Data, nJobs)
var wg sync.WaitGroup
nWorkers := 3 // Example: run 3 workers concurrently
for w := 1; w <= nWorkers; w++ {
wg.Add(1)
go consumer(w, &wg, jobsChannel, resultsChannel)
}
go producer(jobsChannel, nJobs)
wg.Wait()
close(resultsChannel)
for result := range resultsChannel {
fmt.Printf("Job ID %d (Input: %d) -> Output: %d\n", result.job.id, result.job.integer, result.square)
}
fmt.Println("All jobs processed successfully.")
}
// producer generates jobs and sends them to the jobsChannel
func producer(jobsChannel chan<- Client, nJobs int) {
fmt.Println("Producer: Generating jobs...")
for i := 1; i <= nJobs; i++ {
jobsChannel <- Client{id: i, integer: i}
}
close(jobsChannel)
fmt.Println("Producer: All jobs sent.")
}
// consumer receives jobs, processes them, and sends results to the resultsChannel
func consumer(id int, wg *sync.WaitGroup, jobsChannel <-chan Client, resultsChannel chan<- Data) {
defer wg.Done()
for job := range jobsChannel {
fmt.Printf("Worker %d: Processing job %d...\n", id, job.id)
time.Sleep(1 * time.Second) // Simulate work
output := job.integer * job.integer
resultsChannel <- Data{job: job, square: output}
fmt.Printf("Worker %d: Finished job %d.\n", id, job.id)
}
fmt.Printf("Worker %d: No more jobs. Exiting.\n", id)
}
Output
Starting job processing for 5 jobs...
Producer: Generating jobs...
Producer: All jobs sent.
Worker 1: Processing job 1...
Worker 2: Processing job 2...
Worker 3: Processing job 3...
Worker 1: Finished job 1.
Worker 1: Processing job 4...
Worker 2: Finished job 2.
Worker 2: Processing job 5...
Worker 3: Finished job 3.
Worker 3: No more jobs. Exiting.
Worker 1: Finished job 4.
Worker 1: No more jobs. Exiting.
Worker 2: Finished job 5.
Worker 2: No more jobs. Exiting.
Job ID 1 (Input: 1) -> Output: 1
Job ID 2 (Input: 2) -> Output: 4
Job ID 3 (Input: 3) -> Output: 9
Job ID 4 (Input: 4) -> Output: 16
Job ID 5 (Input: 5) -> Output: 25
All jobs processed successfully.
Client struct: stores the integer input and gives each request a unique ID.
Data struct: combines the computed square with the original Client job it was computed from.
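As a small illustration of how the two structs fit together, the sketch below turns one Client into one Data value. The struct definitions mirror the ones in the program above. The process helper is hypothetical and is not part of workerPool.go.

```go
package main

import "fmt"

// Client is a job request: a unique ID plus the integer to square.
type Client struct {
	id      int
	integer int
}

// Data is a result: the original job together with its computed square.
type Data struct {
	job    Client
	square int
}

// process is a hypothetical helper that does the work of a single job.
func process(c Client) Data {
	return Data{job: c, square: c.integer * c.integer}
}

func main() {
	d := process(Client{id: 1, integer: 4})
	fmt.Printf("Job ID %d (Input: %d) -> Output: %d\n", d.job.id, d.job.integer, d.square)
	// prints: Job ID 1 (Input: 4) -> Output: 16
}
```

Keeping the original Client inside Data means a result can always be matched to its request, even though workers may finish jobs in a different order than they were submitted.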
Global Channels and Worker Function
package main
import (
"fmt"
"sync"
"time"
)
// Client represents a job request with an ID and an integer value.
type Client struct {
id int
integer int
}
// Data represents the result of a processed job.
type Data struct {
job Client // The original job associated with this result
square int // The calculated square of the integer
}
// Global buffered channels for communication between main/producer and workers.
// The size is set to 10, meaning up to 10 items can be in the channel buffer
// before a sender blocks.
var (
size = 10
clients = make(chan Client, size) // Buffered channel for incoming client requests/jobs
data = make(chan Data, size) // Buffered channel for processed results
)
// worker is a goroutine that reads jobs from the 'clients' channel,
// processes them (calculates the square), and sends the results
// to the 'data' channel. It signals completion using a WaitGroup.
func worker(w *sync.WaitGroup) {
// Defer w.Done() to ensure that the WaitGroup counter is decremented
// when this worker goroutine finishes processing all jobs and exits.
defer w.Done()
// Loop indefinitely, reading clients from the 'clients' channel.
// This loop will automatically terminate when the 'clients' channel is closed.
for c := range clients {
fmt.Printf("Worker processing Client ID: %d (Input: %d)\n", c.id, c.integer)
// Simulate some computational work or I/O delay.
time.Sleep(500 * time.Millisecond)
// Calculate the square of the integer from the client job.
square := c.integer * c.integer
// Create a Data struct to hold the original job and its result.
output := Data{c, square}
// Send the processed result to the 'data' channel.
data <- output
}
fmt.Println("Worker: No more jobs, exiting.")
}
// main function to orchestrate the client generation and worker processing.
func main() {
fmt.Println("Starting Go Concurrency Example...")
var wg sync.WaitGroup // WaitGroup to wait for all worker goroutines to finish
// Start multiple worker goroutines.
// Each worker will run concurrently, pulling jobs from the 'clients' channel.
numWorkers := 3
fmt.Printf("Starting %d worker goroutines...\n", numWorkers)
for i := 0; i < numWorkers; i++ {
wg.Add(1) // Increment the WaitGroup counter for each worker
go worker(&wg)
}
// Produce jobs: Send Client structs to the 'clients' channel.
// In a real application, this might come from network requests, a file, etc.
numJobs := 5
fmt.Printf("Generating %d jobs...\n", numJobs)
for i := 1; i <= numJobs; i++ {
clients <- Client{id: i, integer: i * 2} // Send a new job to the clients channel
}
// Close the 'clients' channel.
// This signals to all workers that no more jobs will be sent.
// The `for c := range clients` loop in each worker will then complete.
close(clients)
fmt.Println("All jobs sent to clients channel. Closing clients channel.")
// Wait for all worker goroutines to finish.
// This blocks until `wg.Done()` has been called `numWorkers` times.
wg.Wait()
fmt.Println("All workers have finished.")
// Close the 'data' channel after all workers are done producing results.
// This allows the following `for range` loop to terminate.
close(data)
fmt.Println("Closing data channel.")
// Collect and print all results from the 'data' channel.
fmt.Println("\n--- Final Results ---")
for result := range data {
fmt.Printf("Result for Client ID %d (Input: %d): Square = %d\n",
result.job.id, result.job.integer, result.square)
}
fmt.Println("Program finished successfully.")
}
Output
Starting Go Concurrency Example...
Starting 3 worker goroutines...
Generating 5 jobs...
Worker processing Client ID: 1 (Input: 2)
Worker processing Client ID: 2 (Input: 4)
Worker processing Client ID: 3 (Input: 6)
All jobs sent to clients channel. Closing clients channel.
Worker processing Client ID: 4 (Input: 8)
Worker processing Client ID: 5 (Input: 10)
Worker: No more jobs, exiting.
Worker: No more jobs, exiting.
Worker: No more jobs, exiting.
All workers have finished.
Closing data channel.
--- Final Results ---
Result for Client ID 1 (Input: 2): Square = 4
Result for Client ID 2 (Input: 4): Square = 16
Result for Client ID 3 (Input: 6): Square = 36
Result for Client ID 4 (Input: 8): Square = 64
Result for Client ID 5 (Input: 10): Square = 100
Program finished successfully.
Buffered Channels: Two global buffered channels, clients and data, are created with a capacity of 10. The clients channel carries new requests and the data channel carries results. The capacity of a channel can be found using the cap() function.
worker() function: This function is run as a goroutine. It repeatedly reads client jobs from the clients channel, calculates the square, and sends the result to the data channel. time.Sleep(500 * time.Millisecond) simulates a processing delay. When the clients channel has been closed and all jobs have been processed, the loop ends and the deferred w.Done() call signals that this worker has finished.
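The buffer and close behaviour described above can be observed directly. In this sketch, tryEnqueue and drain are hypothetical helpers written for illustration. cap() reports the buffer size, len() reports how many values are currently queued, and a range loop over a closed channel still delivers the buffered values before it ends.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the value
// fit into the channel's buffer. (Hypothetical helper.)
func tryEnqueue(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // buffer is full (or nobody is receiving): do not block
		return false
	}
}

// drain reads until the channel is closed and empty, and returns how many
// values it received. (Hypothetical helper.)
func drain(ch <-chan int) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(cap(ch), len(ch)) // prints 2 0

	fmt.Println(tryEnqueue(ch, 1), tryEnqueue(ch, 2), tryEnqueue(ch, 3))
	// prints true true false: the third value does not fit

	fmt.Println(cap(ch), len(ch)) // prints 2 2

	close(ch)
	fmt.Println(drain(ch)) // prints 2: buffered values survive the close
}
```

An ordinary send (ch <- v without select) would block instead of returning false, which is exactly what happens to a producer when the clients buffer is full.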
Functions to Manage Workers and Create Jobs
package main
import (
"fmt"
"sync"
"time"
)
// Client represents a job request with an ID and an integer value.
type Client struct {
id int
integer int
}
// Data represents the result of a processed job.
type Data struct {
job Client // The original job associated with this result
square int // The calculated square of the integer
}
// Global buffered channels for communication between producer, workers, and main.
// The size is set to 10, allowing up to 10 items in the channel buffer
// before a sender blocks.
var (
size = 10
clients = make(chan Client, size) // Buffered channel for incoming client requests/jobs
data = make(chan Data, size) // Buffered channel for processed results
)
// worker is a goroutine that reads jobs from the 'clients' channel,
// processes them (calculates the square), and sends the results
// to the 'data' channel. It signals completion using a WaitGroup.
func worker(w *sync.WaitGroup) {
// Defer w.Done() to ensure that the WaitGroup counter is decremented
// when this worker goroutine finishes processing all jobs and exits.
defer w.Done()
// Loop indefinitely, reading clients from the 'clients' channel.
// This loop will automatically terminate when the 'clients' channel is closed.
for c := range clients {
fmt.Printf("Worker processing Client ID: %d (Input: %d)\n", c.id, c.integer)
// Simulate some computational work or I/O delay.
time.Sleep(500 * time.Millisecond)
// Calculate the square of the integer from the client job.
square := c.integer * c.integer
// Create a Data struct to hold the original job and its result.
output := Data{c, square}
// Send the processed result to the 'data' channel.
data <- output
}
fmt.Println("Worker: No more jobs, exiting.")
}
// makeWP creates 'n' worker goroutines and waits for them to finish.
// It also closes the 'data' channel after all workers are done.
func makeWP(n int) { // Creates 'n' worker goroutines
var w sync.WaitGroup // Each call to makeWP uses its own WaitGroup for its workers
for i := 0; i < n; i++ {
w.Add(1) // Increment the WaitGroup counter for each worker
go worker(&w) // Start a worker goroutine
}
w.Wait() // Wait for all workers to finish their jobs
close(data) // Close the data channel once all workers are done producing results
fmt.Println("makeWP: All workers finished and data channel closed.")
}
// create generates 'n' client jobs and sends them to the 'clients' channel.
// It closes the 'clients' channel after all jobs are sent.
func create(n int) { // Creates 'n' client jobs
fmt.Printf("create: Generating %d client jobs...\n", n)
for i := 0; i < n; i++ {
c := Client{i, i * 2} // Create a new client job
clients <- c // Send client job to the clients channel
}
close(clients) // Close the clients channel after all jobs are sent to signal EOF to workers
fmt.Println("create: All jobs sent and clients channel closed.")
}
// main function to orchestrate the client generation and worker processing.
func main() {
fmt.Println("Starting Go Concurrency Producer-Consumer Example...")
numJobs := 5 // Number of jobs to create
numWorkers := 3 // Number of worker goroutines
// Start a goroutine to create jobs. This runs concurrently with worker creation.
go create(numJobs)
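// Start the worker pool and wait for all workers to finish.
// makeWP blocks until all workers have processed all jobs from 'clients'
// and sent their results to 'data'. Nothing reads 'data' until makeWP returns,
// so this only works while numJobs does not exceed the buffer size (10).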
makeWP(numWorkers)
// Collect and print all results from the 'data' channel.
// This loop will terminate when the 'data' channel is closed by makeWP.
fmt.Println("\n--- Final Results ---")
for result := range data {
fmt.Printf("Result for Client ID %d (Input: %d): Square = %d\n",
result.job.id, result.job.integer, result.square)
}
fmt.Println("Program finished successfully.")
}
Output
Starting Go Concurrency Producer-Consumer Example...
create: Generating 5 client jobs...
Worker processing Client ID: 0 (Input: 0)
Worker processing Client ID: 1 (Input: 2)
Worker processing Client ID: 2 (Input: 4)
create: All jobs sent and clients channel closed.
Worker processing Client ID: 3 (Input: 6)
Worker processing Client ID: 4 (Input: 8)
Worker: No more jobs, exiting.
Worker: No more jobs, exiting.
Worker: No more jobs, exiting.
makeWP: All workers finished and data channel closed.
--- Final Results ---
Result for Client ID 0 (Input: 0): Square = 0
Result for Client ID 1 (Input: 2): Square = 4
Result for Client ID 2 (Input: 4): Square = 16
Result for Client ID 3 (Input: 6): Square = 36
Result for Client ID 4 (Input: 8): Square = 64
Program finished successfully.
makeWP(n int): This function creates n worker goroutines. It uses a sync.WaitGroup to wait for all worker goroutines to finish their tasks before calling close(data). w.Add(1) is called for every worker that is created.
create(n int): This function creates n jobs, or client requests. Each Client is sent to the buffered clients channel. After all jobs have been sent, the clients channel is closed with close() to indicate that no more jobs will be sent.
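Because makeWP waits for every worker before anything reads from data, the version above relies on all results fitting into the channel buffer. A common variation, sketched here under assumed names (sumSquares and its parameters are hypothetical), moves the wait-and-close step into its own goroutine so that results can be consumed while the workers are still running.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares the integers 0..n-1 on nWorkers goroutines and returns
// the sum of the squares. buf is the capacity of both channels and may be
// much smaller than n. nWorkers is assumed to be at least 1.
func sumSquares(n, nWorkers, buf int) int {
	jobs := make(chan int, buf)
	results := make(chan int, buf)

	// Producer: plays the role of create().
	go func() {
		for i := 0; i < n; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// Workers: play the role of worker().
	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range jobs {
				results <- v * v
			}
		}()
	}

	// Closer: waits in the background instead of blocking the caller.
	go func() {
		wg.Wait()
		close(results)
	}()

	// The caller consumes results while the workers are still producing them.
	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	// 100 jobs flow through channels that buffer only 2 values each.
	fmt.Println(sumSquares(100, 3, 2)) // prints 328350
}
```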
Main Function: Orchestration
package main
import (
"fmt"
"os"
"strconv"
"sync"
"time"
)
// Client represents a job request with an ID and an integer value.
type Client struct {
id int
integer int
}
// Data represents the result of a processed job.
type Data struct {
job Client // The original job associated with this result
square int // The calculated square of the integer
}
// Global buffered channels for communication between producer, workers, and main.
// The size is set to 10, allowing up to 10 items in the channel buffer
// before a sender blocks.
var (
size = 10
clients = make(chan Client, size) // Buffered channel for incoming client requests/jobs
data = make(chan Data, size) // Buffered channel for processed results
)
// worker is a goroutine that reads jobs from the 'clients' channel,
// processes them (calculates the square), and sends the results
// to the 'data' channel. It signals completion using a WaitGroup.
func worker(w *sync.WaitGroup) {
defer w.Done() // Ensure the WaitGroup counter is decremented when this worker exits
for c := range clients { // Loop, reading clients from the 'clients' channel until it's closed
fmt.Printf("Worker processing Client ID: %d (Input: %d)\n", c.id, c.integer)
time.Sleep(500 * time.Millisecond) // Simulate work delay
square := c.integer * c.integer
output := Data{c, square}
data <- output // Send the processed result to the 'data' channel
}
fmt.Println("Worker: No more jobs, exiting.")
}
// makeWP creates 'n' worker goroutines and waits for them to finish.
// It also closes the 'data' channel after all workers are done.
func makeWP(n int) { // Creates 'n' worker goroutines
var w sync.WaitGroup // WaitGroup specific to the workers created by this call
for i := 0; i < n; i++ {
w.Add(1) // Increment WaitGroup counter for each worker
go worker(&w) // Start a worker goroutine
}
w.Wait() // Block until all workers created by this call have finished
close(data) // Close the data channel once all workers are done producing results
fmt.Println("makeWP: All workers finished and data channel closed.")
}
// create generates 'n' client jobs and sends them to the 'clients' channel.
// It closes the 'clients' channel after all jobs are sent.
func create(n int) { // Creates 'n' client jobs
fmt.Printf("create: Generating %d client jobs...\n", n)
for i := 0; i < n; i++ {
c := Client{i, i * 2} // Create a new client job (e.g., ID 0, value 0; ID 1, value 2; etc.)
clients <- c // Send client job to the clients channel
}
close(clients) // Close the clients channel to signal EOF to workers
fmt.Println("create: All jobs sent and clients channel closed.")
}
// main function orchestrates the producer (create) and consumer (workers)
// and handles argument parsing and final result printing.
func main() {
fmt.Println("Capacity of clients channel:", cap(clients))
fmt.Println("Capacity of data channel:", cap(data))
// Ensure exactly two command-line arguments are provided: #jobs and #workers
if len(os.Args) != 3 {
fmt.Println("Usage: go run <filename.go> <number_of_jobs> <number_of_workers>")
os.Exit(1)
}
// Correctly access command-line arguments (0-indexed: program_name, arg1, arg2)
nJobs, err := strconv.Atoi(os.Args[1])
if err != nil {
fmt.Println("Error parsing number of jobs:", err)
return
}
nWorkers, err := strconv.Atoi(os.Args[2])
if err != nil {
fmt.Println("Error parsing number of workers:", err)
return
}
fmt.Printf("\nStarting producer-consumer with %d jobs and %d workers...\n", nJobs, nWorkers)
go create(nJobs) // Start a goroutine to create jobs concurrently
// Channel to signal that the result printing goroutine has finished
finished := make(chan bool)
// Anonymous goroutine to read and print results from the 'data' channel
go func() {
for d := range data { // This loop runs until 'data' channel is closed by makeWP
fmt.Printf("Result -> Client ID: %d (Input: %d)\tSquare: %d\n", d.job.id, d.job.integer, d.square)
}
fmt.Println("Result printer: Finished reading all data.")
finished <- true // Signal main only after the last message has been printed
}()
// Create and manage worker goroutines. This call blocks until all workers are done
// and the 'data' channel is closed.
makeWP(nWorkers)
// Block until the result printing goroutine signals completion, then print confirmation
<-finished
fmt.Println("Main: All operations completed successfully.")
}
Output
Capacity of clients channel: 10
Capacity of data channel: 10
Starting producer-consumer with 5 jobs and 3 workers...
create: Generating 5 client jobs...
Worker processing Client ID: 0 (Input: 0)
Worker processing Client ID: 1 (Input: 2)
Worker processing Client ID: 2 (Input: 4)
create: All jobs sent and clients channel closed.
Result -> Client ID: 0 (Input: 0) Square: 0
Worker processing Client ID: 3 (Input: 6)
Result -> Client ID: 1 (Input: 2) Square: 4
Worker processing Client ID: 4 (Input: 8)
Result -> Client ID: 2 (Input: 4) Square: 16
Worker: No more jobs, exiting.
Result -> Client ID: 3 (Input: 6) Square: 36
Worker: No more jobs, exiting.
Result -> Client ID: 4 (Input: 8) Square: 64
Worker: No more jobs, exiting.
makeWP: All workers finished and data channel closed.
Result printer: Finished reading all data.
Main: All operations completed successfully.
Command-line Arguments: The number of jobs (nJobs) and the number of workers (nWorkers) are the two command-line parameters that the program accepts.
Job Creation: create(nJobs) is started in its own goroutine so that jobs are produced concurrently with the workers.
Result Consumption: The task of reading and printing results from the data channel falls to an anonymous goroutine.
finished Channel: When the anonymous goroutine has finished printing all of the results from the data channel, it sends a value on the finished channel (of type chan bool) to signal the main function. The <-finished statement before the final fmt.Println() call makes sure the main function blocks until all results have been consumed before the program ends.
Worker Management: The worker pool is set up and started by using makeWP(nWorkers).
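The completion signal used by the result printer can also be written with a chan struct{} that is closed instead of sent to. This is an alternative sketch, not what workerPool.go does, and the helper name sumUntilClosed is an assumption. Closing the channel carries no data and, with defer, fires even if the goroutine returns early.

```go
package main

import "fmt"

// sumUntilClosed consumes results in a separate goroutine and blocks until
// that goroutine has seen the channel close. (Hypothetical helper.)
func sumUntilClosed(results <-chan int) int {
	done := make(chan struct{})
	total := 0

	go func() {
		defer close(done) // signal completion when the goroutine exits
		for r := range results {
			total += r
		}
	}()

	<-done // blocks until the consumer goroutine has finished
	return total
}

func main() {
	results := make(chan int, 3)
	results <- 4
	results <- 16
	results <- 36
	close(results)

	fmt.Println(sumUntilClosed(results)) // prints 56
}
```

Reading total after <-done is safe because the close of done happens before the receive that observes it.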
This workerPool.go application is a simple example. In more complex real-world situations, error handling for resources that can fail (such as network connections) would be essential.
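One way to add such error handling, shown here only as a sketch, is to let each result carry an error next to its value so that a failing job does not stop the pool. The Result type and the squareStrings function are hypothetical, and parsing a string with strconv.Atoi stands in for an operation that can fail.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// Result is a hypothetical result type that records either a square or the
// error that prevented it from being computed.
type Result struct {
	input  string
	square int
	err    error
}

// squareStrings parses and squares each input on nWorkers goroutines.
// Inputs that are not valid integers produce a Result with err set.
func squareStrings(inputs []string, nWorkers int) []Result {
	jobs := make(chan string, len(inputs))
	results := make(chan Result, len(inputs))

	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range jobs {
				n, err := strconv.Atoi(s)
				if err != nil {
					results <- Result{input: s, err: err} // report, keep working
					continue
				}
				results <- Result{input: s, square: n * n}
			}
		}()
	}

	for _, s := range inputs {
		jobs <- s
	}
	close(jobs)
	wg.Wait()
	close(results)

	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	for _, r := range squareStrings([]string{"3", "x", "5"}, 2) {
		if r.err != nil {
			fmt.Printf("input %q failed: %v\n", r.input, r.err)
			continue
		}
		fmt.Printf("input %q -> %d\n", r.input, r.square)
	}
}
```

The order of the printed lines can vary between runs, because the workers finish independently.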