import os
import scanpy as sc
import pandas as pd
import numpy as np
import scipy as sp
import scipy.sparse as sp_sparse
# ------------------------------------------------------
# Hotfix for SOMDE: SOMDE uses `import scipy as sp` and then calls numpy-like APIs
# Newer SciPy removed these aliases (sp.arange, sp.array, sp.argsort, ...)
# We patch them back from NumPy for runtime compatibility.
# ------------------------------------------------------
_np_aliases = [
"array", "arange", "argsort", "asarray",
"zeros", "ones", "empty",
"zeros_like", "ones_like", "full", "full_like",
"log", "exp", "sqrt",
"sum", "mean", "std", "var",
"where", "isfinite", "isnan",
"maximum", "minimum", "clip",
"cumsum", "cumprod",
"floor", "ceil", "abs",
]
for _name in _np_aliases:
if not hasattr(sp, _name):
setattr(sp, _name, getattr(np, _name))
from somde import SomNode
DATA_DIR = "./data"
OUTPUT_DIR = "./results/somde"
SOM_K = 5
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ------------------------------------------------------
# Scanpy gene filtering thresholds
# ------------------------------------------------------
MIN_CELLS = 50
MIN_COUNTS = 50
def load_DLPFC(root_dir=DATA_DIR, section_id="151507"):
ad = sc.read_visium(
path=os.path.join(root_dir, section_id),
count_file=section_id + "_filtered_feature_bc_matrix.h5"
)
ad.var_names_make_unique()
gt_dir = os.path.join(root_dir, section_id, "gt")
gt_df = pd.read_csv(
os.path.join(gt_dir, "tissue_positions_list_GTs.txt"),
sep=",", header=None, index_col=0
)
ad.obs["original_clusters"] = gt_df.loc[:, 6]
keep_bcs = ad.obs.dropna().index
ad = ad[keep_bcs].copy()
ad.obs["original_clusters"] = ad.obs["original_clusters"].astype(int).astype(str)
print(f"{section_id} loading done")
return ad
def compute_and_save_somde_csv(
adata: sc.AnnData,
section_id: str,
output_dir: str = OUTPUT_DIR,
som_k: int = SOM_K,
min_cells: int = MIN_CELLS,
min_counts: int = MIN_COUNTS,
):
"""
Compute SOMDE spatially variable genes for one slice after filtering genes
with Scanpy:
1) keep genes expressed in at least `min_cells` spots
2) keep genes with total counts at least `min_counts`
Saves:
somde_{section_id}.csv
somde_{section_id}_filter_stats.csv
somde_{section_id}_filtered_out_genes.csv
"""
print(f"[SOMDE] Processing slice {section_id}")
# Keep a copy of original adata for diagnostics
adata_raw = adata.copy()
# ------------------------------------------------------
# Build pre-filter gene stats from raw counts
# ------------------------------------------------------
X_raw = adata_raw.X
gene_names_raw = np.array(adata_raw.var_names).astype(str)
if sp_sparse.issparse(X_raw):
total_count_raw = np.asarray(X_raw.sum(axis=0)).ravel()
nonzero_spots_raw = np.asarray((X_raw > 0).sum(axis=0)).ravel()
else:
X_tmp = np.asarray(X_raw)
total_count_raw = X_tmp.sum(axis=0)
nonzero_spots_raw = (X_tmp > 0).sum(axis=0)
# ------------------------------------------------------
# Filter genes using Scanpy
# ------------------------------------------------------
adata_filt = adata.copy()
n_before = adata_filt.n_vars
sc.pp.filter_genes(adata_filt, min_cells=min_cells)
n_after_min_cells = adata_filt.n_vars
sc.pp.filter_genes(adata_filt, min_counts=min_counts)
n_after_min_counts = adata_filt.n_vars
kept_genes = set(adata_filt.var_names.astype(str))
keep_mask = np.array([g in kept_genes for g in gene_names_raw])
n_keep = int(keep_mask.sum())
n_drop = int((~keep_mask).sum())
print(f"[SOMDE] {section_id}: total genes before filtering = {n_before}")
print(f"[SOMDE] {section_id}: after min_cells>={min_cells} = {n_after_min_cells}")
print(f"[SOMDE] {section_id}: after min_counts>={min_counts} = {n_after_min_counts}")
print(f"[SOMDE] {section_id}: keeping {n_keep} genes, dropping {n_drop} genes")
# ------------------------------------------------------
# Save filtering diagnostics
# ------------------------------------------------------
filter_df = pd.DataFrame({
"g": gene_names_raw,
"total_count": total_count_raw,
"nonzero_spots": nonzero_spots_raw,
"keep_for_somde": keep_mask,
})
filter_df[f"nonzero_spots_lt_{min_cells}"] = filter_df["nonzero_spots"] < min_cells
filter_df[f"total_count_lt_{min_counts}"] = filter_df["total_count"] < min_counts
filter_stats_path = os.path.join(output_dir, f"somde_{section_id}_filter_stats.csv")
filter_df.to_csv(filter_stats_path, index=False)
filtered_out_path = os.path.join(output_dir, f"somde_{section_id}_filtered_out_genes.csv")
filter_df.loc[~filter_df["keep_for_somde"]].to_csv(filtered_out_path, index=False)
# ------------------------------------------------------
# Run SOMDE on filtered genes only
# ------------------------------------------------------
pts = adata_filt.obsm["spatial"].astype(np.float32)
if sp_sparse.issparse(adata_filt.X):
X_filt = adata_filt.X.toarray()
else:
X_filt = np.asarray(adata_filt.X)
df_expr = pd.DataFrame(X_filt, columns=adata_filt.var_names.astype(str))
som = SomNode(pts, som_k)
som.mtx(df_expr.T)
som.norm()
result, _ = som.run()
output_path = os.path.join(output_dir, f"somde_{section_id}.csv")
result.to_csv(output_path, index=False)
print(f"[SOMDE] Saved result to {output_path}")
print(f"[SOMDE] Saved filter stats to {filter_stats_path}")
print(f"[SOMDE] Saved filtered-out genes to {filtered_out_path}")
section_ids = [
"151508",
"151509"
]
for sid in section_ids:
print(f"\n=== SOMDE for {sid} ===")
try:
adata = load_DLPFC(section_id=sid)
compute_and_save_somde_csv(
adata=adata,
section_id=sid,
min_cells=MIN_CELLS,
min_counts=MIN_COUNTS,
)
print(f"{sid} somde done")
except Exception as e:
print(f"[ERROR] {sid} failed: {e}")
print("\nAll SOMDE computations finished.")