Lab 5 - Thread Pools
Class: CSCE-313
Notes:
Instructions
Introduction
In this lab, you will modify the banking system code to use concurrency by implementing a thread pool. The thread pool will manage several worker threads and distribute queued tasks among them.
Starter Code
Code Structure
- ThreadPool files (thread_pool.h, thread_pool.cpp): These files contain skeleton code for a thread pool implementation. You will need to modify the cpp file and implement the functions.
- Server Programs (finance.cpp, file.cpp, logging.cpp): These files implement the three server components of the banking system. The finance server will need to be modified during this lab, but the file and logging servers are complete.
- Client Program (client.cpp): The main user interface for the banking system. Slight modifications were made to allow users to select a new action, but this is complete and implemented for you.
- Makefile (Makefile): This file compiles and builds the source files when you type the make command in the terminal. You can use ‘make’ to compile, ‘make clean’ to remove executables, and ‘make distclean’ to remove both the executables and the files the executables generate.
- Channel.h/cpp: Implements the communication channel between the client and the servers. These files are complete and should not be modified.
- Utilities (common.h): This file contains useful classes and functions shared between the server and the client, such as the Request and Response definitions. Do not modify this file.
Objectives
By the end of this lab, you should be familiar with:
- What a thread pool is and its benefits
- How to create new threads and use them for concurrency
- How to synchronize threads to prevent race conditions
Getting Started
Read through the code. Only a few files have changed, so the overall structure and function remain the same.
thread_pool.h, thread_pool.cpp:
This is a new addition. You will be able to use this to improve processing time in a controlled manner.
thread_pool.h
class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
    std::atomic<int> activeTasks;

