Lab 5 - Thread Pools

Class: CSCE-313


Notes:

Instructions

Introduction

In this lab, you will modify the banking system code to use concurrency by implementing a thread pool. The thread pool will manage several worker threads and distribute queued tasks among them.

Starter Code

Code Structure

Objectives

By the end of this lab, you should be familiar with:

Getting Started

Read through the code. There have been changes made to only a few files, so the overall structure and function are the same.

thread_pool.h, thread_pool.cpp:

This is a new addition. You will be able to use this to improve processing time in a controlled manner.

thread_pool.h

class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
    std::atomic<int> activeTasks;

public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();
    void enqueue(std::function<void()> task);
};

ThreadPool contains a number of private variables and only makes public a constructor, destructor, and enqueue function. Let’s walk through each of these.

The workers vector contains as many threads as desired. These “worker” threads will continuously perform tasks as they become available.

The tasks queue is a queue of functions to be completed. Worker threads obtain their tasks from here, popping and performing the first task. Since this is a shared resource that all worker threads will be accessing, you should be careful about using synchronization to prevent race conditions.

To prevent such issues, you should use the queueMutex mutex and the condition condition variable. Together they can be used to conditionally lock access to the tasks queue.

The stop boolean should notify threads when there will no longer be any tasks added to the queue. However, it does not mean that the current tasks should be stopped. All tasks in the queue should run to completion, even if stop is set to true or the destructor is called.

The constructor is responsible for setting up the threads. numThreads threads should be created, and each of these threads should have the behavior described above.

The destructor should set the stop boolean to true without preventing active tasks from running to completion. (Hint: the necessary logic can be split between the constructor and destructor.) It should also join all threads once they have run the queued tasks to completion.

The enqueue function should add a new task to the tasks queue. As such, it may need to wait for access to the queue and notify threads that the queue has been updated on completion.

Common utils

We have added a new request type that is more computationally expensive but can be done concurrently. The new request type is EARN_INTEREST, which can be sent to the finance server to earn interest on all active accounts. The desired behavior is explained below.

Finance server

Here, you will both implement the handling for the new EARN_INTEREST request as well as utilize the ThreadPool implementation you create.

void applyInterest(Account& account);

applyInterest will check that an account is both active and has a positive balance. It will then increase the balance by a flat 1% rate.

You can check and increase the number of cores (and accordingly threads) in your vm settings.

else if (r.type == EARN_INTEREST) {
    try {
        // TODO: Create a ThreadPool and add all tasks to it
    } catch (const std::exception& e) {
        // basic error handling as you decide, and update response with false success
    }
}

When the finance server receives “EARN_INTEREST” requests, it should attempt to create a ThreadPool and add applyInterest functions to it with each account as the parameter, regardless of account status.

If any of this fails, you do not need to revert any updated accounts. Simply have any basic error handling and set the response success value to false.

Client

The client has already been updated for your convenience, and you do not need to make any modifications. We have updated the menu to allow users to select “Update Interest for All Accounts” as option 9. This is only available to user 0.

What is a Thread Pool?


A thread pool is a group of persistent, reusable threads. Because the threads are created once and typically live for the program’s lifetime, the pool gains the advantages of concurrency without paying the cost of creating a new thread per task. (Our program destroys its pool early, so we do not take full advantage of persistence, but the implementation supports it.) Thread pools are typically most useful when tasks are continuously created. Some implementations can even resize to fit changes in workload, but we will not do this.

Refer to the slides, readings, or other online resources to learn more.

Tasks

Implement ThreadPool Constructor (50 pts)

Implement ThreadPool Destructor (20 pts)

Implement ThreadPool Enqueue (15 pts)

Implement finance server (15 pts)

EARN_INTEREST Request Handling

Implement applyInterest

Implementation

thread_pool.cpp

Implement ThreadPool Constructor

The constructor should create numThreads worker threads, and each worker should:

  1. wait until there is a task available or the pool is stopping
  2. take one task from the queue while holding the lock
  3. release the lock
  4. run the task outside the lock
  5. decrement activeTasks when finished
  6. exit only when stop == true and there are no tasks left

Correct implementation:

