Implementing UPGMA in Python

Code along video

In this code along, we implement the UPGMA algorithm in functions.py, writing the main function and building up each of its helpers one at a time.

Setup

To complete this code along, download the starter code as upgma_python.zip ( found [here]), expand the folder, and move the resulting upgma directory into your python/src source code directory.

The upgma directory has the following structure:

  • Data: a directory containing eight biological datasets, each in its own subfolder. Every subfolder includes a .mtx file (the distance matrix you will pass to UPGMA) and, where available, a .fasta file of the original sequences. The datasets are described below.
  • Output: an initially empty directory where your output images and tree files will be written.
  • datatypes.py: a file containing the data type declarations we will use throughout this code along.
  • functions.py: a file where we will implement the UPGMA algorithm and all of its helper functions. This is the main focus of this code along.
  • io_util.py: a provided file containing functions for reading a distance matrix from a .mtx file and writing results to file.
  • main.py: a file containing def main(), where we will read in a distance matrix, call our UPGMA implementation, and write the resulting tree to file.

Here is what our main() contains at the start of this code along.

def main() -> None:
    print("Happy trees.")

if __name__ == "__main__":
    main()

Datasets

Each subfolder in Data corresponds to one biological question. All pairwise distances are Levenshtein (edit) distances computed between protein or nucleotide sequences, meaning each entry in the matrix counts the minimum number of insertions, deletions, or substitutions needed to transform one sequence into another.

  • HBA1: Hemoglobin Subunit Alpha. Protein sequences for hemoglobin subunit alpha from 108 animals, downloaded from UniProt. Hemoglobin is the oxygen-carrying protein in red blood cells; because it is under strong functional constraint, its rate of evolution is relatively steady, making it a classic candidate for a molecular clock analysis.
  • Cytochrome-C: Somatic Cytochrome C. Sequences (~104–112 amino acids) from 16 diverse eukaryotes. Cytochrome c is a small electron-carrier protein essential to mitochondrial respiration and one of the most conserved proteins in all of biology. The resulting tree broadly recapitulates the accepted phylogeny of eukaryotes, with plants and fungi as outgroups to animals and primates clustering most closely together.
  • Cetaceans: The Whale-Hippo Problem. Mitochondrial cytochrome b sequences for 12 mammalian species. This dataset illustrates one of the most surprising discoveries in molecular phylogenetics: whales and dolphins (order Cetacea) did not merely split off from hoofed mammals; they evolved from within the even-toed ungulates (Artiodactyla). The hippopotamus is more closely related to whales than it is to horses or rhinos, a result that was deeply counterintuitive based on anatomy alone.
  • Giant-Panda-Bears: Where Does the Giant Panda Belong? Cytochrome b sequences for 10 species in and around family Ursidae. For decades, biologists debated whether the giant panda was a true bear, a relative of the red panda, or something else entirely. Molecular data definitively placed it within the bears, and placed the red panda in its own completely separate family.
  • HIV-Subtypes: The Origins of HIV. Gag polyprotein sequences (~500 amino acids) from reference strains of HIV-1 (subtypes A, B, C, D, G, and groups M, N, O), HIV-2 (groups A and B), and SIVcpz (chimpanzee SIV). HIV-1 and HIV-2 arose from independent transmissions of primate SIVs to humans, and the tree makes these separate origins visible: HIV-1 and HIV-2 are far more distant from each other than the subtypes within each group.
  • Mitochondrial-Haplogroups: Tracing Human Migration Out of Africa. HVR1 (Hypervariable Region 1, ~546 nucleotides) sequences extracted from complete human mitochondrial genomes representing 13 major haplogroups. Because mitochondrial DNA is inherited maternally without recombination, mutations accumulate along strictly maternal lineages, and the tree traces human migration out of Africa. The deepest branch, haplogroup L0 from the San Bushmen of southern Africa, represents a lineage ~200,000 years old, while haplogroups A, B, C, and D are among the five founding haplogroups of Indigenous Americans.
  • UK-SARS-CoV-2: Watching a Virus Evolve in Real Time. Spike protein sequences from 261 UK SARS-CoV-2 genomes sampled over every two-week period between November 2020 and March 2024, three genomes per time point. Unlike the other datasets, this tree has a time axis: samples collected further apart in time should appear further apart in the tree, giving a direct view of the virus evolving through the pandemic.

