Assignment 1: Parallelism Techniques

Parts 1-2 (code only) due Tuesday, February 4th at 6:00pm EST
All parts due Wednesday, February 19th at 6:00pm EST
Introduction
With increasingly large models and training datasets becoming the norm, it has become challenging to train machine learning models efficiently due to constraints on computational resources and memory. These challenges have led to the development of various parallelism strategies that distribute computation and memory usage across multiple devices or nodes.
In this assignment, you will explore two common parallelism techniques for training machine learning models: data parallelism (DDP) and model parallelism. After this, you will explore fully sharded data parallelism (FSDP) and pipeline parallelism, which address some of the limitations of each of these techniques, respectively.
Logistics and due dates
The official lateness policy is described on the course website, in terms of the total number of late hours and how they can be split across assignments. If your final submission is turned in late, the grading server will display the number of late hours accrued.
This assignment has two due dates: one for an initial check-in (deliverables described below), and one for the final submission. You cannot use late hours on the initial check-in.
Initial check-in, due on February 4th at 6 PM:
- By this point, you should complete the implementation tasks in Part 1 (DDP) and Part 2 (Model Parallelism), and submit your code to the grading server. We run the correctness tests for both these parts on your implementation.
- It is ok if your code does not pass the correctness tests; we are mostly looking to see that you have made an effort to complete these parts. This deadline is there to ensure that you have adequate time to complete the rest of the assignment!
- You do not need to finish the performance analysis or conceptual question sections for Parts 1 and 2 for this initial check-in.
Final assignment due on February 19th at 6 PM
- For your final submission, you should submit your code for all parts of the assignment to the grading server, and the writeup containing your generated graphs, explanations of the generated graphs, and answers to the math/conceptual questions.
- These questions are included in the README.md of the assignment repository; simply fill in your answers directly in the README.md and generate your graphs as directed in the assignment (which your README.md will automatically display).
We are still in the process of setting up the grading server; we will post instructions on registering with the grading server later this week.
We hope to provide a way for you to submit runs of any part of the assignment below to the grading server, so you can generate the data to graph there; we will have more information on this later.
Learning Goals
- Learn about some of the different ways to parallelize the training of machine learning models, and understand their tradeoffs with respect to the following:
- Memory Overhead: does this parallelism method require replicating any model state, causing extra memory overhead as we scale to multiple nodes?
- Communication Overhead: does this parallelism method require any communication between workers to synchronize model state? How does that affect its scalability?
- Get hands-on experience implementing these methods in PyTorch, and measuring and reasoning about the performance of your implementations.
A note about resources: Your implementation will run on a CPU to demonstrate parallel training. Because of this, the observed speedups may not actually match what one would see on a GPU. However, we have designed the conceptual questions and explorations in the assignment for you to understand what the method should be doing, so later, if you are ever in a situation where you use one of these methods for parallel training, you have a better understanding of its tradeoffs.
Assignment Installation
The Github classroom link with the starter code can be found here.
Using Containers for Development
For this project and project 4, we'll use a course Docker container for development that you can run locally on your own machine. From your introductory systems course (CS300 or CS33), you should have Docker installed on your local machine (if not, refer to these instructions). Clone your assignment repository onto your machine, and navigate to it in your terminal (for Windows users, you should perform these steps in WSL).
You can then use our provided run_docker script to pull and run the course container. To download the container, run the following commands:
chmod +x ./run_docker
./run_docker download
This will download the Docker image for the course container, according to the platform you are running on.
To run the container, you can run the following command:
./run_docker run
This will run the container, by default, with 4 CPUs and 4 GB of memory, and mount the current directory into your container. You can also customize these options if needed, as such:
./run_docker run -s [mount_dir] -c [num_cpus] -m [mem_in_gb]
After executing this command, you should be inside the container, with the mounted directory's files available in your current working directory.
Instructions to run manually, if the script gives an error:
Try running the following commands (if you are using Podman instead, just replace docker with podman):
For the linux/amd architecture:
docker pull cs1390mlsys/cs1390
docker tag cs1390mlsys/cs1390 cs1390
docker run -it --shm-size=1g --cpus=[cpu_num] -m=[memory] --mount type=bind,source=./,target=/home/cs1390-user cs1390 /bin/bash
For the linux/arm architecture:
docker pull cs1390mlsys/cs1390.arm
docker tag cs1390mlsys/cs1390.arm cs1390.arm
docker run -it --shm-size=1g --cpus=[cpu_num] -m=[memory] --mount type=bind,source=./,target=/home/cs1390-user cs1390.arm /bin/bash
If you can't get Docker or Podman to work, here are alternate instructions:
For projects 1 and 4, you can alternatively use a local Python environment. Note that project 1 requires the function os.sched_affinity, which is not implemented on macOS. You can comment out this call (it is made within pin_to_core in utils.py) for the purpose of getting the computation working, but your performance results may not make sense.
1. Install Miniconda with the instructions here. Take note of the location where Miniconda is installed, which we will call $PATH_TO_MINICONDA.
2. Shell initialization: we recommend adding the Conda initialization to your shell configuration (which may have been done during the install). To do this, run:
source $PATH_TO_MINICONDA/bin/activate
conda init --all
3. Check that Conda is properly initialized:
conda --version # should print "24.11" or something similar
We have found that even if you modify your .bashrc/.zshrc to include the Conda initialization sequence, Conda does not always initialize. Simply check whether the Conda initialization sequence is inside your ~/.bashrc; if not, repeat step 2; if so, run source ~/.bashrc.
4. Create a new environment for cs1390. The referenced requirements file is in the root folder of this assignment's repository:
conda create --name cs1390 python=3.12
conda activate cs1390
conda install pytorch torchvision -c pytorch
pip install -r requirements.txt
Part 1: Distributed Data Parallel (DDP)
Distributed Data Parallel (DDP) works by replicating the model on each device, splitting the dataset into smaller shards, one per device, and training the model on each shard independently. After each forward and backward pass, gradients are synchronized across all devices to ensure consistency, before the optimizer step is performed.
In the first part of the assignment, you will use PyTorch to create a wrapper class for a machine learning model, which performs DDP training of the VGG16 model on CPUs, and inspect its scalability.
Implement DDP
Now you'll complete an implementation of DDP!
Task 1.1: Implement the DistributedDataParallel class in ddp.py. This class serves as a wrapper for the model, enabling distributed training across multiple devices. In particular, implement the methods broadcast_params and average_gradients.
Feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.
Hints:
- You'll want to use the collective communication operations in torch.distributed (imported as dist).
- The model's parameters are Parameters, which can be obtained from a model model via model.parameters().
- A Tensor t's accumulated gradient is stored in t.grad.
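To make these hints concrete, here is a minimal sketch of what the two methods might do, written as standalone functions over an nn.Module rather than as methods of the starter code's wrapper class (whose exact attributes we don't assume). The collective calls (dist.broadcast, dist.all_reduce) are standard torch.distributed primitives; everything else is illustrative.

import torch.distributed as dist

def broadcast_params(module, src_rank=0):
    # Send rank 0's parameter values to every other rank so that all
    # replicas start training from identical weights.
    for param in module.parameters():
        dist.broadcast(param.data, src=src_rank)

def average_gradients(module, world_size):
    # Sum each parameter's gradient across all ranks, then divide by the
    # number of workers to obtain the average gradient.
    for param in module.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size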
Task 1.2: Complete the training loop in train_vgg16_cifar10_ddp_worker. This involves:
- Zeroing all gradients and performing a forward pass on the model on each worker
- Calculating loss and performing the backward pass on each worker
- Synchronizing (averaging) gradients across all ranks
- Updating model weights using the accumulated gradients and the optimizer (a rough sketch of one training step follows this list)
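For reference, a single DDP training step might look roughly like the sketch below, using the broadcast/averaging helpers sketched above as stand-ins for your wrapper's methods. The names and exact call sequence are illustrative assumptions rather than the starter code's API; your train_vgg16_cifar10_ddp_worker should follow the structure of the provided skeleton.

import torch.nn.functional as F

def ddp_train_step(ddp_model, optimizer, inputs, labels, world_size):
    # One DDP iteration: local forward/backward, gradient averaging across
    # ranks, then an identical optimizer step on every rank.
    optimizer.zero_grad()                     # clear gradients from the last step
    outputs = ddp_model(inputs)               # forward pass on this worker's shard
    loss = F.cross_entropy(outputs, labels)   # loss on the local shard
    loss.backward()                           # backward pass populates .grad
    average_gradients(ddp_model, world_size)  # all-reduce so every rank sees the same gradients
    optimizer.step()                          # apply the averaged gradients
    return loss.item()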
We have provided a debugging print statement that can be called as utils.debug_print() (we've already given several debug statements that call this method). These debug print statements can be toggled on and off with the DEBUG_PRINT flag in utils.py.
You can verify the correctness of your implementation by running the below command.
python3 train.py ddp --num_batches 2 --num_workers 2 --learning_rate 1e-3 --batch_size 32 --check_weights --check_output
The test will set PyTorch's random seed to ensure deterministic training results, and compare the weights of your trained model, along with its output on a test batch, to those of a baseline model trained without any parallelism techniques.
Performance Analysis
Before analyzing the performance of your DDP model, be aware that your machine may not support 4 workers (due to their memory overhead). If this is the case, feel free to use up to 3 (or 2) workers for the measurements below. We recommend shutting down memory-intensive applications (e.g. Chrome, Slack).
By default, train.py uses 1 CPU core per worker; you can also increase this by using the --cores_per_worker argument, if you have enough resources.
Split Time of Each Worker
In distributed training, each node/worker spends time both performing computation and communicating with other nodes. Understanding how time is split across these stages is crucial for evaluating the scalability of your implementation.
Task 1.3: train.py will record statistics on the time spent in each stage within the training loop and save them to a file. We've provided a script that generates a graph from this stored data, breaking down the time each worker spent on each stage (communication, computation, and optimizer updates) and showing how this breakdown changes as the number of workers increases.
You can use the following commands to generate the data for the graph and plot the graph:
# Run DDP for 1, 2, and 4 workers
python3 train.py ddp --num_workers 1
python3 train.py ddp --num_workers 2
python3 train.py ddp --num_workers 4
# Plot the data
python3 plot.py split_time ddp=1,ddp=2,ddp=4 --output graphs/ddp_split_time.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 1.1: Please use your graph or the reference graph below to answer the following questions:
- What is the expected behavior for how computation time, optimizer update time, and communication time change as the number of workers increases?
- Does the generated graph match your expectations? If it looks different, try to analyze the reasons for the discrepancy.
- How might the relative difference between computation and communication time change if we instead ran on a system containing GPUs connected by high-bandwidth interconnects?
If you could not generate the graph locally, we have provided a reference from the grading server:
Hint
Differences between your graph and the expected graph could be influenced by factors related to your OS, concurrently running applications, or access/use of computational resources. But feel free to comment on any factors you believe are reasonable.
Throughput
Throughput, in the context of training, generally refers to the number of samples a model can process per second. We calculate it as the total number of samples the model is trained on divided by the time taken to train.
Task 1.4: Now analyze how throughput scales with the number of workers for your implementation of DDP. To generate a graph displaying this data (using the data of your previous runs for Task 1.3), run the following:
# Plot the data
python3 plot.py throughput ddp=1,ddp=2,ddp=4 --output graphs/ddp_throughput_comparison.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 1.2: Analyze the scalability of your implementation. Consider what the ideal case would look like (perfect scalability with no overheads); then think about how potential bottlenecks would influence performance. Does your graph match your expectations for the ideal case? Why or why not?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Math time: Memory Usage and Communication in DDP
The main limitation of data parallelism is a high memory requirement, as the model is replicated on each device.
Assume we have a model with N parameters. This means that we need to store in memory N model parameters, N gradients, and K·N optimizer states (K represents how many variables the optimizer holds per parameter).
Question 1.3: For the data given above, determine the memory consumption when there are W parallel workers. Let p be the precision (in bits) used to store the model parameters, the gradients, and the optimizer states. Provide your answer in bytes (B); your answer should be in terms of N, K, W, and p.
Question 1.4: Now, consider a real-world scenario: suppose that we use the optimizer SGD (which stores parameters as state, i.e. just re-stores the parameters), with 8 workers, to train a model with 10 billion parameters. We will use FP16 to store the model parameters and gradients, but use FP32 to store the optimizer states (this is referred to as mixed-precision training). How much memory is required to run DDP with this setup? Provide your answer in gigabytes (GB).
Question 1.5: In DDP, the workers have to communicate after each iteration in order to synchronize gradients. Suppose that we have W workers, our model has N parameters, our network bandwidth is R Gb/s (gigabits per second) between any pair of workers, and we use a ring all-reduce algorithm. Let p_param be the precision (in bits) used to store the model parameters, p_grad be that used to store the gradients, and p_opt be that used to store the optimizer states. Write down an expression that captures the time (in seconds) it takes for the gradient synchronization. Your answer should be in terms of W, N, R, p_param, p_grad, and p_opt.
Question 1.6: Now let's plug in real values to the above expression. Assume that we have 8 workers, 10 billion parameters, and we use FP16 for parameters and gradients, but FP32 to store optimizer states. Assume the network bandwidth is 100 Gb/s. Provide your answer in seconds.
Part 2: Model Parallelism
Model parallelism works by splitting the model into separate pieces, each of which is placed on a different node. For this assignment, the model will be split into partitions of contiguous layers, which are each placed on separate devices. The forward pass goes through the partitions in order, while the backward pass propagates gradients in reverse order.
As we will go over in class, this variation of model parallelism is not the only way to split a model across nodes (for instance, you can also split parameters on the same layer between devices, referred to as tensor model parallelism).
This form of model parallelism reduces communication overhead compared to DDP because communication only happens at partition boundaries between devices. However, it has very low worker utilization because only one worker is running at a time. We will explore a way to improve this later in the assignment.
In this part of the assignment, you will use PyTorch to create a wrapper class for model parallel training of the VGG16 model on CPUs, and inspect its throughput, as well as a timeline of when computation occurs on each worker.
Model Parallelism Code Structure
The code for model parallelism is structured slightly differently than the DDP wrapper. We recommend taking a look at it before starting to understand the differences (located in model_parallel.py):
- Instead of a single wrapper, there are two wrappers: ModelParallelWrapper, which the training loop uses to orchestrate forward and backward passes on each worker, and ModelParallelWorker, which actually does the forward and backward pass on each worker, communicating with the previous and next partition.
- train_vgg16_cifar10_model_parallel additionally calls split_vgg16 to partition the VGG16 model, and then passes a partition to each worker. This shows how, with model parallelism, the entire model is actually split across workers, so there is no immediate memory overhead.
Implement Model Parallelism
We have divided the implementation for model parallelism into 4 parts: figuring out the communication sizes for each partition boundary, implementing the forward pass, implementing the backward pass, and then combining the forward and backward pass into a single training step.
Task 2.1: In utils.py, fill in the function analyze_communication_with_partitions. You will be using point-to-point communication functions to move data between workers during the forward and backward passes, and these functions require the receiver to know the exact size of the data that will be received.
Hints:
- You'll want to perform a dummy forward pass to determine the output activations of each layer (a rough sketch of this idea follows these hints).
- Consider what the gradient is that each partition receives. How could you determine its size?
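To illustrate the first hint, the sketch below runs a throwaway batch through a flat list of layers and records each output's shape; the shape at a partition boundary is both the size of the activation sent forward and the size of the gradient sent backward across that boundary. The actual signature of analyze_communication_with_partitions and the dummy input shape (a CIFAR-10-sized batch is assumed here) are determined by the starter code, so treat this purely as an illustration.

import torch

def output_shapes_per_layer(layers, input_shape=(32, 3, 32, 32)):
    # Push a dummy batch through the layers in order, recording the shape of
    # each layer's output; no gradients are needed for this shape analysis.
    shapes = []
    x = torch.zeros(*input_shape)
    with torch.no_grad():
        for layer in layers:
            x = layer(x)
            shapes.append(tuple(x.shape))
    return shapes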
You can verify the correctness of your implementation by running:
Task 2.2: Next, implement ModelParallelWrapper::forward and ModelParallelWorker::forward in model_parallel.py.
As with DDP, feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.
Hint: We recommend using the point-to-point communication primitives given in the torch.distributed library (imported as dist); see its send and recv functions.
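As an illustration of this hint, here is roughly how a partition's forward pass might receive its input activations from the previous rank, run its layers, and pass the result on. The parameter names (partition, recv_shape, ...) are placeholders rather than the starter code's actual fields, and the receive shape is assumed to come from your Task 2.1 analysis.

import torch
import torch.distributed as dist

def partition_forward(partition, rank, world_size, recv_shape, inputs=None):
    # The first partition consumes the real inputs; later partitions receive
    # their input activations from the previous rank.
    if rank == 0:
        activations = inputs
    else:
        activations = torch.empty(recv_shape)   # buffer sized from the Task 2.1 analysis
        dist.recv(activations, src=rank - 1)    # blocking point-to-point receive
    activations = partition(activations)        # run this partition's layers
    if rank < world_size - 1:
        dist.send(activations, dst=rank + 1)    # hand off to the next partition
    return activations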
Task 2.3: Now, implement ModelParallelWrapper::backward and ModelParallelWorker::backward in model_parallel.py.
To help you understand the memory requirements of this technique, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer within the partition when performing the backward pass. (In other words, the number of calls your implementation makes to torch.autograd.grad should scale with the number of layers in the partition.) We will manually review your code to ensure that this is the case.
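To illustrate the layer-by-layer requirement, the sketch below walks backward through one partition, calling torch.autograd.grad once per layer. It assumes you saved each layer's input and output tensors during the forward pass (here as a list of (input, output) pairs, with requires_grad enabled on the inputs); the starter code's actual bookkeeping may differ.

import torch

def partition_backward(layers, saved, grad_output):
    # saved[i] is the (input, output) pair recorded for layers[i] during the
    # forward pass. We propagate the incoming gradient back one layer at a time.
    param_grads = {}
    for layer, (layer_in, layer_out) in reversed(list(zip(layers, saved))):
        params = list(layer.parameters())
        grads = torch.autograd.grad(
            outputs=layer_out,
            inputs=[layer_in] + params,   # gradient w.r.t. the input and this layer's params
            grad_outputs=grad_output,     # gradient flowing in from the layer above
        )
        grad_output = grads[0]            # becomes the incoming gradient for the previous layer
        for p, g in zip(params, grads[1:]):
            param_grads[p] = g            # stash parameter gradients for the optimizer step
    # grad_output is now the gradient w.r.t. this partition's input, which is
    # what gets sent back to the previous partition.
    return grad_output, param_grads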
Task 2.4: Finally, combine the previous two steps by implementing ModelParallelWrapper::train_step and ModelParallelWorker::optimizer_step in model_parallel.py.
You can verify the correctness of your implementation by running the following command:
python3 train.py model --num_batches 2 --num_workers 2 --learning_rate 1e-3 --batch_size 32 --check_output
Performance Analysis
Task 2.5: Generate a timeline graph for model parallelism with 3 workers. This timeline graph shows what computation is happening on each worker across an entire training step. To do this, run the following commands:
# Train using model parallelism with 3 workers
python3 train.py model --num_workers 3
# Plot the data
python3 plot.py timeline model=3 --output graphs/model_timeline.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 2.1: Analyze your (or the reference) timeline graph. What does it indicate about how work is distributed between each worker? How does this reflect the internals of how model parallelism is implemented?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Task 2.6: Generate a throughput graph for model parallelism.
# Train using model parallelism with 1, 2, and 3 workers
python3 train.py model --num_workers 1
python3 train.py model --num_workers 2
python3 train.py model --num_workers 3
# Plot the data
python3 plot.py throughput model=1,model=2,model=3 --output graphs/model_throughput_comparison.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 2.2: Analyze your (or the reference) throughput graph. Does it match your expectations of how model parallelism should scale as the number of workers increases? Why or why not?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Math time: Comparison of Communication in Model Parallel vs. DDP
Question 2.3: In DDP, the workers have to communicate after each iteration in order to synchronize gradients (as you explored in Questions 1.5 and 1.6). In model parallelism, communication happens at partition boundaries. Assume that there are L total layers, divided equally into P partitions. Let the input batch size be B.
- Let a be the size (in bytes) of the input to each layer in the model (i.e. assume this is uniform across all layers). How much communication occurs (across all partitions in aggregate) during the forward pass?
- Let g be the size (in bytes) of the gradient with respect to each layer's output in the model (i.e. assume this is uniform across all layers). How much communication occurs (across all partitions in aggregate) during the backward pass?
When we say "how much", we mean the amount of data (in bytes) that is being transferred in one iteration of training (i.e. with one batch).
Part 3: Reducing the Memory Overhead of DDP
One downside of DDP is that it requires the entire model to be loaded on a single device. To reduce the memory overhead of the model, its tensors (parameters, gradients, and optimizer states) can be sharded across devices. Whenever a device requires the entire parameter tensor (during a forward or backward pass), the tensor can be gathered to perform the computation and then freed, with the result of the computation averaged and then sharded across devices. Each device is responsible for its own shard, and updates it accordingly.
In this part of the assignment, you will implement a simplified version of FSDP (Fully Sharded Data Parallel). Once implemented, we'll compare FSDP to DDP to understand how the two methods scale.
Implement FSDP
Now you'll complete an implementation of FSDP! Note that we keep a copy of the entire model in self.layers; this is so that we can determine the architecture of the underlying model (and thus, how accumulated parameters should be used to construct a layer of the model). In an actual FSDP implementation, each worker wouldn't store the entire model, just the architecture and its own local shard of parameters.
To get started, take a look at init_layers_and_params and FullyShardedDataParallel::get_local_info in fsdp.py, which determine how the model's parameter tensors are sharded.
Task 3.1: Implement the forward pass of the model in the methods gather_param_data and forward of the FullyShardedDataParallel class within fsdp.py.
As before, feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.
Hint: Take a look at how the parameter tensors are obtained from a layer in init_layers_and_params and sharded in FullyShardedDataParallel::get_local_info. (Note that these parameter shards and associated metadata are stored in self.local_params and self.unsharded_param_shapes.) How would you therefore gather the parameter tensors of a layer in self.layers from all ranks, and set them within the layer? (A rough sketch of this gather step follows.)
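As a concrete (but hypothetical) picture of the gather step: each rank holds a flat, equally sized slice of a layer's concatenated parameters, all ranks exchange their slices with dist.all_gather, and the concatenated result is reshaped back to the original parameter shapes. The argument names below mirror the hint but are not the starter code's exact API, and the flat/padded layout is an assumption; check get_local_info for the real one.

import math
import torch
import torch.distributed as dist

def gather_layer_params(local_shard, unsharded_shapes, world_size):
    # local_shard: this rank's flat slice of the layer's parameters (possibly padded).
    # unsharded_shapes: the original shape of each parameter tensor in the layer.
    buffers = [torch.empty_like(local_shard) for _ in range(world_size)]
    dist.all_gather(buffers, local_shard)   # every rank now holds every shard
    flat = torch.cat(buffers)               # rebuild the full flat parameter vector
    params, offset = [], 0
    for shape in unsharded_shapes:
        numel = math.prod(shape)
        params.append(flat[offset:offset + numel].view(shape))
        offset += numel
    return params                           # set these into the layer before its forward pass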
Task 3.2: Implement the backward pass of the model in the methods get_local_grad_shard and backward of the FullyShardedDataParallel class within fsdp.py. Then implement FullyShardedDataParallel::optimizer_step.
Just as for model parallelism, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer to perform the backward pass. (Think about what would be required if you performed a constant number of calls to torch.autograd.grad - which properties of FSDP would you need to violate?) We will manually review your code to ensure that this is the case.
Hints:
- Recall that the computational graph for gradient calculation is constructed along the forward pass. Therefore, you'll need to calculate gradients using the same Tensors used in the forward pass.
- However, you should only be updating/storing gradients for the local parameter shards (take a look at how self.optimizer is constructed in FullyShardedDataParallel!).
- The backend we are using, gloo, does not support the reduce-scatter operation. You can instead perform an all-reduce across all ranks, and then correspondingly slice the globally reduced tensor (a rough sketch of this is given after these hints).
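Here is a minimal sketch of that last hint: emulate reduce-scatter by all-reducing the full flattened gradient and then keeping only the slice corresponding to this rank's shard. The equal-sized, padded shard layout is an assumption about how the starter code shards tensors; check get_local_info for the real layout.

import torch
import torch.distributed as dist

def local_grad_shard(flat_grad, rank, world_size):
    # All-reduce the full flattened gradient, then slice out this rank's portion;
    # this has the same effect as a reduce-scatter, just with more communication.
    dist.all_reduce(flat_grad, op=dist.ReduceOp.SUM)
    flat_grad /= world_size                              # average rather than sum
    shard_size = (flat_grad.numel() + world_size - 1) // world_size
    padded = torch.zeros(shard_size * world_size)
    padded[:flat_grad.numel()] = flat_grad               # pad so every shard is the same size
    return padded[rank * shard_size:(rank + 1) * shard_size]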
You can verify the correctness of your implementation by running the following command:
python3 train.py fsdp --num_batches 2 --num_workers 2 --learning_rate 1e-2 --batch_size 32 --check_weights --check_output
Performance Analysis
Task 3.3: Now use your implementation of FSDP to analyze how throughput scales with the number of workers.
To run this experiment, please run the following:
# Run FSDP for 1, 2, and 4 workers
python3 train.py fsdp --num_workers 1
python3 train.py fsdp --num_workers 2
python3 train.py fsdp --num_workers 4
# Plot the data
python3 plot.py throughput fsdp=1,fsdp=2,fsdp=4 --output graphs/fsdp_throughput_comparison.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 3.1: Please analyze the scalability of your FSDP implementation. Compare this graph to the previous DDP throughput graph. Do the two methods scale as you expected? Why or why not? Which one has better raw performance? Why? How does the communication required for FSDP and DDP differ?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Math Time: Memory Overhead of DDP vs. FSDP
Question 3.2: Our implementation of FSDP only handled sharding the parameter states, but in reality, a full implementation would shard the gradients during the backward pass, and the optimizer state as well. Let's return to the scenario from Question 1.3 and 1.4, but instead consider the optimizer Adam. Specifically:
- We have a model with N parameters.
- The Adam optimizer stores the parameter, momentum, and variance for each parameter.
- Again assume that all data is stored with the same precision p (in bits).
Write expressions that capture the following, when we have W workers:
- The memory overhead (in bytes) on each worker when nothing is sharded (same as DDP)
- The memory overhead (in bytes) when the optimizer state is sharded.
- The memory overhead (in bytes) when the optimizer state and gradients are sharded.
- The memory overhead (in bytes) when everything is sharded.
Question 3.3
Now let's plug in some values. We are again using mixed-precision training, where models and gradients are stored in FP16, but the optimizer states are stored in FP32. Our model has 10 billion parameters, and we split training across 8 workers. In bytes, what is the memory overhead per worker, when:
- The optimizer is sharded?
- The optimizer and gradients are sharded?
- Everything is sharded?
For full credit, please write out exactly how you arrived at these values.
Part 4: Pipeline Parallelism
In model parallelism, the forward/backward pass for each partition begins only after that for the previous/next partition is completed, which leads to low resource utilization on worker nodes. To improve resource utilization, we can introduce pipelining, by dividing the data into multiple microbatches. By doing so, the computation of different microbatches can overlap in time, enhancing resource utilization.
Implement Pipeline Parallelism
Task 4.1: The overall workflow of pipeline parallelism is very similar to model parallelism, but notice the new microbatch_idx parameter, which is used to keep track of the currently processed microbatch of the current batch.
To start with, implement the forward, backward, and optimizer_step methods for the PipelineParallelWorker class, and the forward and backward methods for the PipelineParallel class.
Just as for model parallelism, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer within the partition to perform the backward pass. We will manually review your code to ensure that this is the case.
Hint: Your code for these sections should be very similar to that for model parallelism! (A rough sketch of one way to track per-microbatch state is given below.)
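One illustrative way (not the required one) to use microbatch_idx on the worker side is to key any saved forward-pass state by it, so that the later backward call for a given microbatch can find its own activations. The class and attribute names here are hypothetical.

class MicrobatchCache:
    # Per-microbatch bookkeeping inside a pipeline worker: activations saved
    # during forward are looked up (and freed) during the matching backward.
    def __init__(self):
        self.saved = {}                             # microbatch_idx -> saved tensors

    def remember(self, microbatch_idx, activations):
        self.saved[microbatch_idx] = activations    # stash during the forward pass

    def recall(self, microbatch_idx):
        return self.saved.pop(microbatch_idx)       # retrieve during the backward pass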
Task 4.2: Implement the train_step and eval methods for the PipelineParallel class. Note that each of these methods takes in a full batch of inputs.
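For intuition, a GPipe-style train_step on the wrapper side might split the batch into microbatches, push all of their forward passes through the pipeline, then run their backward passes, and finally take a single optimizer step for the whole batch. The method names and signatures below mirror the classes described above, but the exact schedule, number of microbatches, and how losses are combined are assumptions.

import torch

def train_step_sketch(pipeline, inputs, labels, num_microbatches=4):
    # Split the full batch so different workers can be busy with different
    # microbatches at the same time.
    micro_inputs = torch.chunk(inputs, num_microbatches)
    micro_labels = torch.chunk(labels, num_microbatches)
    losses = []
    for idx, (x, y) in enumerate(zip(micro_inputs, micro_labels)):
        losses.append(pipeline.forward(x, y, microbatch_idx=idx))  # fill the pipeline
    for idx in range(len(micro_inputs)):
        pipeline.backward(microbatch_idx=idx)                      # drain it with backward passes
    pipeline.optimizer_step()                                      # one update for the entire batch
    return sum(losses) / len(losses)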
You can verify the correctness of your implementation by running the below command.
python3 train.py pipeline --num_batches 2 --num_workers 2 --learning_rate 1e-2 --batch_size 32 --check_output
Performance Analysis
Task 4.3: Generate a timeline graph for pipeline parallelism with 3 workers. To do this, run the following commands:
# Train using pipeline parallelism with 3 workers
python3 train.py pipeline --num_workers 3
# Plot the data
python3 plot.py timeline pipeline=3 --output graphs/pipeline_timeline.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 4.1: Analyze your (or the reference) timeline graph. What does it indicate about how work is distributed between each worker? How does this reflect the internals of how pipeline parallelism is implemented?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Task 4.4: Generate a throughput graph for pipeline parallelism.
# Train using pipeline parallelism with 1, 2, and 3 workers
python3 train.py --num_workers 1 pipeline
python3 train.py --num_workers 2 pipeline
python3 train.py --num_workers 3 pipeline
# Plot the data
python3 plot.py throughput pipeline=1,pipeline=2,pipeline=3 --output graphs/pipeline_throughput_comparison.png
If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.
Question 4.2: Analyze your (or the reference) throughput graph. Does it match your expectations of how pipeline parallelism should scale as the number of workers increases? Why or why not? How does the performance of pipeline parallelism compare with model parallelism and why?
If you were not able to generate the graph, we have provided a reference version from the grading server:
Math Time: Pipeline Bubble Size
Question 4.3: In the GPipe paper, section 2.3, entitled Performance Optimization, the authors propose that the size of the pipeline bubble (the fraction of time during which workers are not being utilized) is:
(K - 1) / (M + K - 1)
where K is the number of partitions and M is the number of microbatches. Please derive this expression based on Figure 2a in the paper.
Even though the approach in GPipe improves utilization over model parallelism, we can still observe idle gaps where workers are not being used. In the paper PipeDream, the authors proposed a different approach (with a schedule called 1F1B).
Question 4.4
Referring to Figure 4, does 1F1B reduce bubble size compared to the GPipe version of pipeline parallelism? If so – why? If not – what are the benefits of the alternate schedule in terms of active memory use, compared to GPipe?
Congratulations, you've completed the first CS 1390 assignment! 
Further References
We recommend taking a look at these resources if you would like to learn more; we also drew on some of these resources to create the assignment!
Acknowledgements: This project was developed for CS 1390 by Sid Boppana, Nathan Harbison, Alice Song, and Deepti Raghavan.