ThreadPool::ThreadPool(size_t numThreads) : stop(false), activeTasks(0) {
    for (size_t i = 0; i < numThreads; i++) {
        workers.emplace_back([this]() {
            while (true) {
                std::function<void()> task;

                {
                    std::unique_lock<std::mutex> lock(queueMutex);

                    condition.wait(lock, [this]() {
                        return stop || !tasks.empty();
                    });

                    if (stop && tasks.empty()) {
                        return;
                    }

                    task = tasks.front();
                    tasks.pop();
                    activeTasks++;
                }

                task();

                activeTasks--;
            }
        });
    }
}

Explanation

When you create a ThreadPool, you are creating a group of worker threads that sit around waiting for jobs.

So if you do:

ThreadPool pool(4);

you want 4 worker threads to be created immediately.

Those threads do not know ahead of time what tasks they will run. They just keep checking the shared tasks queue and processing work as it appears.

That is the whole purpose of a thread pool: a fixed set of long-lived threads that pull work from a shared queue as it arrives.

Why each worker is a lambda with [this]

This part:

workers.emplace_back([this]() {

creates a new std::thread and gives it a lambda function to run.

Why [this]?

Because the worker needs access to the current object’s members: tasks, queueMutex, condition, stop, and activeTasks.

Those are all inside the ThreadPool object, so the lambda captures this.

Without [this], the lambda would not be able to access those members.

Why there is an infinite loop

Inside each worker:

while (true) {

The worker thread is supposed to stay alive and keep processing tasks until the pool is shutting down.

So each worker repeatedly does the same cycle: wait for work, claim one task, run it, and go back to waiting.

If there were no loop, each worker would only do one task and then die, which would defeat the point of a thread pool.

Why we declare std::function<void()> task;

std::function<void()> task;

Each item in the queue is a task represented as a function that takes no arguments and returns nothing.

So before running work, the thread needs a local variable to hold the task it removes from the queue.

That local copy is important because the worker runs the task after releasing the lock; copying it out of the queue first lets task() be called safely outside the critical section.

Why the lock is inside its own block

This part:

{
    std::unique_lock<std::mutex> lock(queueMutex);
	
    ...
}

is very important.

The braces create a small scope so that the lock is automatically released when the block ends.

That means:

This matches the lab instruction that the thread should release the lock before completing a task so other threads can modify the queue.

Why we use std::unique_lock<std::mutex>

std::unique_lock<std::mutex> lock(queueMutex);

This locks queueMutex, which protects the shared task queue and related shared state.

We need synchronization because multiple threads may try to push a task, pop a task, or check whether the queue is empty at the same time.

Without the mutex, two threads could both think they got the same task, or one thread could read the queue while another is modifying it.

That would be a race condition.

Why we use condition.wait(...)

condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});

This is how worker threads sleep efficiently until something useful happens.

The worker should wake up when either stop has been set to true or a new task has been added to the queue.

Why not just keep looping and checking?

Because that would waste CPU time. The thread would be busy doing nothing.

The condition variable lets the thread block efficiently until another thread signals it.

Why use the predicate version?

Because condition variables can wake up spuriously, meaning they may wake up even if nothing meaningful happened.

So the predicate:

stop || !tasks.empty()

forces the worker to continue only when there is actually a reason to proceed.

Why we check if (stop && tasks.empty())

if (stop && tasks.empty()) {
    return;
}

This is the shutdown condition.

The lab says the thread should return only once stop is true and the queue has no tasks left.

That is exactly what this condition means.

Why both parts matter

If you checked only stop, a worker could exit while tasks were still queued, abandoning them. If you checked only tasks.empty(), a worker would exit any time the queue momentarily ran dry.

Only when both conditions hold should the worker exit.

Why we pop the task while holding the lock

task = tasks.front();
tasks.pop();
activeTasks++;

These operations happen while the mutex is still locked.

That is necessary because tasks is shared among all workers.

If two workers tried to do front() and pop() at the same time without locking, they could corrupt the queue or take the same task.

So the safe sequence is:

  1. lock the mutex
  2. take the first task
  3. remove it from the queue
  4. increment activeTasks
  5. unlock the mutex

Why activeTasks++ happens before running the task

activeTasks++;

This keeps track of how many tasks are currently being executed by workers.

That is different from how many tasks are waiting in the queue.

Example

Suppose 10 tasks have been enqueued and 4 workers have each claimed one.

Then 6 tasks remain in the queue while activeTasks is 4.

That information is useful later for cleanup/shutdown logic.

You want to increment it right after a worker claims a task, because that task is no longer just “queued” — it is now “active.”

Why the task runs outside the lock

After the block ends, the mutex is released, and then:

task();

runs.

This is one of the most important parts of the design.

The lab explicitly says the worker should release the lock before completing a task. That is exactly what this does.

Why?

Because tasks might take time.

If a worker kept the queue locked while running the task, every other thread would block on the mutex for the whole duration, so only one task could make progress at a time.

By releasing the lock before task(), other threads are free to pop their own tasks or enqueue new ones while this one runs.

That is what makes the pool concurrent.

Why activeTasks-- happens after task()

activeTasks--;

Once the task finishes, it is no longer active.

So the counter is decremented immediately after task() returns.

That way activeTasks accurately reflects how many tasks are currently running.

Since activeTasks is an atomic integer in your header, incrementing and decrementing it this way is okay. The header declares it as std::atomic<int> activeTasks;.

The main concurrency idea

This constructor is setting up a classic producer-consumer system:
• producer side: enqueue() will later add tasks to tasks
• consumer side: worker threads remove tasks and execute them

Shared resources:
• tasks
• stop

Protected by:
• queueMutex
• condition

Progress tracking:
• activeTasks

That is the structure you are building here.

Implement ThreadPool Destructor

The constructor created worker threads that keep looping until stop is true and the tasks queue is empty.

So the destructor’s job is to trigger that shutdown and then wait for every worker thread to finish cleanly.

Correct implementation:

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }

    condition.notify_all();

    for (std::thread &worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }
}