public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();
    void enqueue(std::function<void()> task);
};
ThreadPool contains a number of private variables and only makes public a constructor, destructor, and enqueue function. Let’s walk through each of these.
The workers vector contains as many threads as desired. These “worker” threads will continuously perform tasks as they become available.
The tasks queue is a queue of functions to be completed. Worker threads obtain their tasks from here, popping and performing the first task. Since this is a shared resource that all worker threads will be accessing, you should be careful about using synchronization to prevent race conditions.
To prevent such issues, you should use the queueMutex mutex and the condition condition_variable. These can be used to conditionally lock access to the tasks queue.
The stop boolean should notify threads when there will no longer be any tasks added to the queue. However, it does not mean that the current tasks should be stopped. All tasks in the queue should run to completion, even if stop is set to true or the destructor is called.
The constructor is responsible for setting up the threads. numThreads threads should be created, and each of these threads should have the behavior described above.
The destructor should set the stop boolean to true without preventing active tasks from running to completion. (Hint: that logic can be split between the destructor and the constructor.) It should also join all threads once they have run the queued tasks to completion.
The enqueue function should add a new task to the tasks queue. As such, it may need to wait for access to the queue and notify threads that the queue has been updated on completion.
Common utils
We have added a new request type that is more computationally expensive but can be done concurrently. The new request type is EARN_INTEREST, which can be sent to the finance server to earn interest on all active accounts. The desired behavior is explained below.
Finance server
Here, you will both implement the handling for the new EARN_INTEREST request and utilize the ThreadPool implementation you create.
void applyInterest(Account& account);
applyInterest will check that an account is both active and has a positive balance. If both conditions hold, it increases the balance by a flat 1% rate.
You can check and increase the number of cores (and accordingly threads) in your vm settings.
else if (r.type == EARN_INTEREST) {
    try {
        // TODO: Create a ThreadPool and add all tasks to it
    } catch (const std::exception& e) {
        // basic error handling as you decide, and update response with false success
    }
}
When the finance server receives an EARN_INTEREST request, it should attempt to create a ThreadPool and add applyInterest tasks to it with each account as the parameter, regardless of account status.
If any of this fails, you do not need to revert any updated accounts. Simply have any basic error handling and set the response success value to false.
Client
The client has already been updated for your convenience, and you do not need to make any modifications. We have updated the menu to allow users to select “Update Interest for All Accounts” as option 9. This is only available to user 0.
What is a Thread Pool?
A thread pool is a group of persistent, reusable threads. Because the threads are pre-instantiated and reused, a program gets the advantages of concurrency without paying the overhead of creating a new thread for every task. (Our program destroys its threads early rather than keeping them for the program's lifetime, but the ThreadPool implementation is still capable of that.) Thread pools are typically most useful when tasks are continuously created. Some thread pool implementations can even resize to fit changes in workload, but we will not do this.
Refer to the slides, readings, or online resources such as this to learn more.
Tasks
Implement ThreadPool Constructor (50 pts)
- Create num_threads threads
- Each thread should synchronously access the tasks queue to obtain a task when stop is not true, and there are available tasks.
- Each thread should release the lock before completing a task to allow other threads to modify the queue
Implement ThreadPool Destructor (20 pts)
- Should join all worker threads once all tasks are completed
Implement ThreadPool Enqueue (15 pts)
- Should synchronously access the queue and add a new task to the end
Implement finance server (15 pts)
EARN_INTEREST Request Handling
- Create an instance of the thread pool
- Enqueue an “applyInterest” task for every account (regardless of account status)
Implement applyInterest
- If an account is active and it has a positive balance, increase the balance by 1%
- Otherwise, do not make any changes
Implementation
thread_pool.cpp
Implement ThreadPool Constructor
The constructor should create numThreads worker threads, and each worker should:
- wait until there is a task available or the pool is stopping
- take one task from the queue while holding the lock
- release the lock
- run the task outside the lock
- decrement activeTasks when finished
- exit only when stop == true and there are no tasks left
Correct implementation:
ThreadPool::ThreadPool(size_t numThreads) : stop(false), activeTasks(0) {
    for (size_t i = 0; i < numThreads; i++) {
        workers.emplace_back([this]() {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    condition.wait(lock, [this]() {
                        return stop || !tasks.empty();
                    });
                    if (stop && tasks.empty()) {
                        return;
                    }
                    task = tasks.front();
                    tasks.pop();
                    activeTasks++;
                }
                task();
                activeTasks--;
            }
        });
    }
}
Explanation
When you create a ThreadPool, you are creating a group of worker threads that sit around waiting for jobs.
So if you do:
ThreadPool pool(4);
you want 4 worker threads to be created immediately.
Those threads do not know ahead of time what tasks they will run. They just keep checking the shared tasks queue and processing work as it appears.
That is the whole purpose of a thread pool:
- create threads once
- reuse them for many tasks
- avoid making a brand-new thread for every tiny job
Why each worker is a lambda with [this]
This part:
workers.emplace_back([this]() {
creates a new std::thread and gives it a lambda function to run.
Why [this]?
Because the worker needs access to the current object’s members:
- tasks
- queueMutex
- condition
- stop
- activeTasks
Those are all inside the ThreadPool object, so the lambda captures this.
Without [this], the lambda would not be able to access those members.
Why there is an infinite loop
Inside each worker:
while (true) {
The worker thread is supposed to stay alive and keep processing tasks until the pool is shutting down.
So each worker repeatedly does:
- wait for work
- get one task
- run it
- go back and wait again
If there were no loop, each worker would only do one task and then die, which would defeat the point of a thread pool.
Why we declare std::function<void()> task;
std::function<void()> task;
Each item in the queue is a task represented as a function that takes no arguments and returns nothing.
So before running work, the thread needs a local variable to hold the task it removes from the queue.
That local copy is important because:
- we want to pop the task from the shared queue
- then release the lock
- then run the task independently
Why the lock is inside its own block
This part:
{
    std::unique_lock<std::mutex> lock(queueMutex);
    ...
}
is very important.
The braces create a small scope so that the lock is automatically released when the block ends.
That means:
- inside the block: protected access to shared data
- outside the block: no lock held
This matches the lab instruction that the thread should release the lock before completing a task so other threads can modify the queue.
Why we use std::unique_lock<std::mutex>
std::unique_lock<std::mutex> lock(queueMutex);
This locks queueMutex, which protects the shared task queue and related shared state.
We need synchronization because multiple threads may try to do things like:
- inspect tasks.empty()
- pop from tasks
- check stop
at the same time.
Without the mutex, two threads could both think they got the same task, or one thread could read the queue while another is modifying it.
That would be a race condition.
Why we use condition.wait(...)
condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});
This is how worker threads sleep efficiently until something useful happens.
The worker should wake up when either:
- stop becomes true, meaning shutdown is happening
- the queue is not empty, meaning there is work to do
Why not just keep looping and checking?
Because that would waste CPU time. The thread would be busy doing nothing.
The condition variable lets the thread block efficiently until another thread signals it.
Why use the predicate version?
Because condition variables can wake up spuriously, meaning they may wake up even if nothing meaningful happened.
So the predicate:
stop || !tasks.empty()
forces the worker to continue only when there is actually a reason to proceed.
Why we check if (stop && tasks.empty())
if (stop && tasks.empty()) {
    return;
}
This is the shutdown condition.
The lab says the thread should return only once:
- all tasks are completed
- and stop is true
That is exactly what this condition means.
Why both parts matter
- If stop == true but the queue still has tasks, workers should still keep processing the remaining tasks.
- If the queue is empty but stop == false, workers should keep waiting because more tasks may be added later.
Only when:
- the pool is stopping
- and no tasks remain
should the worker exit.
Why we pop the task while holding the lock
task = tasks.front();
tasks.pop();
activeTasks++;
These operations happen while the mutex is still locked.
That is necessary because tasks is shared among all workers.
If two workers tried to do front() and pop() at the same time without locking, they could corrupt the queue or take the same task.
So the safe sequence is:
- lock the mutex
- take the first task
- remove it from the queue
- increment activeTasks
- unlock the mutex
Why activeTasks++ happens before running the task
activeTasks++;
This keeps track of how many tasks are currently being executed by workers.
That is different from how many tasks are waiting in the queue.
Example
Suppose:
- 2 tasks are running right now
- 3 more are still waiting in the queue
Then:
- activeTasks == 2
- tasks.size() == 3
That information is useful later for cleanup/shutdown logic.
You want to increment it right after a worker claims a task, because that task is no longer just “queued” — it is now “active.”
Why the task runs outside the lock
After the block ends, the mutex is released, and then:
task();
runs.
This is one of the most important parts of the design.
The lab explicitly says the worker should release the lock before completing a task. That is exactly what this does.
Why?
Because tasks might take time.
If a worker kept the queue locked while running the task:
- no other worker could take another task
- no producer thread could add tasks safely
- the whole thread pool would become almost serialized
By releasing the lock before task(), other threads are free to:
- enqueue new tasks
- grab queued tasks
- keep the pool busy
That is what makes the pool concurrent.
Why activeTasks-- happens after task()
activeTasks--;
Once the task finishes, it is no longer active.
So:
- increment before starting
- decrement after finishing
That way activeTasks accurately reflects how many tasks are currently running.
Since activeTasks is an atomic integer in your header, incrementing and decrementing it this way is okay. The header declares it as std::atomic<int> activeTasks;.
The main concurrency idea
This constructor is setting up a classic producer-consumer system:
- producer side: enqueue() will later add tasks to tasks
- consumer side: worker threads remove tasks and execute them
Shared resources:
- tasks
- stop
Protected by:
- queueMutex
- condition
Progress tracking:
- activeTasks
That is the structure you are building here.
Implement ThreadPool Destructor
The constructor created worker threads that keep looping until:
- stop == true
- and there are no remaining tasks in the queue
So the destructor’s job is to trigger that shutdown and then wait for every worker thread to finish cleanly.
Correct implementation:
ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }
}
What each part is doing
1. Lock the mutex before changing stop
{
    std::unique_lock<std::mutex> lock(queueMutex);
    stop = true;
}
This is important because stop is shared with all the worker threads.
Your worker threads are doing things like:
condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});
and later:
if (stop && tasks.empty()) {
    return;
}
So if the destructor changes stop, that change must be synchronized with the worker threads.
By locking queueMutex before setting stop = true, you ensure that access to shared thread-pool state is coordinated properly.
Why the braces?
The braces create a small scope so the lock is released before notify_all() runs.
That is a common pattern:
- lock
- modify shared state
- unlock
- notify waiting threads
2. Set stop = true
stop = true;
This tells the workers:
- no more normal work should be expected
- once the queue is empty, exit the worker loop
This does not mean “drop all queued tasks immediately.”
Because your worker logic is:
if (stop && tasks.empty()) {
    return;
}
So if there are still tasks in the queue, workers will keep processing them.
That is exactly what the lab wants:
- complete all tasks
- then stop and join the workers
3. Wake up all worker threads
condition.notify_all();
This is necessary because some worker threads may be asleep in:
condition.wait(...)
If you only set stop = true but never notify them, they might stay sleeping forever.
notify_all() wakes every waiting worker so they can re-check the condition.
After waking up, each worker will do one of two things:
- if there are still tasks, it takes a task and runs it
- if stop == true and the queue is empty, it exits
That is the shutdown mechanism.
4. Join all worker threads
for (std::thread& worker : workers) {
    if (worker.joinable()) {
        worker.join();
    }
}
This waits for every worker thread to finish before the destructor returns.
Why is this required?
Because if a std::thread object is destroyed while still joinable, the program calls std::terminate().
So before the workers vector is destroyed, each thread must be:
- joined, or
- detached
In a thread pool, you almost always want join(), because you want orderly shutdown.
Why joinable()?
- This is a safety check. It ensures the thread is in a state where it can be joined.
Big picture of destructor flow
When the thread pool is being destroyed:
- mark the pool as stopping
- wake up all workers
- let workers finish remaining queued tasks
- once no tasks remain, workers exit their loops
- join all workers
- destructor returns safely
That is the intended lifecycle.
Why not hold the lock during join()
You should not do this:
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
condition.notify_all();
for (...) {
    worker.join();
}
because then the main thread would still be holding queueMutex while waiting for workers to finish.
That is bad because the workers may need that mutex to:
- wake up
- check the queue
- pop tasks
- exit cleanly
If the destructor kept the mutex locked while joining, you could easily deadlock.
That is why the lock is scoped only around:
stop = true;
and released before notify_all() and join().
Implement ThreadPool Enqueue
enqueue() is the function that puts new work into the thread pool.
Think of the thread pool as having:
- a line of tasks waiting to be done → tasks
- a set of worker threads waiting for work → workers
So enqueue(task) means:
- “take this task and put it at the back of the waiting line”
- “then wake up one worker so it can do it”
Correct implementation:
void ThreadPool::enqueue(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        tasks.push(task);
    }
    condition.notify_one();
}
What this function is supposed to do
The instructions say:
- synchronously access the queue
- add a new task to the end
That means:
- lock the shared queue
- push the new task into it
- unlock the queue
- notify a worker thread that work is available
First: what is std::function<void()>?
This type can look scary at first, but it is just a way to store “something callable.”
std::function<void()> task
means:
- task is something you can call like a function
- it takes no arguments
- it returns nothing
So examples of things that match this type are:
[]() {
    std::cout << "Hello\n";
}
or
someFunction
- if someFunction takes no parameters and returns void.
So inside the thread pool, each queued item is basically:
- “a piece of work to run later”
What is tasks?
In your thread pool, tasks is a queue.
A queue is a data structure that works like a line at a store:
- first task added = first task removed
- new tasks go to the back
- workers take tasks from the front
So when enqueue() runs:
tasks.push(task);
- it adds the new task to the back of the queue.
Later, a worker thread does something like:
task = tasks.front();
tasks.pop();
- which takes the oldest waiting task.
That is why this is called a queue.
Why do we need a mutex?
The queue is shared by multiple threads:
- the main thread or a producer thread may call enqueue()
- worker threads may remove tasks from the queue at the same time
If two threads access the queue at once without protection, bad things can happen:
- one thread could be pushing while another is popping
- the internal state of the queue could become corrupted
- two threads might see inconsistent data
That is called a race condition.
To prevent that, we use a mutex.
What is a mutex?
A mutex is like a key to a room.
- if one thread has the key, it can enter the room
- other threads must wait until the key is returned
Here, the “room” is access to shared data like:
- tasks
Your mutex is:
queueMutex
So before touching tasks, a thread should lock queueMutex.
What is std::unique_lock<std::mutex>?
This line:
std::unique_lock<std::mutex> lock(queueMutex);
means:
- lock queueMutex now
- keep it locked while lock exists
- automatically unlock it when lock goes out of scope
This is safer than manually doing:
- lock
- remember to unlock later
because automatic unlocking helps avoid mistakes.
So in this block:
{
    std::unique_lock<std::mutex> lock(queueMutex);
    tasks.push(task);
}
- the mutex is locked only for the duration of that block.
- As soon as the block ends, the lock is destroyed and the mutex is unlocked.
- That is why the braces matter.
Why do we lock only around tasks.push(task)?
Because the only shared resource we are modifying here is the task queue.
So we want to keep the critical section as short as possible:
{
    std::unique_lock<std::mutex> lock(queueMutex);
    tasks.push(task);
}
This is good because:
- we protect the queue
- but we do not hold the lock longer than necessary
Short critical sections are important in concurrency, because they reduce waiting and contention between threads.
If you notified while still holding the lock, the worker might wake up immediately but then still have to wait for the lock anyway.
Why do we call condition.notify_one()?
After pushing a task into the queue, we need to wake up a worker thread.
Remember, in your constructor the workers are probably doing something like:
condition.wait(lock, [this]() {
    return stop || !tasks.empty();
});
That means workers may be sleeping, waiting for:
- a task to appear
- or the pool to stop
When we add a task, we should notify one of them:
condition.notify_one();
This means:
- “wake up one waiting thread”
That awakened worker can then:
- lock the queue
- see that tasks is not empty
- remove the task
- execute it
Why notify_one() instead of notify_all()?
Because adding one task usually only requires one worker to wake up.
If you used notify_all():
- all workers might wake up
- but only one gets the task
- the others wake up unnecessarily and go back to sleep
That is inefficient.
So for a single new task:
- notify_one() is the better choice
finance.cpp
EARN_INTEREST Request Handling
The lab says that for an EARN_INTEREST request, you should:
- create a thread pool
- enqueue one applyInterest task for every account
- do this regardless of whether the account is active
So the idea is:
- make a pool with numThreads worker threads
- loop through all accounts
- for each account, add a task to the pool
- each task calls applyInterest() on one account
- when the pool goes out of scope, its destructor joins all worker threads, so all tasks finish before you send the response
That matches the thread-pool behavior you just implemented: the pool starts workers in the constructor, workers pull tasks from the queue, and the destructor joins workers after remaining tasks are finished.
Correct implementation:
else if (r.type == EARN_INTEREST) {
    try {
        int numThreads = 2;
        if (r.amount > 0) numThreads = r.amount;
        ThreadPool pool(numThreads);
        for (int i = 0; i < max_accounts; i++) {
            pool.enqueue([accounts, i]() {
                applyInterest(accounts[i]);
            });
        }
        resp.message = "Interest applied successfully";
    } catch (const std::exception& e) {
        resp.success = false;
        resp.message = e.what();
    }
}
Big picture first
You have:
Account* accounts = new Account[max_accounts];
That means accounts points to an array of Account objects in memory.
You can think of it like this:
- accounts[0]
- accounts[1]
- accounts[2]
- …
- accounts[max_accounts - 1]
Each one is a bank account object.
For EARN_INTEREST, you want to apply interest to all of them.
Instead of doing it one-by-one in the main thread, the lab wants you to split that work across multiple worker threads using the thread pool.
Step 1: create the thread pool
ThreadPool pool(numThreads);
This creates a ThreadPool object named pool.
When this line runs:
- the constructor is called
- numThreads worker threads are created
- those workers start waiting for tasks
So if numThreads == 4, you now have 4 worker threads ready to process queued work.
Step 2: loop through every account
for (int i = 0; i < max_accounts; i++) {
This goes through every valid index in the accounts array.
That is important because the instructions say:
- enqueue an applyInterest task for every account
- regardless of account status
So you do not skip inactive accounts.
- You do not check accounts[i].active here.
You simply create one task per account.
Why?
- Because the lab explicitly says to do it for every account.
- Usually this means one of two things:
  - either applyInterest() itself safely handles inactive accounts
  - or the assignment wants the tasks created uniformly for all accounts
- Either way, based on the instruction, you should not filter them here.
Step 3: enqueue one task per account
pool.enqueue([accounts, i]() {
    applyInterest(accounts[i]);
});
This is the most important line, so let’s unpack it carefully.
What is this thing?
- This is a lambda function.
- A lambda is just an anonymous function — a little chunk of code you can create inline without naming it separately.
This lambda:
[accounts, i]() {
    applyInterest(accounts[i]);
}
means:
- capture accounts
- capture i
- take no parameters
- run applyInterest(accounts[i]);
This matches the thread pool’s expected task type:
std::function<void()>
which means:
- a callable thing
- takes no arguments
- returns nothing
So each queued task is:
- “apply interest to account number i”
Why do we capture accounts and i?
accounts is a pointer to the array of accounts.
By capturing it:
[accounts, i]
each task knows where the account array is stored.
i is the current loop index.
This is very important.
- Each task needs to remember which account it is supposed to work on.
- So when i = 7, the lambda for that iteration should apply interest to accounts[7].
- By capturing i by value, each task keeps its own copy of that index.
Why capture i by value and not by reference?
This is one of the biggest concurrency/lambda gotchas.
Suppose you wrote this instead:
pool.enqueue([&accounts, &i]() {
    applyInterest(accounts[i]);
});
});
That would be dangerous, because all the tasks would share the same i variable from the loop.
But the loop keeps changing i:
- first i = 0
- then i = 1
- then i = 2
- …
By the time worker threads actually run the task, the loop may already have moved on, or even finished.
That could cause:
- wrong account index
- repeated last index
- out-of-range access
So you want:
[accounts, i]
Capturing i by value means:
- each task stores its own copy of the number
- safe and correct
Why is it okay to capture accounts by value?
Because accounts is a pointer.
- Capturing it by value means:
  - each lambda gets its own copy of the pointer
  - all copies still point to the same array in memory