Data setup

In the previous code-along, we designed our Node dataclass with label and age attributes along with with two references to child nodes, as shown by the vomit-inducing Python code below.

@dataclass
class Node:
    label: str = "Unlabeled"
    age: float = 0.0
    child1: 'Node | None' = None
    child2: 'Node | None' = None

We also defined a Tree object class as having one attribute, a list of Node objects.

@dataclass
class Tree:
    nodes: list[Node] = field(default_factory=list)

When an object has only a single attribute, it makes sense to instead use a type alias. You may recall that we introduced aliases when working with cellular automata, we introduced the alias GameBoard to represent a two-dimensional list of integers. Similarly, we define Tree as a name for list[Node] in datatypes.py. We also will establish a DistanceMatrix alias for a two-dimensional list of floats.

Tree = list[Node]
DistanceMatrix = list[list[float]]

The UPGMA function

Now let’s open functions.py and write the main upgma() function. If you would like a refresher on how the UPGMA algorithm works before we start coding, refer to the core text.

UPGMA is a nontrivial algorithm, but the function itself is not very long. That is because we aggressively delegate work to subroutines, each step of the algorithm becoming a single function call.

from datatypes import Node, Tree, DistanceMatrix

def upgma(mtx: DistanceMatrix, species_names: list[str]) -> Tree:
    """
    Build a phylogenetic tree using UPGMA.

    Args:
        mtx: A square distance matrix.
        species_names: One name per row of the matrix.

    Returns:
        A Tree whose first len(species_names) elements
        are leaves and whose last element is the root.
    """
    assert_square_matrix(mtx)
    assert_same_number_species(mtx, species_names)

    num_leaves = len(mtx)
    t = initialize_tree(species_names)
    clusters = t[:num_leaves]

    for p in range(num_leaves, 2*num_leaves - 1):
        row, col, min_val = find_min_element(mtx)
        t[p].age = min_val / 2.0
        t[p].child1 = clusters[row]
        t[p].child2 = clusters[col]
        cluster_size_1 = t[p].child1.count_leaves()
        cluster_size_2 = t[p].child2.count_leaves()

        mtx = add_row_col(row, col, cluster_size_1, cluster_size_2, mtx)
        mtx = delete_row_col(mtx, row, col)
        clusters.append(t[p])
        clusters = delete_clusters(clusters, row, col)

    return t

A few things are worth noting. The loop runs exactly n - 1 times, once for each internal node we create. Each iteration finds the closest pair of clusters, creates a new internal node, and then shrinks both the matrix and the cluster list by replacing two entries with one. After the loop, t[len(t)-1] (the last node we added) is the root of the tree.

Notice that we call t[p].child1.count_leaves() to compute cluster sizes. We will implement count_leaves() soon. For now, let’s implement the helper functions one by one.

Validating input

The first thing upgma() does is call two validation functions. The worst things that could happen to our algorithm are receiving a matrix that is not square, or a mismatch between the number of rows and the number of species names. Let’s hammer these out first.

def assert_square_matrix(mtx: DistanceMatrix) -> None:
    """
    Raise ValueError if mtx is empty or not square.

    Args:
        mtx: The distance matrix to check.
    """
    num_rows = len(mtx)

    if num_rows == 0:
        raise ValueError("Error: empty matrix.")

    for r in range(num_rows):
        if len(mtx[r]) != num_rows:
            raise ValueError("Error: matrix not square.")