What each part is doing

1. Lock the mutex before changing stop
{
    std::unique_lock<std::mutex> lock(queueMutex);
    stop = true;
}

This is important because stop is shared with all the worker threads.

Your worker threads are doing things like:

condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});

and later:

if (stop && tasks.empty()) {
    return;
}

So if the destructor changes stop, that change must be synchronized with the worker threads.

By locking queueMutex before setting stop = true, you ensure that access to shared thread-pool state is coordinated properly.

Why the braces?

The braces create a small scope so the lock is released before notify_all() runs.

That is a common pattern: modify shared state while holding the lock, then release the lock before notifying so the woken threads can acquire the mutex immediately.

2. Set stop = true
stop = true;

This tells the workers that no new tasks will ever be added to the queue.

This does not mean “drop all queued tasks immediately.”

Because your worker logic is:

if (stop && tasks.empty()) {
    return;
}

So if there are still tasks in the queue, workers will keep processing them.

That is exactly what the lab wants: all queued tasks run to completion, even after stop is set.

3. Wake up all worker threads
condition.notify_all();

This is necessary because some worker threads may be asleep in:

condition.wait(...)

If you only set stop = true but never notify them, they might stay sleeping forever.

notify_all() wakes every waiting worker so they can re-check the condition.

After waking up, each worker will do one of two things: take another task from the queue if any remain, or see that stop is true and the queue is empty, and return.

That is the shutdown mechanism.

4. Join all worker threads
for (std::thread &worker : workers) {
    if (worker.joinable()) {
        worker.join();
    }
}

This waits for every worker thread to finish before the destructor returns.

Why is this required?

Because if a std::thread object is destroyed while still joinable, the program calls std::terminate().

So before the workers vector is destroyed, each thread must be either joined or detached.

In a thread pool, you almost always want join(), because you want orderly shutdown.

Why joinable()?

join() is only valid on a thread that is still joinable; calling it on a thread that was already joined, detached, or default-constructed throws std::system_error. The check makes the loop safe.
Big picture of destructor flow

When the thread pool is being destroyed:

  1. mark the pool as stopping
  2. wake up all workers
  3. let workers finish remaining queued tasks
  4. once no tasks remain, workers exit their loops
  5. join all workers
  6. destructor returns safely

That is the intended lifecycle.

Why not hold the lock during join()

You should not do this:

std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
condition.notify_all();

for (...) {
    worker.join();
}

because then the main thread would still be holding queueMutex while waiting for workers to finish.

That is bad because the workers may need that mutex to re-acquire the lock inside condition.wait() or to pop their remaining tasks.

If the destructor kept the mutex locked while joining, you could easily deadlock.

That is why the lock is scoped only around:

