66 lines
1.9 KiB
Python
66 lines
1.9 KiB
Python
import os
|
|
import json
|
|
import networkx as nx
|
|
from repository.file_system_repository import FileSystemRepository
|
|
|
|
|
|
class GraphProcessor:
|
|
file_path: str
|
|
graph = None
|
|
|
|
def __init__(self, file_path: str):
|
|
self.file_path = file_path
|
|
self.graph = self.load_graph_from_json()
|
|
|
|
def load_graph_from_json(self):
|
|
with open(self.file_path, "r") as file:
|
|
data = json.load(file)
|
|
|
|
G = nx.Graph()
|
|
|
|
if "matrix" in data:
|
|
matrix = data["matrix"]
|
|
for part1, neighbors in matrix.items():
|
|
for neighbor in neighbors:
|
|
G.add_edge(part1, neighbor)
|
|
|
|
return G
|
|
|
|
|
|
class EdgeBetweensClustering:
|
|
def __init__(self, graph):
|
|
self.graph = graph.copy()
|
|
self.clusters = []
|
|
|
|
def cluster(self):
|
|
while self.graph.number_of_edges() > 0:
|
|
edge_betweens = nx.edge_betweenness_centrality(self.graph)
|
|
max_betweens_edge = max(edge_betweens, key=edge_betweens.get)
|
|
self.graph.remove_edge(*max_betweens_edge)
|
|
components = list(nx.connected_components(self.graph))
|
|
if components not in self.clusters:
|
|
self.clusters.append(components)
|
|
return []
|
|
|
|
def get_clusters(self):
|
|
return self.clusters
|
|
|
|
|
|
class ClusterisationSequenceUseCase:
|
|
def call(self, file_path: str):
|
|
graph_processor = GraphProcessor(file_path + "adjacency_matrix.json")
|
|
G = graph_processor.load_graph_from_json()
|
|
ebc = EdgeBetweensClustering(G)
|
|
ebc.cluster()
|
|
clusters = ebc.get_clusters()
|
|
|
|
for i in range(len(clusters)):
|
|
for j in range(len(clusters[i])):
|
|
clusters[i][j] = list(clusters[i][j])
|
|
|
|
FileSystemRepository.writeFile(
|
|
json.dumps(clusters, ensure_ascii=False, indent=2),
|
|
file_path,
|
|
"assembly_sequence.json",
|
|
)
|
|
return clusters
|