def assert_same_number_species(mtx: DistanceMatrix, species_names: list[str]) -> None:
    """
    Raise ValueError if species count mismatches matrix size.

    Args:
        mtx: The distance matrix.
        species_names: The names to check against mtx.
    """
    if len(species_names) != len(mtx):
        raise ValueError("Error: mismatched number of species and matrix rows.")

Initializing the tree

Before the main loop can begin, we will allocate all the nodes we will ever use in initialize_tree(). In UPGMA, a tree with n leaves has exactly 2n - 1 nodes in total: n leaves and n - 1 internal nodes. Note that we cannot infer any edge relationships yet.

def initialize_tree(species_names: list[str]) -> Tree:
    """
    Allocate all 2n - 1 nodes for a tree with n leaves.

    Args:
        species_names: Names for the n leaf nodes.

    Returns:
        A Tree of labeled leaves followed by internal nodes.
    """
    num_leaves = len(species_names)
    t: Tree = []

    for i in range(2 * num_leaves - 1):
        if i < num_leaves:
            t.append(Node(label=species_names[i]))
        else:
            t.append(Node(label=f"Ancestor Species: {i}"))

    return t

Initializing clusters and list slicing

In upgma(), we initialize the cluster list with clusters = t[:num_leaves]. The [:] notation is called slicing because it creates a new list containing a subset of the original. But this raises a question: if we modify something in the slice, does the change show up in the original list? In other words, will clusters and t refer to the same nodes, or will copies of these nodes be made? Let’s find out in main.py.

def main() -> None:
    a = [10, 20, 30, 40]
    b = a[:2]

    print(b)       # [10, 20]
    b[0] = 99
    print(a)       # ???

if __name__ == "__main__":
    main()
STOP: Before running this code, predict what print(a) will output. Does changing b[0] affect a?

When we run the above code, it outputs [10, 20, 30, 40]. Slicing creates a new list, so replacing an element in b does not affect a. But what about a list of mutable objects, like our nodes?

from datatypes import Node

def main() -> None:
    a = [Node(label="Axolotl"), Node(label="Blobfish")]
    b = a[:1]

    print(b[0].label)    # Axolotl
    b[0].label = "Capybara"
    print(a[0].label)    # ???

if __name__ == "__main__":
    main()
STOP: This time, predict what print(a[0].label) will output. Is it still "Axolotl", or has it changed?

This time, print(a[0].label) outputs "Capybara". The slice created a new list, but the entries in that list are references to the same objects. Modifying the node through b modifies it in a too, because both lists point to the same Node.

This is exactly what we want for UPGMA: for example, when we later set t[p].child1 = clusters[row], the child pointer refers directly to the node in our tree, not to a stale copy.

Finding the minimum element

Each UPGMA step merges the two clusters that are closest together. We scan the upper triangle of the distance matrix (the entries where j > i) to find the minimum value and return its coordinates.

def find_min_element(mtx: DistanceMatrix) -> tuple[int, int, float]:
    """
    Scan the upper triangle of mtx for the minimum value.

    Args:
        mtx: A square distance matrix (at least 2x2).

    Returns:
        A tuple (row, col, min_val) where row < col.
    """
    if len(mtx) <= 1 or len(mtx[0]) <= 1:
        raise ValueError("Matrix has only one row or column.")

    row = 0
    col = 1
    min_val = mtx[row][col]

    for i in range(len(mtx) - 1):
        for j in range(i + 1, len(mtx[i])):
            if mtx[i][j] < min_val:
                row, col, min_val = i, j, mtx[i][j]

    return row, col, min_val