stop = true;

and released before notify_all() and join().

Implement ThreadPool Enqueue

enqueue() is the function that puts new work into the thread pool.

Think of the thread pool as having a team of workers and a shared to-do list (the tasks queue).

So enqueue(task) means: safely add one more item to the to-do list and wake a worker to handle it.

Correct implementation:

void ThreadPool::enqueue(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        tasks.push(task);
    }

    condition.notify_one();
}

What this function is supposed to do

The instructions say that enqueue should add a new task to the tasks queue, waiting for access if necessary and notifying a worker once the queue has been updated.

That means:

  1. lock the shared queue
  2. push the new task into it
  3. unlock the queue
  4. notify a worker thread that work is available

First: what is std::function<void()>?

This type can look scary at first, but it is just a way to store “something callable.”

std::function<void()> task

means a variable that can hold any callable taking no arguments and returning nothing.

So examples of things that match this type are:

[]() {
    std::cout << "Hello\n";
}

or

someFunction

So inside the thread pool, each queued item is basically a stored piece of work that a worker can later run by calling task().

What is tasks?

In your thread pool, tasks is a queue.

A queue is a data structure that works like a line at a store: the first item to arrive is the first one served (first in, first out).

So when enqueue() runs:

tasks.push(task);

Later, a worker thread does something like:

task = tasks.front();
tasks.pop();

That is why this is called a queue.

Why do we need a mutex?

The queue is shared by multiple threads: the server thread calling enqueue() and every worker thread popping tasks.

If two threads access the queue at once without protection, bad things can happen: a corrupted queue, a lost task, or two workers grabbing the same task.

That is called a race condition.

To prevent that, we use a mutex.

What is a mutex?

A mutex is like a key to a room.

Here, the “room” is access to shared data like the tasks queue.

Your mutex is:

queueMutex

So before touching tasks, a thread should lock queueMutex.

What is std::unique_lock<std::mutex>?

This line:

std::unique_lock<std::mutex> lock(queueMutex);

means: lock queueMutex now, and automatically unlock it when lock goes out of scope.

This is safer than manually calling queueMutex.lock() and queueMutex.unlock(), because automatic unlocking helps avoid mistakes such as forgetting to unlock on an exception or early return.

So in this block:

{
    std::unique_lock<std::mutex> lock(queueMutex);
    tasks.push(task);
}

Why do we lock only around tasks.push(task)?

Because the only shared resource we are modifying here is the task queue.

So we want to keep the critical section as short as possible:

{
    std::unique_lock<std::mutex> lock(queueMutex);
    tasks.push(task);
}

This is good because the lock is held only for the single push, and the notification happens after the lock is released.

Short critical sections are important in concurrency, because they reduce waiting and contention between threads.

If you notified while still holding the lock, the worker might wake up immediately but then still have to wait for the lock anyway.

Why do we call condition.notify_one()?

After pushing a task into the queue, we need to wake up a worker thread.

Remember, in your constructor the workers are probably doing something like:

condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});

That means workers may be sleeping, waiting for stop to become true or for the queue to become non-empty.

When we add a task, we should notify one of them:

condition.notify_one();

This means: wake exactly one thread currently blocked in condition.wait().

That awakened worker can then re-check the predicate, pop the new task, and run it.

Why notify_one() instead of notify_all()?

Because adding one task usually only requires one worker to wake up.

If you used notify_all(), every sleeping worker would wake up, but only one could take the task; the rest would re-check the predicate and go back to sleep.

That is inefficient.

So for a single new task, notify_one() is the right choice.

finance.cpp

EARN_INTEREST Request Handling

The lab says that for an EARN_INTEREST request, you should create a ThreadPool and enqueue one applyInterest task per account.

So the idea is:

  1. make a pool with numThreads worker threads
  2. loop through all accounts
  3. for each account, add a task to the pool
  4. each task calls applyInterest() on one account
  5. when the pool goes out of scope, its destructor joins all worker threads, so all tasks finish before you send the response

That matches the thread-pool behavior you just implemented: the pool starts workers in the constructor, workers pull tasks from the queue, and the destructor joins workers after remaining tasks are finished.

Correct implementation:

