Source code for SimulationClient.MetricValues

import math
import socket
import logging
from multiprocessing.connection import Client

from SimulationUtilities.Communication_Codes import comm_code


[docs]def get_metric(curve, number_of_inner_points, metric_server_addresses, authkey): """ This function distributes the task of computing the metric values along the curve using the SimulationPotential instances. Args: curve (numpy.array): A list of NumPy arrays representing a local geodesic. number_of_inner_points (int): The number of points along the curve, less two. metric_server_addresses: A list of tuples of the form (str, int) containing the hostnames and port numbers of the SimulationPotential instances. authkey (str): The password used in order to communicate with the SimulationPotential instances. Returns: list: A list of float values called metric where metric[i] = a(curve[i]). """ # Initialise the memory for the metric values metric = [[]] * (number_of_inner_points + 2) # Compute how many SimulationPotential instances are available to the SimulationClient number_of_metric_servers = len(metric_server_addresses) # Estimate how many tasks per SimulationPotential is a uniform distribution tasks_per_server = int(math.floor((number_of_inner_points + 2) / number_of_metric_servers)) # Compute how many tasks are left over if I give each SimulationPotential tasks_per_server points to evaluate. remainder_tasks = number_of_inner_points + 2 - tasks_per_server * number_of_metric_servers # Create a flag to indicate which point along the curve is currently being issued current_point = 0 # Indicate whether the current PotentialServer instance needs an extra task to ensure all the nodes are computed extra_jobs = 1 # For each SimulationPotential instance... for address in xrange(number_of_metric_servers): # If all of the surplus tasks have been issued then indicate no extra tasks are needed if address >= remainder_tasks: extra_jobs = 0 # Initialise a list to contain the points to be computed for the current SimulationPotential instance points = [] # Get the points between and including curve[current_point] to curve[current_point+tasks_per_server+extra_jobs] # and prepare them to send to the current SimulationPotential instance for i in xrange(current_point, current_point + tasks_per_server + extra_jobs): points.append([curve[i], i]) # Increase current_point so the same points are not sent again current_point += tasks_per_server + extra_jobs # Attempt to contact the SimulationPotential instance try: # Create connection with the SimulationPotential instance metric_server = Client(metric_server_addresses[address], authkey=authkey) # Prepare a response containing the points to evaluate the metric on metric_server.send({'status_code': comm_code('CLIENT_PROVIDES_POINT'), 'points': points}) # Close the connection metric_server.close() # If the SimulationPotential instance couldn't be contacted then return None to shut down the SimulationClient except socket.error: # Write a warning to the log explaining which SimulationPotential couldn't be contacted. logging.warning('Failed to Make Connection to SimulationPotential at ' + str(metric_server_addresses[address]) + '.') # Return a None value return None # Now the code attempts to collect the computed values. # For each SimulationPotential instance... for address in xrange(number_of_metric_servers): # Attempt to contact the SimulationPotential instance try: # Create connection with the SimulationPotential instance metric_server = Client(metric_server_addresses[address], authkey=authkey) # Prepare a response indicating the SimulationClient is ready for the answer metric_server.send({'status_code': comm_code('CLIENT_ASKS_FOR_VALUES')}) # Receive the metric values at the given points metric_server_response = metric_server.recv() # Process the received values into the metric list for value in metric_server_response['values']: metric[value[1]] = value[0] # Close the connection metric_server.close() # If the SimulationPotential instance couldn't be contacted then return None to shut down the SimulationClient except socket.error: # Write a warning to the log explaining which SimulationPotential couldn't be contacted. logging.warning('Failed to Make Connection to SimulationPotential at ' + str(metric_server_addresses[address]) + '.') # Return a None value return None # If None hasn't been returned then return the metric values return metric
[docs]def shutdown_metric(metric_server_addresses, authkey): """ This function tells all of the SimulationPotential instances running on the addresses in metric_server_addresses to shutdown. This is called when a SimulationClient instance shuts down. Args: metric_server_addresses: A list of tuples of the form (str, int) containing the hostnames and port numbers of the SimulationPotential instances. authkey (str): The password used in order to communicate with the SimulationPotential instances. Returns: float: The length of the curve with metric values in metric. """ # For each SimulationPotential instance... for address in xrange(len(metric_server_addresses)): # Try making contact with the SimulationPotential instance... try: # Establish a connection with the SimulationPotential metric_server = Client(metric_server_addresses[address], authkey=authkey) # Send a status code indicating the SimulationPotential instance should stop running. metric_server.send({'status_code': comm_code('KILL')}) # Close the connection. metric_server.close() # If contact with the SimulationPotential instance cannot be made then... except socket.error: # Make a note in the log which SimulationPotential couldn't be contacted. logging.warning('Failed to Make Connection to SimulationPotential at ' + str(metric_server_addresses[address]) + '.')