« Back to the CS1390 website

Assignment 1: Parallelism Techniques :computer: :link: :computer:

Parts 1-2 (code only) due Tuesday, February 4th at 6:00pm EST

All parts due Wednesday, February 19th at 6:00pm EST

Introduction

With increasingly larger models and training data becoming the norm, it has become challenging to train machine learning models efficiently due to constraints in computational resources and memory. These challenges have led to the development of various parallelism strategies to distribute computation and memory usage across multiple devices or nodes.

In this assignment, you will explore two common parallelism techniques for training machine learning models: distributed data parallelism (DDP) and model parallelism. After this, you will explore fully sharded data parallelism (FSDP) and pipeline parallelism, which address some of the limitations of each of these techniques, respectively.

Logistics and due dates

The official lateness policy, including the total number of late hours and how they can be split across assignments, is described on the course website. If your final submission is turned in late, the grading server will display the number of late hours accrued.

This assignment has two due dates: one for an initial check-in (deliverables described below), and one for the final submission. You cannot use late hours on the initial check-in.

Initial check in due on February 4th at 6 PM:

Final assignment due on February 19th at 6 PM

We are still in the process of setting up the grading server; we will post instructions on registering with the grading server later this week.

We hope to provide a way for you to submit runs of any of the parts of the assignment below, so that you can generate the data for your graphs on the grading server; we will have more information on this later.

Learning Goals

A note about resources: Your implementation will run on a CPU to demonstrate parallel training. Because of this, the observed speedups may not match what one would see on a GPU. However, we have designed the conceptual questions and explorations in this assignment to help you understand what each method should be doing, so that if you are ever in a situation where you use one of these methods for parallel training, you have a better understanding of its tradeoffs.

Assignment Installation

The Github classroom link with the starter code can be found here.

Using Containers for Development

For this project and project 4, we'll use a course Docker container for development, which you can run locally on your own machine. From your introductory systems course (CS300 or CS33), you should already have Docker installed on your local machine (if not, refer to these instructions). Clone your assignment repository onto your machine and navigate to it in your terminal (Windows users should perform these steps in WSL).

You can then use our provided run_docker script to pull and run the course container. To download the container, run the following commands:

chmod +x ./run_docker
./run_docker download

This will download the Docker image for the course container appropriate for the platform you are running on.

To run the container, you can run the following command:

./run_docker run

which will run the container, by default, with 4 CPUs and 4 GB of memory, and mount the current directory into the container. You can also customize these options if needed:

./run_docker run -s [mount_dir] -c [num_cpus] -m [mem_in_gb]

After executing this command, you should be inside the container, with the files of the mounted directory available in your current working directory.
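For example, to mount the current directory with 2 CPUs and 8 GB of memory (the values here are only illustrative):

./run_docker run -s . -c 2 -m 8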

Instructions to run manually, if the script gives an error:

Try running the following commands (if you are using Podman instead, just replace docker with podman):

For the linux/amd64 architecture:

docker pull cs1390mlsys/cs1390
docker tag cs1390mlsys/cs1390 cs1390
docker run -it --shm-size=1g --cpus=[cpu_num] -m=[memory] --mount type=bind,source=./,target=/home/cs1390-user cs1390 /bin/bash

For the linux/arm64 architecture:

docker pull cs1390mlsys/cs1390.arm
docker tag cs1390mlsys/cs1390.arm cs1390.arm
docker run -it --shm-size=1g --cpus=[cpu_num] -m=[memory] --mount type=bind,source=./,target=/home/cs1390-user cs1390.arm /bin/bash
If you can't get Docker or Podman to work, here are alternate instructions:

For projects 1 and 4, you can alternatively use a local Python environment. Note that project 1 requires the use of the function os.sched_setaffinity, which is not implemented on macOS. You can comment out this call (it is made within pin_to_core in utils.py) for the purpose of getting the computation working, but your performance results may not make sense.

  1. Install Miniconda with the instructions here. Take note of the location where Miniconda is installed, which we will call $PATH_TO_MINICONDA.

  2. Shell initialization: we recommend adding the Conda initialization to your shell configuration (this may have been done during the install). To do this, run:

source $PATH_TO_MINICONDA/bin/activate
conda init --all

  3. Check that Conda is properly initialized:

conda --version # should print "24.11" or something similar

  4. We have found that even if you modify your .bashrc/.zshrc to include the Conda initialization sequence, Conda does not always initialize. Check whether the Conda initialization sequence is inside your ~/.bashrc; if not, repeat step 2; if so, run source ~/.bashrc.

  5. Create a new environment for cs1390. The referenced requirements file is in the root folder of this assignment's repository:

conda create --name cs1390 python=3.12
conda activate cs1390
conda install pytorch torchvision -c pytorch
pip install -r requirements.txt

Part 1: Distributed Data Parallel (DDP)

Distributed Data Parallel (DDP) works by replicating the model on each device, splitting the dataset into smaller shards, one per device, and training the model on each shard independently. After each forward and backward pass, gradients are synchronized across all devices to ensure consistency, before the optimizer step is performed.
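As a concrete illustration of the gradient synchronization step, here is a minimal sketch (not the starter code's DistributedDataParallel wrapper) that averages gradients with an all-reduce, assuming a torch.distributed process group has already been initialized; the function name is just for illustration:

import torch.distributed as dist
import torch.nn as nn

def average_gradients_sketch(model: nn.Module, world_size: int) -> None:
    # Sum each parameter's gradient across all workers, then divide by the
    # number of workers so every replica applies the same averaged gradient.
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size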

In the first part of the assignment, you will use PyTorch to create a wrapper class for a machine learning model that performs DDP training of the VGG16 model on CPUs, and then inspect its scalability.

Implement DDP

Now you'll complete an implementation of DDP!

Task 1.1: Implement the DistributedDataParallel class in ddp.py. This class serves as a wrapper for the model, enabling distributed training across multiple devices. In particular, implement the methods broadcast_params and average_gradients.

Feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.

Hints:

Task 1.2: Complete the training loop in train_vgg16_cifar10_ddp_worker. This involves:

We have provided a debugging print statement that can be called as utils.debug_print() (we've already given several debug statements that call this method). These debug print statements can be toggled on and off with the DEBUG_PRINT flag in utils.py.

You can verify the correctness of your implementation by running the command below.

python3 train.py ddp --num_batches 2 --num_workers 2 --learning_rate 1e-3 --batch_size 32 --check_weights --check_output

The test will set PyTorch's random seed to ensure deterministic training results, and compare the weights of your trained model, along with its output on a test batch, to those of a baseline model trained without any parallelism techniques.

Performance Analysis

Before analyzing the performance of your DDP model, please be aware that your machine may not support 4 workers (due to memory overhead). If this is the case, feel free to use up to 3 (or 2) workers for the measurements below. We recommend shutting down memory-intensive applications (e.g. Chrome, Slack).

By default, train.py uses 1 CPU core per worker; you can also increase this by using the --cores_per_worker argument, if you have enough resources.
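For example (the values here are only illustrative):

python3 train.py ddp --num_workers 2 --cores_per_worker 2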

Split Time of Each Worker

In distributed training, each node/worker spends time performing computation and communicating with other nodes. Understanding how time is split between these stages is crucial for evaluating the scalability of your implementation.

Task 1.3: train.py will record statistics on the time spent in each stage within the training loop and save them to a file. We've provided a script that generates a graph from this stored data; it breaks down the time each worker spent on each stage (communication, computation, and optimizer updates) and shows how this breakdown changes as the number of workers increases.

You can use the following commands to generate the data for the graph and plot the graph:

# Run DDP for 1, 2, and 4 workers
python3 train.py ddp --num_workers 1
python3 train.py ddp --num_workers 2
python3 train.py ddp --num_workers 4

# Plot the data
python3 plot.py split_time ddp=1,ddp=2,ddp=4 --output graphs/ddp_split_time.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 1.1: Please use your graph or the reference graph below to answer the following questions:

If you could not generate the graph locally, we have provided a reference from the grading server:

Hint

Differences between your graph and the expected graph could be influenced by factors related to your OS, concurrently running applications, or the computational resources available. Feel free to comment on any factors you believe are reasonable.

Throughput

Throughput, when referring to training, generally refers to the number of samples a model can process per second. We calculate it as the total number of samples the model is trained on divided by the time taken to train.
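For example (with made-up numbers):

# Throughput = total samples processed / training time
num_batches, batch_size = 100, 32
train_time_s = 50.0
throughput = num_batches * batch_size / train_time_s  # 64 samples/second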

Task 1.4: Now analyze how throughput scales with the number of workers for your implementation of DDP. To generate a graph displaying this data (using the data of your previous runs for Task 1.3), run the following:

# Plot the data
python3 plot.py throughput ddp=1,ddp=2,ddp=4 --output graphs/ddp_throughput_comparison.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 1.2: Analyze the scalability of your implementation. Consider what the ideal case would look like (perfect scalability with no overheads); then think about how potential bottlenecks would influence performance. Does your graph match your expectations for the ideal case? Why or why not?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Math time: Memory Usage and Communication in DDP

The main limitation of data parallelism is a high memory requirement, as the model is replicated on each device.

Assume we have a model with P parameters. This means that we need to store in memory P model parameters, P gradients, and KP optimizer states (K represents how many variables the optimizer holds per parameter).

Question 1.3: For the data given above, determine the memory consumption when there are W parallel workers. Let S be the precision (in bits) used to store the model parameters, the gradients, and the optimizer states. Provide your answer in bytes (B); your answer should be in terms of P, K, W, and S.

Question 1.4: Now, consider a real-world scenario: suppose that we use the optimizer SGD (which stores P parameters as state, i.e. just re-stores the parameters), with 8 workers, to train a model with 10 billion parameters. We will use FP16 to store the model parameters and gradients, but use FP32 to store the optimizer states (this is referred to as mixed-precision training). How much memory is required to run DDP with this setup? Provide your answer in gigabytes (GB).

Question 1.5: In DDP, the workers have to communicate after each iteration in order to synchronize gradients. Suppose that we have W workers, our model has P parameters, our network bandwidth is B Gb/s (gigabits per second) between any pair of workers, and we use a ring all-reduce algorithm. Let Sp be the precision (in bits) used to store the model parameters, Sg be that used to store the gradients, and So be that used to store the optimizer states. Write down an expression that captures the time (in seconds) it takes for the gradient synchronization. Your answer should be in terms of P, K, W, Sp, Sg, So, and B.

Question 1.6: Now let's plug in real values to the above expression. Assume that we have 8 workers, 10 billion parameters, and we use FP16 for parameters and gradients, but FP32 to store optimizer states. Assume the network bandwidth is 100 Gb/s. Provide your answer in seconds.

Part 2: Model Parallelism

Model parallelism works by splitting the model into separate pieces, each of which is placed on a different node. For this assignment, the model will be split into partitions of contiguous layers, which are each placed on a separate device. The forward pass goes through the partitions in order, while the backward pass propagates gradients in reverse order.

As we will go over in class, this variation of model parallelism is not the only way to split a model across nodes (for instance, you can also split the parameters of a single layer across devices, which is referred to as tensor model parallelism).

This form of model parallelism reduces communication overhead compared to DDP because communication only happens at partition boundaries between devices. However, it has very low worker utilization because only one worker is running at a time. We will explore a way to improve this later in the assignment.
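As a sketch of what partitioning into contiguous layers can look like (a generic illustration, not the provided split_vgg16), one could slice a flat list of layers into roughly equal partitions:

import torch.nn as nn

def split_into_partitions(model: nn.Sequential, num_partitions: int):
    # Divide a flat list of layers into contiguous, roughly equal partitions,
    # one per worker; the last partition may be slightly smaller.
    layers = list(model.children())
    per_part = (len(layers) + num_partitions - 1) // num_partitions  # ceiling division
    return [nn.Sequential(*layers[i:i + per_part])
            for i in range(0, len(layers), per_part)]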

In this part of the assignment, you will use PyTorch to create a wrapper class for model parallel training of the VGG16 model on CPUs, and inspect its throughput, as well as a timeline of when computation occurs on each worker.

Model Parallelism Code Structure

The code for model parallelism is structured slightly differently than the DDP wrapper. We recommend taking a look at it (located in model_parallel.py) before starting, to understand the differences:

  1. Instead of a single wrapper, there are two wrappers: ModelParallelWrapper, which the training loop uses to orchestrate forward and backward passes on each worker, and ModelParallelWorker, which actually performs the forward and backward passes on each worker, communicating with the previous and next partitions.
  2. train_vgg16_cifar10_model_parallel additionally calls split_vgg16 to partition the VGG16 model, and then passes a partition to each worker. This shows how, with model parallelism, the model is actually split across workers, so there is no memory overhead from replicating the entire model on each worker.

Implement Model Parallelism

We have divided the implementation for model parallelism into 4 parts: figuring out the communication sizes for each partition boundary, implementing the forward pass, implementing the backward pass, and then combining the forward and backward pass into a single training step.

Task 2.1: In utils.py, fill in the function analyze_communication_with_partitions. You will be using point-to-point communication functions to move data between workers during the forward and backward passes, and these functions require the receiver to know the exact size of the data that will be received.
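One generic way to determine these sizes (an illustration only, not necessarily the intended approach; boundary_shapes and its arguments are hypothetical) is to trace a dummy input through each partition and record the output shape at every boundary:

import torch

def boundary_shapes(partitions, input_shape=(1, 3, 32, 32)):
    # Run a dummy tensor through each partition (an nn.Module) in order and
    # record the shape that would be sent to the next partition.
    shapes = []
    x = torch.zeros(input_shape)
    with torch.no_grad():
        for partition in partitions:
            x = partition(x)
            shapes.append(tuple(x.shape))
    return shapes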

Hints:

You can verify the correctness of your implementation by running:

python3 comm_test.py

Task 2.2: Next, implement ModelParallelWrapper::forward and ModelParallelWorker::forward in model_parallel.py.

As with DDP, feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.

Hint: We recommend using the point-to-point communication primitives given in the torch.distributed library (imported as dist); see its send and recv functions.
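For reference, a minimal pattern for blocking point-to-point transfers between adjacent partitions (send_activation and recv_activation are hypothetical helpers, not part of the starter code), assuming a process group is initialized:

import torch
import torch.distributed as dist

def send_activation(activation: torch.Tensor, dst: int) -> None:
    # Blocking send of a partition's output to the next worker.
    dist.send(activation.contiguous(), dst=dst)

def recv_activation(shape, src: int) -> torch.Tensor:
    # The receiver must pre-allocate a buffer of the right shape (hence Task 2.1).
    buf = torch.empty(shape)
    dist.recv(buf, src=src)
    return buf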

Task 2.3: Now, implement ModelParallelWrapper::backward and ModelParallelWorker::backward in model_parallel.py.

To help you understand the memory requirements of this technique, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer within the partition to perform the backward pass. (In other words, the number of calls your implementation makes to torch.autograd.grad should scale with the number of layers in the partition.) We will manually review your code to ensure that this is the case.
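As a generic illustration of the torch.autograd.grad API (not the assignment's backward method), here is how a gradient can be propagated one layer at a time instead of calling .backward() on a final loss; the tensors and layers below are made up:

import torch
import torch.nn as nn

x = torch.randn(4, 8, requires_grad=True)
layer1, layer2 = nn.Linear(8, 8), nn.Linear(8, 2)
h = layer1(x)
out = layer2(h)
grad_out = torch.ones_like(out)  # upstream gradient arriving from the next partition
# Gradients for layer2's parameters and for its input h, in a single call
grad_h, gw2, gb2 = torch.autograd.grad(
    out, (h, layer2.weight, layer2.bias), grad_outputs=grad_out, retain_graph=True)
# Use grad_h as the upstream gradient when backpropagating through layer1
grad_x, gw1, gb1 = torch.autograd.grad(
    h, (x, layer1.weight, layer1.bias), grad_outputs=grad_h)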

Task 2.4: Finally, combine the previous two steps by implementing ModelParallelWrapper::train_step and ModelParallelWorker::optimizer_step in model_parallel.py.

You can verify the correctness of your implementation by running the following command:

python3 train.py model --num_batches 2 --num_workers 2 --learning_rate 1e-3 --batch_size 32 --check_output

Performance Analysis

Task 2.5: Generate a timeline graph for model parallelism with 3 workers. This timeline graph shows what computation is happening on each worker across an entire training step. To do this, run the following commands:

# Train using model parallelism with 3 workers
python3 train.py model --num_workers 3

# Plot the data
python3 plot.py timeline model=3 --output graphs/model_timeline.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 2.1: Analyze your (or the reference) timeline graph. What does it indicate about how work is distributed between each worker? How does this reflect the internals of how model parallelism is implemented?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Task 2.6: Generate a throughput graph for model parallelism.

# Train using model parallelism with 1, 2, and 3 workers
python3 train.py model --num_workers 1
python3 train.py model --num_workers 2
python3 train.py model --num_workers 3

# Plot the data
python3 plot.py throughput model=1,model=2,model=3 --output graphs/model_throughput_comparison.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 2.2: Analyze your (or the reference) throughput graph. Does it match your expectations of how model parallelism should scale as the number of workers increases? Why or why not?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Math time: Comparison of Communication in Model Parallel vs. DDP

Question 2.3: In DDP, the workers have to communicate after each iteration in order to synchronize gradients (as you explored in Questions 1.5 and 1.6). In model parallelism, communication happens at partition boundaries. Assume that there are L total layers, divided equally into P partitions. Let the input batch size be B.

When we say "how much", we mean the amount of data (in bytes) that is being transferred in one iteration of training (i.e. with one batch).

Part 3: Reducing the Memory Overhead of DDP

One downside of DDP is that it requires the entire model to be loaded on each device. To reduce the memory overhead of the model, its tensors (parameters, gradients, and optimizer states) can be sharded across devices. Whenever a device requires an entire parameter tensor (during a forward or backward pass), the tensor can be gathered to perform the computation and then freed, with the result of the computation averaged and then sharded across devices. Each device is responsible for its own shard, and updates it accordingly.
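As a minimal sketch of the gather step (gather_full_param_sketch is hypothetical, not the starter code's gather_param_data), assuming all shards have equal size and a process group is initialized:

import torch
import torch.distributed as dist

def gather_full_param_sketch(local_shard: torch.Tensor, world_size: int) -> torch.Tensor:
    # Every rank contributes its shard and receives everyone else's; concatenating
    # the shards reconstructs the full (flattened) parameter for the computation.
    # A real FSDP implementation pads shards to equal size and frees the gathered
    # tensor as soon as the layer's computation is done.
    shards = [torch.empty_like(local_shard) for _ in range(world_size)]
    dist.all_gather(shards, local_shard)
    return torch.cat(shards)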

In this part of the assignment, you will implement a simplified version of FSDP (Fully Sharded Data Parallel). Once implemented, we'll compare FSDP to DDP to understand how the two methods scale.

Implement FSDP

Now you'll complete an implementation of FSDP! Note that we keep a copy of the entire model in self.layers; this is so that we can determine the architecture of the underlying model (and thus, how accumulated parameters should be used to construct a layer of the model). In an actual FSDP implementation, each worker wouldn't store the entire model, just the architecture and its own local shard of parameters.

To get started, take a look at init_layers_and_params and FullyShardedDataParallel::get_local_info in fsdp.py, which determine how the model's parameter tensors are sharded.

Task 3.1: Implement the forward pass of the model in the methods gather_param_data and forward of the FullyShardedDataParallel class within fsdp.py.

As before, feel free to add/initialize any state to the wrapper class you might need (or for debugging) in the __init__ function.

Hint: Take a look at how the parameter tensors are obtained from a layer in init_layers_and_params and sharded in FullyShardedDataParallel::get_local_info. (Note that these parameter shards and associated metadata are stored in self.local_params and self.unsharded_param_shapes.) How would you therefore gather the parameter tensors of a layer in self.layers from all ranks, and set them within the layer?

Task 3.2: Implement the backward pass of the model in the methods get_local_grad_shard and backward of the FullyShardedDataParallel class within fsdp.py. Then implement FullyShardedDataParallel::optimizer_step.

Just as for model parallelism, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer within the partition to perform the backward pass. (Think about what would be required if you performed a constant number of calls to torch.autograd.grad - which properties of FSDP would you need to violate?) We will manually review your code to ensure that this is the case.

Hints:

You can verify the correctness of your implementation by running the following command:

python3 train.py fsdp --num_batches 2 --num_workers 2 --learning_rate 1e-2 --batch_size 32 --check_weights --check_output

Performance Analysis

Task 3.3: Now use your implementation of FSDP to analyze the throughput as the number of workers scales. To run this experiment, please run the following:

# Run FSDP for 1, 2 and 4 workers
python3 train.py fsdp --num_workers 1
python3 train.py fsdp --num_workers 2
python3 train.py fsdp --num_workers 4

# Plot the data
python3 plot.py throughput fsdp=1,fsdp=2,fsdp=4 --output graphs/fsdp_throughput_comparison.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 3.1: Please analyze the scalability of your FSDP implementation. Compare this graph to the previous DDP throughput graph. Do the two methods scale as you expected? Why or why not? Which one has better raw performance? Why? How does the communication required for FSDP and DDP differ?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Math Time: Memory Overhead of DDP vs. FSDP

Question 3.2: Our implementation of FSDP only handles sharding the parameters, but in reality, a full implementation would shard the gradients during the backward pass, as well as the optimizer state. Let's return to the scenario from Questions 1.3 and 1.4, but instead consider the optimizer Adam. Specifically:

Write expressions that capture the following, when we have W workers:

  1. The memory overhead (in bytes) on each worker when nothing is sharded (same as DDP)
  2. The memory overhead (in bytes) when the optimizer state is sharded.
  3. The memory overhead (in bytes) when the optimizer state and gradients are sharded.
  4. The memory overhead (in bytes) when everything is sharded.

Question 3.3: Now let's plug in some values. We are again using mixed-precision training, where the model parameters and gradients are stored in FP16, but the optimizer states are stored in FP32. Our model has 10 billion parameters, and we split training across 8 workers. In bytes, what is the memory overhead per worker when:

  1. The optimizer is sharded?
  2. The optimizer and gradients are sharded?
  3. Everything is sharded?

For full credit, please write out exactly how you arrived at these values.

Part 4: Pipeline Parallelism

In model parallelism, the forward/backward pass for each partition begins only after that of the previous/next partition has completed, which leads to low resource utilization on the worker nodes. To improve resource utilization, we can introduce pipelining by dividing each batch of data into multiple microbatches. By doing so, the computation of different microbatches can overlap in time, keeping more of the workers busy at once.
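For example, splitting a batch into microbatches can be as simple as the following (the choice of 4 microbatches is illustrative):

import torch

batch = torch.randn(32, 3, 32, 32)           # one full batch of CIFAR-10-sized inputs
microbatches = torch.chunk(batch, 4, dim=0)  # 4 microbatches of 8 samples each
# Each microbatch is then fed through the pipeline stages in turn.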

Implement Pipeline Parallelism

Task 4.1: The overall workflow of pipeline parallelism is very similar to model parallelism, but notice the new microbatch_idx parameter, which keeps track of which microbatch of the current batch is being processed.

To start with, implement the forward, backward, and optimizer_step methods for the PipelineParallelWorker class, and the forward and backward methods for the PipelineParallel class.

Just as for model parallelism, you must use torch.autograd.grad to calculate/propagate gradients individually for each layer within the partition to perform the backward pass. We will manually review your code to ensure that this is the case.

Hint: Your code for these sections should be very similar to that for model parallelism!

Task 4.2: Implement the train_step and eval methods for the PipelineParallel class. Note that each of these methods takes in a full batch of inputs.

You can verify the correctness of your implementation by running the command below.

python3 train.py pipeline --num_batches 2 --num_workers 2 --learning_rate 1e-2 --batch_size 32 --check_output

Performance Analysis

Task 4.3: Generate a timeline graph for pipeline parallelism with 3 workers. To do this, run the following commands:

# Train using pipeline parallelism with 3 workers
python3 train.py pipeline --num_workers 3

# Plot the data
python3 plot.py timeline pipeline=3 --output graphs/pipeline_timeline.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 4.1: Analyze your (or the reference) timeline graph. What does it indicate about how work is distributed between each worker? How does this reflect the internals of how pipeline parallelism is implemented?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Task 4.4: Generate a throughput graph for pipeline parallelism.

# Train using pipeline parallelism with 1, 2, and 3 workers
python3 train.py pipeline --num_workers 1
python3 train.py pipeline --num_workers 2
python3 train.py pipeline --num_workers 3

# Plot the data
python3 plot.py throughput pipeline=1,pipeline=2,pipeline=3 --output graphs/pipeline_throughput_comparison.png

If you are not able to finish this part of the assignment, we have provided a reference graph (generated on the grading server) for you to answer the question below and in your README.md.

Question 4.2: Analyze your (or the reference) throughput graph. Does it match your expectations of how pipeline parallelism should scale as the number of workers increases? Why or why not? How does the performance of pipeline parallelism compare with model parallelism and why?

If you were not able to generate the graph, we have provided a reference version from the grading server:

Math Time: Pipeline Bubble Size

Question 4.3: In the GPipe paper, section 2.3 (Performance Optimization), the authors propose that the size of the pipeline bubble (the fraction of time during which workers are not being utilized) is O((K - 1) / (M + K - 1)), where K is the number of partitions and M is the number of microbatches. Please derive this expression based on Figure 2a in the paper.

Even though the approach in GPipe improves utilization over model parallelism, we can still observe idle gaps where workers are not being used. In the paper PipeDream, the authors proposed a different approach (with a schedule called 1F1B).

Question 4.4: Referring to Figure 4, does 1F1B reduce the bubble size compared to the GPipe version of pipeline parallelism? If so, why? If not, what are the benefits of the alternate schedule in terms of active memory use, compared to GPipe?

Congratulations, you've completed the first CS 1390 assignment! :tada:

Further References

We recommend taking a look at these resources if you would like to learn more; we also drew on some of these resources to create the assignment!


Acknowledgements: This project was developed for CS 1390 by Sid Boppana, Nathan Harbison, Alice Song, and Deepti Raghavan.