Source code for COSIE.utils

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

import os
import numpy as np
import anndata as ad
import math
import scipy
import scipy.sparse as sp
import pandas as pd
import scanpy as sc
import random
import torch
from torch_geometric.utils import negative_sampling
import torch.nn.functional as F
from scipy.spatial import distance_matrix

from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import kneighbors_graph 


from annoy import AnnoyIndex
import hnswlib





[docs] def nn_approx(ds1, ds2, knn=10, metric='euclidean', n_trees=10, include_distances=False): """ Efficiently find approximate K-nearest neighbors using the Annoy library. Parameters ---------- ds1 : np.ndarray Query data of shape (n_query, dim), where neighbors will be searched for. ds2 : np.ndarray Reference data of shape (n_ref, dim), in which the neighbors are searched. knn : int, optional Number of neighbors to retrieve per query. Default is 10. metric : str, optional Distance metric used in Annoy. Must be one of: {'euclidean', 'manhattan', 'angular', 'hamming', 'dot'}. Default is 'euclidean'. n_trees : int, optional Number of trees used to build the Annoy index. Higher values increase accuracy at the cost of indexing time. Default is 10. include_distances : bool, optional Whether to also return distances to the nearest neighbors. Default is False. Returns ------- ind : np.ndarray If `include_distances` is False, returns an array of shape (n_query, knn) with indices of nearest neighbors. tuple of (ind, dist) : (np.ndarray, np.ndarray) If `include_distances` is True, returns a tuple: - `ind` : array of shape (n_query, knn) with indices of nearest neighbors. - `dist` : array of shape (n_query, knn) with corresponding distances. """ # Build index. a = AnnoyIndex(ds2.shape[1], metric=metric) for i in range(ds2.shape[0]): a.add_item(i, ds2[i, :]) a.build(n_trees) # Search index. ind, dist = [], [] for i in range(ds1.shape[0]): i_ind, i_dist = a.get_nns_by_vector(ds1[i, :], knn, search_k=-1, include_distances=True) ind.append(i_ind) dist.append(i_dist) ind = np.array(ind) if include_distances: return ind, np.array(dist) else: # return ind.flatten() return ind
[docs] def setup_seed(seed=8, mode='fast'): """ Set the random seed for Python, NumPy, and PyTorch to ensure reproducibility. Parameters ---------- seed : int, optional The random seed to be set for all relevant libraries. Default is 8. mode : str, optional Controls how strictly reproducibility is enforced. Must be one of {'fast', 'strict'}. - 'fast': Ensures reproducibility in most cases, with minimal performance impact. - 'strict': Enforces full determinism across all operations (including CUDA), but may significantly slow down certain models. It is recommended to use 'strict' only when exact reproducibility across runs is required. Returns ------- None """ random.seed(seed) np.random.seed(seed) os.environ['PYTHONHASHSEED'] = str(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) # control cuDNN torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # strict mode if mode == 'strict': ### This will slow down the learning process if hasattr(torch, 'use_deterministic_algorithms'): torch.use_deterministic_algorithms(True) os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':16:8'
[docs] def compute_knn_graph(input_data, n_neighbors): """ Construct a k-nearest neighbors (k-NN) graph based on an input feature matrix. Parameters ---------- input_data : np.ndarray An array of shape (n_cells, n_features) representing input features of cells, which can be spatial coordinates or feature vectors. n_neighbors : int Number of nearest neighbors to connect for each node. Returns ------- edge_index : torch.LongTensor A tensor of shape (2, num_edges) representing the edges of the graph. Each column (i, j) represents an edge from node i to its j-th nearest neighbor. """ nbrs = NearestNeighbors(n_neighbors=n_neighbors+1).fit(input_data) _ , indices = nbrs.kneighbors(input_data) x = indices[:, 0].repeat(n_neighbors+1) y = indices[:, 0:].flatten() edge_index = np.vstack((x, y)) edge_index = torch.tensor(edge_index, dtype=torch.long) return edge_index
[docs] def construct_knn_graph_hnsw(data, k=20, space='l2'): """ Efficiently compute approximate k-nearest neighbor (k-NN) graph using the hnswlib library. This method is suitable for large-scale datasets. Parameters ---------- data : torch.Tensor A tensor of shape (n_cells, n_features) representing the input feature matrix. k : int, optional Number of nearest neighbors to retrieve for each sample. Default is 20. space : str, optional Distance metric used to build the index. Must be one of {'l2', 'ip', 'cosine'}. - 'l2': Euclidean distance - 'ip': Inner product - 'cosine': Cosine similarity Default is 'l2'. Returns ------- edge_index : torch.LongTensor A tensor of shape (2, n_edges) representing the edges of the graph. Each column represents an edge from source node to target node. """ random.seed(42) np.random.seed(42) data = data.cpu().numpy().astype(np.float32) num_samples, dim = data.shape p = hnswlib.Index(space=space, dim=dim) p.init_index(max_elements=num_samples, ef_construction=200, M=16) p.add_items(data) p.set_ef(50) indices, distances = p.knn_query(data, k=k) row_indices = np.repeat(np.arange(num_samples), k) col_indices = indices.flatten() edge_index = torch.tensor(np.vstack((row_indices, col_indices)), dtype=torch.long) return edge_index
[docs] def compute_neighborhood_embedding(edge_index_spatial: torch.Tensor, embedding_matrix: torch.Tensor, device) -> torch.Tensor: """ Compute the neighborhood embedding of each cell based on its spatial k-nearest neighbor graph. Parameters ---------- edge_index_spatial : torch.Tensor A tensor of shape (2, num_edges), where each column represents a directed edge from source node to target node in the spatial graph. embedding_matrix : torch.Tensor A tensor of shape (n_cells, dim) representing the embedding of each cell. device : torch.device The computation device, e.g., `torch.device('cuda')` or `torch.device('cpu')`. Returns ------- neighborhood_embedding_matrix : torch.Tensor A tensor of shape (n_cells, dim) representing the average of each cell’s neighbors' embeddings. """ # Get the number of nodes and embedding dimension num_nodes = embedding_matrix.shape[0] embedding_dim = embedding_matrix.shape[1] # Initialize the neighborhood embedding matrix and count tensor neighborhood_embedding_matrix = torch.zeros(num_nodes, embedding_dim, device=device) neighbor_count = torch.zeros(num_nodes, dtype=torch.float32, device=device) # Use edge_index_spatial to index into the embedding matrix source_nodes = edge_index_spatial[0] # Get source nodes target_nodes = edge_index_spatial[1] # Get target nodes # Accumulate the embeddings from target nodes to the corresponding source nodes neighborhood_embedding_matrix.index_add_(0, source_nodes, embedding_matrix[target_nodes]) # Count neighbors for each source node neighbor_count.index_add_(0, source_nodes, torch.ones_like(target_nodes, dtype=torch.float32, device=device)) # Avoid division by zero by replacing zero counts with one neighbor_count[neighbor_count == 0] = 1 # Compute the average by dividing by the count of neighbors neighborhood_embedding_matrix /= neighbor_count.view(-1, 1) return neighborhood_embedding_matrix
##