else if (r.type == EARN_INTEREST) {
    try {
        int numThreads = 2;
        if (r.amount > 0) numThreads = r.amount;

        ThreadPool pool(numThreads);

        for (int i = 0; i < max_accounts; i++) {
            pool.enqueue([accounts, i]() {
                applyInterest(accounts[i]);
            });
        }

        resp.message = "Interest applied successfully";
    } catch (const std::exception& e) {
        resp.success = false;
        resp.message = e.what();
    }
}

Big picture first

You have:

Account* accounts = new Account[max_accounts];

That means accounts points to an array of Account objects in memory.

You can think of it like this: accounts[0], accounts[1], and so on up to accounts[max_accounts - 1].

Each one is a bank account object.

For EARN_INTEREST, you want to apply interest to all of them.

Instead of doing it one-by-one in the main thread, the lab wants you to split that work across multiple worker threads using the thread pool.

Step 1: create the thread pool

ThreadPool pool(numThreads);

This creates a ThreadPool object named pool.

When this line runs, the constructor immediately creates numThreads worker threads, each of which starts waiting for tasks.

So if numThreads == 4, you now have 4 worker threads ready to process queued work.

Step 2: loop through every account

for (int i = 0; i < max_accounts; i++) {

This goes through every valid index in the accounts array.

That is important because the instructions say to enqueue a task for every account, regardless of account status.

So you do not skip inactive accounts.

You simply create one task per account.

Why? Because the active/positive-balance check lives inside applyInterest itself, so the server can enqueue every account uniformly.

Step 3: enqueue one task per account

pool.enqueue([accounts, i]() {
    applyInterest(accounts[i]);
});

This is the most important line, so let’s unpack it carefully.

What is this thing?

This lambda:

[accounts, i]() {
    applyInterest(accounts[i]);
}

means:

This matches the thread pool’s expected task type:

std::function<void()>

which means a callable taking no arguments and returning nothing.

So each queued task is one small unit of work: apply interest to one specific account.

Why do we capture accounts and i?
accounts is a pointer to the array of accounts.

By capturing it:

[accounts, i]

each task knows where the account array is stored.

i is the current loop index.

This is very important.

Why capture i by value and not by reference?
This is one of the biggest concurrency/lambda gotchas.

Suppose you wrote this instead:

pool.enqueue([&accounts, &i]() {
    applyInterest(accounts[i]);
});

That would be dangerous, because all the tasks would share the same i variable from the loop.

But the loop keeps changing i:

By the time worker threads actually run the task, the loop may already have moved on, or even finished.

That could cause tasks to run with the wrong index, skip some accounts, process the same account twice, or read a variable that no longer exists.

So you want:

[accounts, i]

Capturing i by value means each task stores its own private copy of the index, fixed at the moment the task was created.

Why is it okay to capture accounts by value?
Because accounts is a pointer. Copying the pointer copies only the address; every task still refers to the same underlying array, which outlives the tasks.

Step 4: why we do not manually wait here

You might wonder: where does the code wait for all the interest tasks to finish?

The answer is: the thread pool destructor does that for you.

Because pool is a local variable inside the try block:

try {
    ThreadPool pool(numThreads);

    for (...) {
        pool.enqueue(...);
    }

    resp.message = "Interest applied successfully";
}

when execution leaves that block, pool is destroyed.

Its destructor runs, and based on your lab TODO, it sets stop to true, wakes every worker, lets them drain the remaining queued tasks, and joins them all.

So by the time control leaves the try block, all applyInterest() tasks should already be finished.

That means it is safe to send the response after that.

Step 5: handle errors in the catch block

The TODO says:

So inside the catch, do:

resp.success = false;
resp.message = e.what();

What this means

e.what() is a C++ standard way to get the text message from an exception.

For example, if your enqueue() later throws on a stopped pool, or something else goes wrong, the client gets a meaningful failure message.

Conceptual summary

For EARN_INTEREST, you are doing this: create a pool, enqueue one task per account, let the pool’s destructor join everything, then send the response.

Implement applyInterest

Correct implementation:

void applyInterest(Account& account) {
    if (!account.active || account.balance <= 0) {
        return;
    }

    account.balance *= 1.01;
}

Applying the 1% increase

account.balance *= 1.01;

This means:

Example:
If balance is:

Important detail: type of balance
This works correctly if balance is a floating-point type like:

double

If balance were an integer, then:

account.balance *= 1.01;

would truncate and not behave correctly.
