import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import os
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr
if torch.backends.mps.is_available():
    torch.set_default_dtype(torch.float32)
    print("Set default to float32 for MPS compatibility")

05 · TF Binding Prediction — Starter Code
This notebook is a reference implementation. For Homework 3 you need to submit three things:

1. **Your notebook** — with your own model, training, and evaluation code.
2. **`model_weights.pt`** — saved using `state_dict` (not the full model object): `torch.save(model.state_dict(), 'model_weights.pt')`. Note: the original starter used `'dna_cnn_weights.pt'` — use `'model_weights.pt'` for your submission.
3. **`predict.py`** — a self-contained script the TA will run against a held-out sequences file (no scores provided). A template is included at the bottom of this notebook. Run it as: `python predict.py --data <path_to_sequences> --out predictions.csv`
def set_seed(seed: int = 42) -> None:
    """Seed all RNGs (numpy, random, torch) for reproducibility."""
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.backends.mps.is_available():
        torch.mps.manual_seed(seed)
    elif torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"Random seed set as {seed}")
set_seed(42)

DEVICE = torch.device('mps' if torch.backends.mps.is_available()
                      else 'cuda' if torch.cuda.is_available()
                      else 'cpu')
print(f"Using device: {DEVICE}")Load Data
DIR = '/Users/sofiasalazar/Library/CloudStorage/Box-Box/imlab-data/Courses/AI-in-Genomics-2025/data/'
sequences = pd.read_csv(os.path.join(DIR, 'chr22_sequences.txt.gz'), sep='\t', compression='gzip')
scores = pd.read_csv(os.path.join(DIR, 'chr22_scores.txt.gz'), sep='\t', compression='gzip', dtype='float32')
print("Sequences:", sequences.shape)
print("Scores:", scores.shape)
sequences.head(3)
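The scores table is laid out with one row per TF and one column per genomic window (the Dataset class below transposes it). A quick orientation check, assuming the window names match between the two files:

# Columns of `scores` should be the window names from `sequences`:
assert set(sequences['window_name']).issubset(set(scores.columns))
print(f"{scores.shape[0]} TFs x {scores.shape[1]} windows")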
One-Hot Encoding

def one_hot_encode(seq):
    """One-hot encode a DNA sequence into a (seq_len, 4) numpy array."""
    allowed = set("ACTGN")
    if not set(seq).issubset(allowed):
        invalid = set(seq) - allowed
        raise ValueError(f"Invalid characters in sequence: {invalid}")
    nuc_d = {'A': [1.0, 0.0, 0.0, 0.0],
             'C': [0.0, 1.0, 0.0, 0.0],
             'G': [0.0, 0.0, 1.0, 0.0],
             'T': [0.0, 0.0, 0.0, 1.0],
             'N': [0.0, 0.0, 0.0, 0.0]}
    return np.array([nuc_d[x] for x in seq], dtype=np.float32)
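A quick sanity check on a toy sequence (a made-up input, not from the data files):

demo = one_hot_encode("ACGTN")
print(demo.shape)   # (5, 4)
print(demo[-1])     # 'N' encodes as all zeros: [0. 0. 0. 0.]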
Train / Val / Test Split

train_df, test_df = train_test_split(sequences, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
print("Train:", train_df.shape)
print("Val: ", val_df.shape)
print("Test: ", test_df.shape)Dataset and DataLoader
class SeqDatasetOHE(Dataset):
    """Dataset that one-hot encodes DNA sequences and retrieves multi-TF scores."""
    def __init__(self, seq_df, scores_df, seq_col='sequence', target_col='window_name'):
        window_names = seq_df[target_col].tolist()
        # scores_df columns are window names; transpose so rows = windows
        self.labels = torch.tensor(
            scores_df[window_names].T.values.astype('float32')
        )  # shape: (num_windows, num_TFs)
        self.ohe_seqs = torch.stack(
            [torch.tensor(one_hot_encode(s)) for s in seq_df[seq_col].tolist()]
        )  # shape: (num_windows, seq_len, 4)

    def __len__(self):
        return len(self.ohe_seqs)

    def __getitem__(self, idx):
        return self.ohe_seqs[idx], self.labels[idx]

def build_dataloaders(train_df, val_df, test_df, scores_df, batch_size=64):
"""Create DataLoaders from train, val, and test DataFrames."""
train_ds = SeqDatasetOHE(train_df, scores_df)
val_ds = SeqDatasetOHE(val_df, scores_df)
test_ds = SeqDatasetOHE(test_df, scores_df)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=batch_size)
test_dl = DataLoader(test_ds, batch_size=batch_size)
return train_dl, val_dl, test_dltrain_dl, val_dl, test_dl = build_dataloaders(train_df, val_df, test_df, scores)Model
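A quick shape check on one batch before building the model (illustrative only):

xb, yb = next(iter(train_dl))
print("x:", xb.shape)  # (batch_size, seq_len, 4)
print("y:", yb.shape)  # (batch_size, num_TFs)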
Model

class DNA_CNN(nn.Module):
    def __init__(self, seq_len, num_filters=32, kernel_size=8, num_outputs=300):
        super().__init__()
        self.conv = nn.Conv1d(4, num_filters, kernel_size=kernel_size)
        self.relu = nn.ReLU(inplace=True)
        self.linear = nn.Linear(num_filters * (seq_len - kernel_size + 1), num_outputs)

    def forward(self, xb):
        xb = xb.permute(0, 2, 1)  # (batch, seq_len, 4) → (batch, 4, seq_len)
        x = self.relu(self.conv(xb))
        x = x.flatten(1)
        return self.linear(x)

SEQ_LEN = len(sequences['sequence'].iloc[0])
NUM_TFS = scores.shape[0]
print(f"Sequence length: {SEQ_LEN}, Number of TFs: {NUM_TFS}")
model = DNA_CNN(seq_len=SEQ_LEN, num_filters=32, kernel_size=8, num_outputs=NUM_TFS).to(DEVICE)
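A dummy forward pass to confirm the output shape matches the number of TFs (the zero tensor is purely illustrative):

with torch.no_grad():
    out = model(torch.zeros(2, SEQ_LEN, 4, device=DEVICE))
print(out.shape)  # expected: (2, NUM_TFS)
print(f"{sum(p.numel() for p in model.parameters()):,} parameters")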
Training

def train_model(model, train_dl, val_dl, device, lr=0.01, epochs=50):
    """Train a model and return loss histories."""
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    train_losses, val_losses = [], []
    for epoch in range(epochs):
        # --- Training ---
        model.train()
        batch_losses, batch_sizes = [], []
        for xb, yb in train_dl:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb.float())          # forward pass
            loss = loss_fn(pred, yb.float())  # compute loss
            optimizer.zero_grad()             # clear gradients
            loss.backward()                   # backprop
            optimizer.step()                  # update weights
            batch_losses.append(loss.item())
            batch_sizes.append(len(xb))
        train_loss = np.average(batch_losses, weights=batch_sizes)
        train_losses.append(train_loss)

        # --- Validation ---
        model.eval()
        with torch.no_grad():
            vl, ns = [], []
            for xb, yb in val_dl:
                xb, yb = xb.to(device), yb.to(device)
                loss = loss_fn(model(xb.float()), yb.float())
                vl.append(loss.item())
                ns.append(len(xb))
            val_loss = np.average(vl, weights=ns)
            val_losses.append(val_loss)
        print(f"E{epoch+1:03d} | train loss: {train_loss:.4f} | val loss: {val_loss:.4f}")
    return train_losses, val_losses

EPOCHS = 100
LR = 0.01
train_losses, val_losses = train_model(model, train_dl, val_dl, DEVICE, lr=LR, epochs=EPOCHS)
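train_model hardcodes plain SGD. When you write your own training code for the homework, the optimizer is a one-line swap; a minimal sketch (the learning rate here is an untuned assumption):

# Drop-in alternative for the hardcoded SGD line in your own training loop:
adam = torch.optim.Adam(model.parameters(), lr=1e-3)  # lr is an assumption, not tuned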
def quick_loss_plot(data_label_list, loss_type="MSE Loss"):
    """Plot train/val loss curves for one or more (train, val, label) runs."""
    for i, (train_data, val_data, label) in enumerate(data_label_list):
        plt.plot(train_data, linestyle='--', color=f"C{i}", label=f"{label} Train")
        plt.plot(val_data, color=f"C{i}", label=f"{label} Val", linewidth=3.0)
    plt.legend(bbox_to_anchor=(1, 1), loc='upper left')
    plt.ylabel(loss_type)
    plt.xlabel("Epoch")
    plt.show()
quick_loss_plot([(train_losses, val_losses, "CNN")])

Test Evaluation
def test_model(model, test_dl, device):
    """Run model on test set; return predictions, observations, and per-sample Pearson r."""
    model.eval()
    preds, obs = [], []
    with torch.no_grad():
        for xb, yb in test_dl:
            xb = xb.to(device)
            preds.append(model(xb.float()).cpu().numpy())
            obs.append(yb.numpy())
    predictions = np.vstack(preds)
    observations = np.vstack(obs)
    pearson_per_sample = np.array([
        pearsonr(predictions[i], observations[i])[0]
        for i in range(len(predictions))
    ])
    return predictions, observations, pearson_per_sample
predictions, observations, pearson_per_sample = test_model(model, test_dl, DEVICE)
print(f"Mean Pearson r (test set): {pearson_per_sample.mean():.4f}")
print(f"Best Pearson r: {pearson_per_sample.max():.4f}")plt.figure(figsize=(8, 5))
plt.hist(pearson_per_sample, bins=20, edgecolor='black', alpha=0.7)
plt.xlabel('Pearson Correlation')
plt.ylabel('Frequency')
plt.title('Test set: predicted vs observed Pearson r per sequence')
plt.show()
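Per-sample r measures agreement across TFs within one window and can look optimistic if a few TFs dominate the score range. A complementary view is the per-TF correlation across test windows; a minimal sketch:

# Column-wise (per-TF) Pearson r across test sequences; nanmean guards
# against TFs whose observed scores are constant (pearsonr returns NaN).
pearson_per_tf = np.array([
    pearsonr(predictions[:, j], observations[:, j])[0]
    for j in range(predictions.shape[1])
])
print(f"Mean per-TF Pearson r: {np.nanmean(pearson_per_tf):.4f}")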
Example: Best-predicted sequence

def plot_comparison(pred, obs, r_value):
    """Side-by-side bar plot of predicted vs observed TF scores for one sequence."""
    x = np.arange(len(pred))
    bar_width = 0.4
    plt.figure(figsize=(12, 5))
    # Offset by half the bar width so paired bars sit side by side without
    # overlapping neighboring groups (the original full-width offset overlapped).
    plt.bar(x - bar_width / 2, pred, width=bar_width, label='Predicted', alpha=0.7, color='b')
    plt.bar(x + bar_width / 2, obs, width=bar_width, label='Observed', alpha=0.7, color='r')
    plt.xlabel('TF index')
    plt.ylabel('Score')
    plt.title('Predicted vs Observed TF scores')
    plt.legend(title=f'Pearson r = {r_value:.3f}')
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    plt.show()
best_idx = int(np.argmax(pearson_per_sample))
plot_comparison(predictions[best_idx], observations[best_idx], pearson_per_sample[best_idx])
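The same helper works for the worst-predicted sequence, which can be just as instructive when debugging your model:

worst_idx = int(np.argmin(pearson_per_sample))
plot_comparison(predictions[worst_idx], observations[worst_idx], pearson_per_sample[worst_idx])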
Saving and Loading the Model

PyTorch has two main options:
| Option | What is saved | When to use |
|---|---|---|
| `state_dict` | Weights only | Recommended. Portable — load into any matching architecture. |
| `torch.save(model)` | Entire object | Quick and easy, but tied to the exact class definition. |
The state_dict approach is preferred because it separates weights from code, making it easier to share and version.
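For contrast, the full-object form is sketched below (the filename is illustrative). Note that recent PyTorch releases default torch.load to weights_only=True, so un-pickling a full model object requires opting out explicitly:

# Not recommended for submission: tied to the exact class definition.
torch.save(model, 'dna_cnn_full.pt')  # illustrative filename
full_model = torch.load('dna_cnn_full.pt', map_location=DEVICE, weights_only=False)
full_model.eval()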
# --- Save ---
SAVE_PATH = 'model_weights.pt'
torch.save(model.state_dict(), SAVE_PATH)
print(f"Weights saved to {SAVE_PATH}")# --- Load ---
# Re-create the architecture with the same hyperparameters, then load weights into it.
loaded_model = DNA_CNN(seq_len=SEQ_LEN, num_filters=32, kernel_size=8, num_outputs=NUM_TFS).to(DEVICE)
loaded_model.load_state_dict(torch.load(SAVE_PATH, map_location=DEVICE))
loaded_model.eval()
print("Weights loaded successfully.")# Verify the loaded model produces identical predictions
_, _, pearson_loaded = test_model(loaded_model, test_dl, DEVICE)
print(f"Mean Pearson r (loaded model): {pearson_loaded.mean():.4f}")
assert np.allclose(pearson_per_sample, pearson_loaded), "Mismatch — something went wrong!"
print("Outputs match the original model.")Saving a checkpoint (weights + optimizer state)
If you want to resume training later (not just evaluate), save the optimizer state and epoch number alongside the weights.
# --- Save checkpoint ---
CHECKPOINT_PATH = 'dna_cnn_checkpoint.pt'
# train_model creates its optimizer internally, so we re-create one here;
# in your own code, save the optimizer you actually trained with.
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
torch.save({
    'epoch': EPOCHS,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'train_losses': train_losses,
    'val_losses': val_losses,
}, CHECKPOINT_PATH)
print(f"Checkpoint saved to {CHECKPOINT_PATH}")# --- Resume from checkpoint ---
checkpoint = torch.load(CHECKPOINT_PATH, map_location=DEVICE)
resumed_model = DNA_CNN(seq_len=SEQ_LEN, num_filters=32, kernel_size=8, num_outputs=NUM_TFS).to(DEVICE)
resumed_model.load_state_dict(checkpoint['model_state_dict'])
resumed_optimizer = torch.optim.SGD(resumed_model.parameters(), lr=LR)
resumed_optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch']
print(f"Resumed from epoch {start_epoch}. Ready to continue training.")Prediction Script Template
Prediction Script Template

Use this skeleton for your `predict.py` submission. The TA will run it against a held-out sequences file — no scores file will be provided.
import argparse
import numpy as np
import pandas as pd
import torch
from torch import nn
# --- Copy your model class here ---
class DNA_CNN(nn.Module):
    def __init__(self, seq_len, num_filters=32, kernel_size=8, num_outputs=300):
        super().__init__()
        self.conv = nn.Conv1d(4, num_filters, kernel_size=kernel_size)
        self.relu = nn.ReLU(inplace=True)
        self.linear = nn.Linear(num_filters * (seq_len - kernel_size + 1), num_outputs)

    def forward(self, xb):
        xb = xb.permute(0, 2, 1)
        x = self.relu(self.conv(xb))
        x = x.flatten(1)
        return self.linear(x)
# --- One-hot encoding ---
def one_hot_encode(seq):
    nuc_d = {'A': [1,0,0,0], 'C': [0,1,0,0], 'G': [0,0,1,0], 'T': [0,0,0,1], 'N': [0,0,0,0]}
    return np.array([nuc_d[x] for x in seq], dtype=np.float32)
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data', required=True, help='Path to sequences .txt.gz file')
    parser.add_argument('--out', required=True, help='Output file path for predictions')
    args = parser.parse_args()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load sequences
    sequences = pd.read_csv(args.data, sep='\t', compression='gzip')

    # One-hot encode
    ohe = torch.stack([torch.tensor(one_hot_encode(s)) for s in sequences['sequence']])

    # Load model
    SEQ_LEN = ohe.shape[1]
    model = DNA_CNN(seq_len=SEQ_LEN).to(device)
    model.load_state_dict(torch.load('model_weights.pt', map_location=device))
    model.eval()

    # Predict
    with torch.no_grad():
        preds = model(ohe.to(device)).cpu().numpy()

    # Save predictions — rows = sequences, cols = TFs
    pd.DataFrame(preds, index=sequences['window_name']).to_csv(args.out)
    print(f"Predictions saved to {args.out}")

if __name__ == '__main__':
    main()
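One caveat: the template runs every held-out sequence through the model in a single forward pass, which can exhaust memory on a large file. A batched variant of the Predict step (256 is an illustrative batch size):

# Batched replacement for the single forward pass in main():
preds_list = []
with torch.no_grad():
    for i in range(0, len(ohe), 256):
        preds_list.append(model(ohe[i:i + 256].to(device)).cpu().numpy())
preds = np.vstack(preds_list)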