We only scan the upper triangle because the matrix is symmetric ( mtx[i][j] is equal to mtx[j][i] for all choices of i and j) so every pair of clusters appears exactly once. We also require that col > row (a consequence of their values in find_min_element(), which we will rely on when deleting the old clusters and matrix entries.

Adding a new row and column

After merging clusters at positions row and col, we need to compute the distance from the new merged cluster to every other remaining cluster. UPGMA uses a size-weighted average: if the two merging clusters have sizes s1 and s2, the distance to another cluster k is

(s1 * d(k, row) + s2 * d(k, col)) / (s1 + s2)

We compute these values into a new list, then append that list as both a new row and a new column of the matrix.

def add_row_col(row: int, col: int, cluster_size1: int, cluster_size2: int, mtx: DistanceMatrix) -> DistanceMatrix:
    """
    Append a new row and column for the merged cluster.

    Args:
        row: Index of the first cluster to merge.
        col: Index of the second cluster to merge.
        cluster_size1: Leaf count of the first cluster.
        cluster_size2: Leaf count of the second cluster.
        mtx: The current distance matrix.

    Returns:
        The matrix with one new row and column added.
    """
    num_rows = len(mtx)
    new_row = [0.0] * (num_rows + 1)

    for r in range(len(new_row) - 1):
        if r != row and r != col:
            new_row[r] = (cluster_size1 * mtx[r][row] + cluster_size2 * mtx[r][col]) / (cluster_size1 + cluster_size2)

    mtx.append(new_row)

    for r in range(len(new_row) - 1):
        mtx[r].append(new_row[r])

    return mtx

It is easy to forget the second loop. Appending new_row to mtx adds the new row, but the distance matrix must remain symmetric; that is, the same distances must also appear as a new column. The loop

    for r in range(len(new_row) - 1):
        mtx[r].append(new_row[r])

appends the column value to every existing row.

Deleting clusters

After appending the new merged cluster to the end of clusters, we remove the two old entries at indices row and col.

def delete_clusters(clusters: list[Node], row: int, col: int) -> list[Node]:
    """
    Remove two merged clusters from the cluster list.

    Args:
        clusters: The active cluster list.
        row: Index of the first cluster to remove.
        col: Index of the second cluster (col > row).

    Returns:
        The updated cluster list.
    """
    del clusters[row]
    del clusters[col]

    return clusters
STOP: Does anything look wrong with this code? (Hint: try tracing through a small example where row is 1 and col is 3.)

And yet our function has a subtle bug. To see why, consider what happens when row=1 and col=3. After we delete the element having index row, all elements past this element slide one element to the left. As a result, when we delete the element at col = 3, this element was the element that was previously at index col+1 = 4.

The fix is simple: always delete the higher index first. Because col > row, deleting col first leaves row untouched.

def delete_clusters(clusters: list[Node], row: int, col: int) -> list[Node]:
    """
    Remove two merged clusters from the cluster list.

    Args:
        clusters: The active cluster list.
        row: Index of the first cluster to remove.
        col: Index of the second cluster (col > row).

    Returns:
        The updated cluster list.
    """
    # Delete higher index first to avoid shifting the lower index
    del clusters[col]
    del clusters[row]

    return clusters

Deleting rows and columns

We apply exactly the same principle to the distance matrix. Now that the new cluster’s row and column are at the end of the matrix, we delete the two old rows and columns at indices row and col, always removing the higher index first. The code below removes the rows first, and then removes the columns.

def delete_row_col(mtx: DistanceMatrix, row: int, col: int) -> DistanceMatrix:
    """
    Remove two rows and columns from the distance matrix.

    Args:
        mtx: The current distance matrix.
        row: Row/column index of the first cluster.
        col: Row/column index of the second cluster (col > row).

    Returns:
        The matrix with two rows and columns removed.
    """
    # Delete rows -- always delete the higher index first
    del mtx[col]
    del mtx[row]

    num_rows = len(mtx)

    # Delete columns -- same ordering: higher index first
    for r in range(num_rows):
        del mtx[r][col]
        del mtx[r][row]

    return mtx

Looking ahead

We have written the complete UPGMA algorithm and all of its helper functions. But we cannot run it yet because the algorithm calls count_leaves(), which we have not implemented. In the next code-along, we will learn about a powerful algorithmic technique called recursion, use it to finish UPGMA as well as to write trees to file, and then run our algorithm on real biological datasets.

Scroll to Top
Programming for Lovers banner no background
programming for lovers logo cropped

Join our community!