"""ParallelGeneralGraph for parallel directed graphs (DiGraph) module"""

import logging
import sys
import warnings
from multiprocessing import Queue
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray
import ctypes
import numpy as np
import networkx as nx

from .utils import chunk_it
from .general_graph import GeneralGraph

warnings.simplefilter(action='ignore', category=FutureWarning)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

[docs]class ParallelGeneralGraph(GeneralGraph): """ Class ParallelGeneralGraph for parallel implementation of directed graphs (DiGraph). Constructs a new graph given an input file. A DiGraph stores nodes and edges with optional data or attributes. DiGraphs hold directed edges. Nodes can be arbitrary python objects with optional key/value attributes. Edges are represented as links between nodes with optional key/value attributes. """
[docs] def __init__(self): super().__init__() self.manager = mp.Manager() self.num = mp.cpu_count()
[docs] def measure_iteration(self, nodes, record, kernel, *measure_args): """ Inner iteration for parallel measures, to update shared dictionary. :param list nodes: nodes for which to compute the shortest path between them and all the other nodes. :param multiprocessing.managers.dict record: shared dictionary to be updated. :param callable kernel: kernel measure to be computed. :param *measure_args: arguments for kernel measures. Have a look at specific kernel measures in GeneralGraph for the particular variables/types for each measure. """ partial_dict = kernel(nodes, *measure_args) record.update(partial_dict)
[docs] def measure_processes(self, record, kernel, *measure_args): """ Division of total number of nodes in chuncks and parallel distribution of tasks into processes, for different kernel measure functions. :param multiprocessing.managers.dict record: shared dictionary to be updated :param callable kernel: kernel measure to be computed :param *measure_args: arguments for kernel measures (have a look at specific kernel measures in GeneralGraph for the particular variables/types for each measure) """ node_chunks = chunk_it(list(self.nodes()), self.num) processes = [ mp.Process( target=self.measure_iteration, args=(node_chunks[p], record, kernel, *measure_args) ) for p in range(self.num) ] for proc in processes: proc.start() for proc in processes: proc.join()
[docs] def floyd_warshall_predecessor_and_distance(self): """ Parallel Floyd Warshall's APSP algorithm. The predecessors and distance matrices are evaluated, together with the nested dictionaries for shortest-path, length of the paths and efficiency attributes. .. note:: Edges weight is taken into account in the distance matrix. Edge weight attributes must be numerical. Distances are calculated as sums of weighted edges traversed. :return: nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path; nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path length. :rtype: dict, dict """ dist, pred = self.floyd_warshall_initialization() shared_d = mp.sharedctypes.RawArray(ctypes.c_double, dist.shape[0]**2) dist_shared = np.frombuffer(shared_d, 'float64').reshape(dist.shape) dist_shared[:] = dist shared_p = mp.sharedctypes.RawArray(ctypes.c_double,pred.shape[0]**2) pred_shared = np.frombuffer(shared_p, 'float64').reshape(pred.shape) pred_shared[:] = pred n = len(self.nodes()) chunk = [(0, int(n / self.num))] node_chunks = chunk_it(list(self.nodes()), self.num) for i in range(1, self.num): chunk.append((chunk[i - 1][1], chunk[i - 1][1] + len(node_chunks[i]))) barrier = mp.Barrier(self.num) processes = [ mp.Process( target=self.floyd_warshall_kernel, args=(dist_shared, pred_shared, chunk[p][0], chunk[p][1], barrier)) for p in range(self.num) ] for proc in processes: proc.start() for proc in processes: proc.join() all_shortest_path = self.manager.dict() processes = [ mp.Process( target=self.measure_iteration, args=(list(map(self.ids_reversed.get, node_chunks[p])), all_shortest_path, self.construct_path_kernel, pred_shared) ) for p in range(self.num) ] for proc in processes: proc.start() for proc in processes: proc.join() nonempty_shortest_path = {} for k in all_shortest_path.keys(): nonempty_shortest_path[k] = { key: value for key, value in all_shortest_path[k].items() if value } shortest_path_length = {} for i in list(self.H): shortest_path_length[self.ids[i]] = {} for key, value in nonempty_shortest_path[self.ids[i]].items(): length_path = dist_shared[self.ids_reversed[value[0]], self.ids_reversed[value[-1]]] shortest_path_length[self.ids[i]][key] = length_path return nonempty_shortest_path, shortest_path_length
[docs] def dijkstra_iteration_parallel(self, out_queue, nodes): """ Parallel SSSP algorithm based on Dijkstra’s method. :param multiprocessing.queues.Queue out_queue: multiprocessing queue :param list nodes: list of starting nodes from which the SSSP should be computed to every other target node in the graph .. note:: Edges weight is taken into account. Edge weight attributes must be numerical. Distances are calculated as sums of weighted edges traversed. """ for n in nodes: ssspp = (n, nx.single_source_dijkstra(self, n, weight='weight')) out_queue.put(ssspp)
[docs] def dijkstra_single_source_shortest_path(self): """ Wrapper for parallel SSSP algorithm based on Dijkstra’s method. The nested dictionaries for shortest-path, length of the paths and efficiency attributes are evaluated. .. note:: Edges weight is taken into account. Edge weight attributes must be numerical. Distances are calculated as sums of weighted edges traversed. :return: nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path; nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path length. :rtype: dict, dict """ attribute_ssspp = [] out_queue = Queue() node_chunks = chunk_it(list(self.nodes()), self.num) processes = [ mp.Process( target=self.dijkstra_iteration_parallel, args=( out_queue,node_chunks[p] )) for p in range(self.num) ] for proc in processes: proc.start() while 1: running = any(p.is_alive() for p in processes) while not out_queue.empty(): attribute_ssspp.append(out_queue.get()) if not running: break shortest_path = {} shortest_path_length = {} for ssspp in attribute_ssspp: n = ssspp[0] shortest_path[n] = ssspp[1][1] shortest_path_length[n] = ssspp[1][0] return shortest_path, shortest_path_length
[docs] def calculate_shortest_path(self): """ Choose the most appropriate way to compute the all-pairs shortest path depending on graph size and density. For a dense graph choose Floyd Warshall algorithm. For a sparse graph choose SSSP algorithm based on Dijkstra's method. .. note:: Edge weights of the graph are taken into account in the computation. :return: nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path; nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path length. :rtype: dict, dict """ n_of_nodes = self.order() graph_density = nx.density(self) logging.debug('Number of processors: %s', self.num) logging.debug('In the graph are present %s nodes', n_of_nodes) if graph_density <= 0.000001: logging.debug('The graph is sparse, density = %s', graph_density) shpath, shpath_len = self.dijkstra_single_source_shortest_path() else: logging.debug('The graph is dense, density = %s', graph_density) shpath, shpath_len = self.floyd_warshall_predecessor_and_distance() return shpath, shpath_len
[docs] def compute_efficiency(self): """ Efficiency calculation. .. note:: The efficiency of a path connecting two nodes is defined as the inverse of the path length, if the path has length non-zero, and zero otherwise. :return: efficiency computed for every node. The keys correspond to source, while as value a dictionary keyed by target and valued by the source-target efficiency. :rtype: multiprocessing.managers.dict """ shortest_path_length = self.shortest_path_length efficiency = self.manager.dict() self.measure_processes(efficiency, self.efficiency_kernel, shortest_path_length) return efficiency
[docs] def compute_nodal_efficiency(self): """ Nodal efficiency calculation. .. note:: The nodal efficiency of the node is equal to zero for a node without any outgoing path and equal to one if from it we can reach each node of the digraph. :return: nodal efficiency computed for every node. :rtype: multiprocessing.managers.dict """ graph_size = len(list(self)) efficiency = self.efficiency nodal_efficiency = self.manager.dict() self.measure_processes(nodal_efficiency, self.nodal_efficiency_kernel, efficiency, graph_size) return nodal_efficiency
[docs] def compute_local_efficiency(self): """ Local efficiency calculation. .. note:: The local efficiency shows the efficiency of the connections between the first-order outgoing neighbors of node v when v is removed. Equivalently, local efficiency measures the resilience of the digraph to the perturbation of node removal, i.e. if we remove a node, how efficiently its first-order outgoing neighbors can communicate. It is in the range [0, 1]. :return: local efficiency computed for every node. :rtype: multiprocessing.managers.dict """ nodal_efficiency = self.nodal_efficiency local_efficiency = self.manager.dict() self.measure_processes(local_efficiency, self.local_efficiency_kernel, nodal_efficiency) return local_efficiency
[docs] def shortest_path_list_iteration(self, nodes, shortest_path, tot_shortest_paths_list): """ Inner iteration for parallel shortest path list calculation, to update shared list. :param list nodes: list of nodes for which to compute the shortest path between them and all the other nodes :param dict shortest_path: nested dictionary with key corresponding to source, while as value a dictionary keyed by target and valued by the source-target shortest path. :param tot_shortest_paths_list: list of shortest paths with at least two nodes :type tot_shortest_paths_list: multiprocessing.managers.list """ partial_shortest_paths_list = self.shortest_path_list_kernel(nodes, shortest_path) tot_shortest_paths_list.extend(partial_shortest_paths_list)
[docs] def compute_betweenness_centrality(self): """ Betweenness_centrality calculation. .. note:: Betweenness centrality is an index of the relative importance of a node and it is defined by the number of shortest paths that run through it. Nodes with the highest betweenness centrality hold the higher level of control on the information flowing between different nodes in the network, because more information will pass through them. :return: betweenness centrality computed for every node. :rtype: multiprocessing.managers.dict """ shortest_path = self.shortest_path tot_shortest_paths_list = self.manager.list() node_chunks = chunk_it(list(self.nodes()), self.num) processes = [ mp.Process( target=self.shortest_path_list_iteration, args=(node_chunks[p], shortest_path, tot_shortest_paths_list) ) for p in range(self.num) ] for proc in processes: proc.start() for proc in processes: proc.join() betweenness_centrality = self.manager.dict() self.measure_processes(betweenness_centrality, self.betweenness_centrality_kernel, tot_shortest_paths_list) return betweenness_centrality
[docs] def compute_closeness_centrality(self): """ Closeness_centrality calculation. .. note:: Closeness centrality measures the reciprocal of the average shortest path distance from a node to all other reachable nodes in the graph. Thus, the more central a node is, the closer it is to all other nodes. This measure allows to identify good broadcasters, that is key elements in a graph, depicting how closely the nodes are connected with each other. :return: closeness centrality computed for every node. :rtype: multiprocessing.managers.dict """ graph_size = len(list(self)) shortest_path = self.shortest_path shortest_path_length = self.shortest_path_length tot_shortest_paths_list = self.manager.list() node_chunks = chunk_it(list(self.nodes()), self.num) processes = [ mp.Process( target=self.shortest_path_list_iteration, args=(node_chunks[p], shortest_path, tot_shortest_paths_list) ) for p in range(self.num) ] for proc in processes: proc.start() for proc in processes: proc.join() closeness_centrality = self.manager.dict() self.measure_processes(closeness_centrality, self.closeness_centrality_kernel, shortest_path_length, tot_shortest_paths_list, graph_size) return closeness_centrality
[docs] def compute_degree_centrality(self): """ Degree centrality calculation. .. note:: Degree centrality is a simple centrality measure that counts how many neighbors a node has in an undirected graph. The more neighbors the node has the most important it is, occupying a strategic position that serves as a source or conduit for large volumes of flux transactions with other nodes. A node with high degree centrality is a node with many dependencies. :return: degree centrality computed for every node. :rtype: multiprocessing.managers.dict """ graph_size = len(list(self)) degree_centrality = self.manager.dict() self.measure_processes(degree_centrality, self.degree_centrality_kernel, graph_size) return degree_centrality
[docs] def compute_indegree_centrality(self): """ In-degree centrality calculation. .. note:: In-degree centrality is measured by the number of edges ending at the node in a directed graph. Nodes with high in-degree centrality are called cascade resulting nodes. :return: in-degree centrality computed for every node. :rtype: multiprocessing.managers.dict """ graph_size = len(list(self)) indegree_centrality = self.manager.dict() self.measure_processes(indegree_centrality, self.indegree_centrality_kernel, graph_size) return indegree_centrality
[docs] def compute_outdegree_centrality(self): """ Out-degree centrality calculation. .. note:: Out-degree centrality is measured by the number of edges starting from a node in a directed graph. Nodes with high out-degree centrality are called cascade inititing nodes. :return: out-degree centrality computed for every node. :rtype: multiprocessing.managers.dict """ graph_size = len(list(self)) outdegree_centrality = self.manager.dict() self.measure_processes(outdegree_centrality, self.outdegree_centrality_kernel, graph_size) return outdegree_centrality