- So each task has:
  - the same accounts pointer
  - its own private i
Step 4: why we do not manually wait here
You might wonder:
- “How do I wait until all tasks are done?”
The answer is: the thread pool destructor does that for you.
Because pool is a local variable inside the try block:
try {
    ThreadPool pool(numThreads);
    for (...) {
        pool.enqueue(...);
    }
    resp.message = "Interest applied successfully";
}
when execution leaves that block, pool is destroyed.
Its destructor runs, and based on your lab TODO, it:
- sets stop = true
- wakes worker threads
- joins them after queued tasks are complete
So by the time control leaves the try block, all applyInterest() tasks should already be finished.
That means it is safe to send the response after that.
Step 5: handle errors in the catch block
The TODO says:
- add error handling
- set the response to have a false success value
So inside the catch, do:
resp.success = false;
resp.message = e.what();
What this means
- If something throws a standard exception, then:
  - the response is marked as failed
  - the exception message is returned to the client
e.what() is a C++ standard way to get the text message from an exception.
For example, if your enqueue() later throws on a stopped pool, or something else goes wrong, the client gets a meaningful failure message.
Conceptual summary
For EARN_INTEREST, you are doing this:
- choose number of worker threads
- create a thread pool
- create one task per account
- each task calls applyInterest() on that account
- let the pool finish all tasks
- return a success response
- if anything throws, mark the response as failed
Implement applyInterest
Correct implementation:
void applyInterest(Account& account) {
    if (!account.active || account.balance <= 0) {
        return;
    }
    account.balance *= 1.01;
}
Applying the 1% increase
account.balance *= 1.01;
This means:
- multiply the current balance by 1.01
- which increases it by 1%
Example:
If balance is:
- 100 → becomes 101
- 200 → becomes 202
- 0 → unchanged (because we return early)
- -50 → unchanged
Important detail: type of balance
This works correctly if balance is a floating-point type like:
double
If balance were an integer, then:
account.balance *= 1.01;
would truncate and not behave correctly.