import os
import shutil
import numpy as np
from joblib import Parallel, delayed, parallel_backend
import subprocess
import time
[docs]def run(output_dir, max_processors, omnet_path, sim_time, repetitions, analyze, iter_path, inifile, makefile, verbose, NED_files_dir):
"""
Return the results relative to the desired parameter space in the form of an xarray data structure.csv.
Args:
output_dir: (path) The space of structure.csv to export.
max_processors: (int) The max number of cpus to use. By default, all cpus are used.
omnet_path: (path) Path to the OMNET++ installation.
By default the script try to find the installation path.
sim_time: (int) The simulation time. Common for all scenarios.
repetitions: (int) The number of runs for each iteration parameter.
analyze: (bool) If true execute simulation campaign. Otherwise analyze result files
from a simulation campaign to find missing simulations.
inifile: (path) Path to .ini file of veins project.
makefile: (path) Path to executable veins project.
"""
# Try to read scenarios from .ini file in veins_ini_file_path
sim_scenarios_list = get_scenarios(inifile)
print("\n Configuration file ({0}):\n".format(inifile))
# List scenarios found in .ini file
for i, scenario in enumerate(sim_scenarios_list):
print(' {0}) {1} '.format(i, scenario))
# Ask for scenarios to simulate or analyze
scenarios_to_sim = ask_for_scenarios_to_simulate(sim_scenarios_list)
# clear memory, swap after simulations
clear_memory()
# Build the simulation campaign
build_simulation_campaign(max_processors, output_dir, omnet_path, sim_time, repetitions, scenarios_to_sim,
iter_path, inifile, analyze, makefile, verbose, NED_files_dir)
# clear memory, swap after simulations
clear_memory()
[docs]def get_scenarios(veins_ini_file):
"""
Try to read simulation scenarios in VEINs project .ini file
OMNET++ defines [Config ] as the structure.csv in ini file to declare an scenario.
This function return a list with [Config ] declarations found in ini file.
"""
scenarios = []
with open(veins_ini_file, 'r') as ini_file:
for line in ini_file:
if isNotBlank(line):
if '[Config' in line:
s_name = line.split()
s_name = s_name[1]
s_name = s_name.strip(']')
if isNotBlank(s_name):
scenarios.append(s_name.strip())
return scenarios
[docs]def read_iteration_variables_from_file(scenarios_to_sim, iter_parameters_file_path):
"""
Get the number of iteration structure.csv (defined as specified in OMNET++ Simulation manual 10.4 Parameter Studies)
of each scenario (sim_scenarios) defined in .ini file.
e.g. OMNET++ study parameter definition -> *.numHosts = ${1, 2, 5, 10..50 step 10}
Args:
sim_scenarios (list): List of selected scenarios to include in the campaign
veins_ini_file_path (path): Path to .ini file of veins project.
Return a dictionary with scenarios as dic keys and read_iteration_variables_from_file as values.
Values in read_iteration_variables_from_file dictionary are defined as follows:
scenario1: [parameters1, parameter2, ... , # of read_iteration_variables_from_file]
scenario2: [parameters1, parameter2, ... , # of read_iteration_variables_from_file]
"""
dic_iter_vars = {}
dic_scenario_iter_vars = {}
iter_parameters_file_path = os.path.join(iter_parameters_file_path, 'variables.txt')
for s in scenarios_to_sim:
with open(iter_parameters_file_path, 'r') as ini_file:
for line in ini_file:
if isNotBlank(line):
ivar, value = line.split('=')
temp, units = value.split('}')
temp = temp.strip().strip('{')
vlist = temp.strip().split(',')
iter_var_tuple = (vlist, units.strip())
dic_iter_vars[ivar.strip()] = iter_var_tuple
dic_scenario_iter_vars[s] = dic_iter_vars
return dic_iter_vars, dic_scenario_iter_vars
[docs]def scenario_runs_set(scenario_iteration_variables_dictionary, repetitions):
"""
Generate runs list per scenario in OMNET++ format (opp_run all OMNET++ simulation manual 11.20 Running Simulation Campaigns)
for create batches e.g. -r 0,1,2,3.
Args:
scenario_iteration_variables_dictionary (dict): Dictionary with scenarios as keys and the number of iteration variables.txt as values
repetitions (int): The number of runs for each iteration parameter.
"""
runs_dic_command = {}
runs_dic_compute = {}
simulations_count = 0
for scenario in scenario_iteration_variables_dictionary:
values_list = [len(scenario_iteration_variables_dictionary[scenario][x][0]) for x in scenario_iteration_variables_dictionary[scenario]]
scenario_runs = np.prod(values_list) * repetitions
simulations_count += scenario_runs
runs_range = list(range(int(scenario_runs)))
# compute
runs_dic_compute[scenario] = runs_range
# opp_runall syntax
runs_dic_command[scenario] = str(runs_range).strip('[]').replace(" ", "")
total_simulations = scenario_runs * len(scenario_iteration_variables_dictionary)
return total_simulations, runs_dic_command, runs_dic_compute
[docs]def sim_campaign_info(scenarios_to_sim, iteration_variables_dictionary, repetitions, simtime, total_sims):
"""
Print simulation campaign summary:
Args:
"""
iter_var = [len(iteration_variables_dictionary[k][0]) for k in iteration_variables_dictionary.keys()]
banner = ' Campaign Info'
print('\n{}\n'.format(banner), '-' * len(banner))
print("\n Scenarios to simulate: {0}"
"\n Iteration variables.txt: {1} = {2}"
"\n Repetitions per scenario: {3}"
"\n Simulation time: {4}s"
"\n Total Runs: {5}".format(scenarios_to_sim, len(list(iteration_variables_dictionary.keys())), iter_var, repetitions,
simtime, total_sims))
print('', '-' * len(banner), '\n')
[docs]def run_simulations():
try:
s_build_campaign = str(input(' Run simulations (*Y/N): '))
except:
print('Wrong input!')
return False
else:
if s_build_campaign in ['y', 'Y', '']:
return True
else:
return False
[docs]def build_simulation_campaign(max_processors, output_dir, omnet_path, sim_time, repetitions, sim_scenarios_list,
variables_path, inifile, analyze, makefile, verbose, NED_files_dir):
"""
Execute parallel simulations of simulation campaign elements. If there is not enough processors,
a bath is used for queue simulations and distribute among processors.
Args:
@param max_processors: (int) The max number of cpus to use. By default, all cpus are used.
@param output_dir: (path) The space of structure.csv to export.
@param omnet_path: (path) The OMNET++ installation path
@param sim_time: (int) The simulation time. Common for all scenarios.
@param repetitions: (int) The number of runs for each iteration parameter.
@param sim_scenarios: (list) List of selected scenarios to include in the campaign
@param inifile: (path) Path to .ini file of veins project.
@param analyze: (bool) If true execute simulation campaign. Otherwise analyze result files
@param makefile: (path) Path to executable veins project.
@return:
"""
# Return a dictionary with scenarios as dic keys and read_iteration_variables_from_file as values
iteration_variables_dictionary, scenario_iteration_variables_dictionary = read_iteration_variables_from_file(
sim_scenarios_list, variables_path)
# Total number of runs = scenarios * iter variable * repetitions_per_scenario
# return -r 0,1,2,3,4 parameter per scenario according to iter variables.txt
total_sims, runs_dic_command, runs_dic_compute = scenario_runs_set(scenario_iteration_variables_dictionary, repetitions)
# simulation campaign information
sim_campaign_info(sim_scenarios_list, iteration_variables_dictionary, repetitions, sim_time, total_sims)
if analyze:
# check missing files of simulation campaign
missing_files(total_sims, output_dir)
else:
if run_simulations():
# excecute script cpu memory
#subprocess.Popen('/root/cpu_mem_check.sh')
# create temp ini file for the simulation campaign
temp_ini_name = create_temp_ini_file(output_dir, repetitions, inifile, iteration_variables_dictionary)
# create results folder
new_folder(output_dir)
if total_sims <= max_processors:
batch = 1 # default when b <= 1 enough cpus
else:
#b = total_sims / max_processors
#batch = math.ceil(b)
batch = allocate_processors(runs_dic_compute, max_processors)
# execute parallel simulations
parallel(max_processors, omnet_path, batch, sim_time, runs_dic_command, temp_ini_name, sim_scenarios_list, makefile, verbose, NED_files_dir)
[docs]def allocate_processors(df_scenarios, Processors):
len_scenarios = []
processors_per_scenario_tmp = []
[len_scenarios.append(len(df_scenarios[s])) for s in df_scenarios]
processors_per_scenario = (np.true_divide(len_scenarios, sum(len_scenarios))) * Processors
[processors_per_scenario_tmp.append(1 if s < 1 else np.math.floor(s)) for s in processors_per_scenario]
batch_per_scenario = np.ceil(np.divide(len_scenarios, processors_per_scenario_tmp))
#np.repeat(batch_per_scenario, processors_per_scenario_tmp)
return batch_per_scenario
[docs]def missing_files(total_sims, output_dir):
"""
Check in results folder if there are missing files of simulation campaign
@param total_sims: (int) Total number of runs = scenarios * iter variable * repetitions_per_scenario
@param output_dir: (path) The space of structure.csv to export.
@return:
"""
# Analyze results looking for missing result simulation files
files_in_results_folder = len(os.listdir(output_dir))
if total_sims == files_in_results_folder:
print('\n Files successfully generated: %s' % total_sims)
else:
print('\n Missing files in results folder: %s' % (total_sims - files_in_results_folder))
[docs]def parallel(max_processors, omnet_path, batch, sim_time, runs_bundle, temp_ini_name, sim_scenarios_list,
makefile, verbose, NED_files_dir):
"""
Execute parallel summary. If the number of cpus < # of summary a batch of runs is set
Args:
@param max_processors: (int) The max number of cpus to use. By default, all cpus are used.
@param omnet_path: (path) Path to the OMNET++ installation.
@param batch: (int) Number of simulations per cpu
@param sim_time: (int) The simulation time. Common for all scenarios.
@param runs_bundle: (int) Bundle of runs (e.g. 0,1,2,3..)
@param temp_ini_name: (string) VEINs ini configuration file name
@param iter_var_per_scenario: (dict) Dictionary with scenarios as keys and iterations as values
@param makefile: (path) Path to executable veins project.
@return:
"""
# Parallelize simulations
with parallel_backend("loky"):
Parallel(n_jobs=max_processors, verbose=10, pre_dispatch='1*n_jobs')(delayed(execute_sim)(makefile, max_processors, omnet_path, k, 1 if isinstance(batch, int) else int(batch[i]), sim_time, runs_bundle[k], temp_ini_name, verbose, NED_files_dir) for i, k in enumerate(sim_scenarios_list))
[docs]def new_folder(new_directory):
"""
Create new folder and replace if it exists. Used to creates the results folder where results files are saved.
Args:
new_directory (path): Path of the new folder
"""
if os.path.exists(new_directory):
shutil.rmtree(new_directory) # Removes all the subdirectories!
os.makedirs(new_directory)
[docs]def folder_permissions(veins_exec_project_path):
# TO DO assign folder permissions
pass
# os.chmod(PROJECT_EXECUTABLE, stat.S_IXGRP)
# os.system('chmod -R +x {0}'.format(os.path.dirname(PROJECT_EXECUTABLE)))
# os.system('chmod -R +x {0}'.format(os.path.dirname(VEINS_INI_PATH)))
# exec = '{0}/../../src/{1}'.format(temp_ini_name,) # TO DO find executable
[docs]def execute_sim(veins_exec_project_path, max_processors, omnet_path, scenario, batch, sim_time, runs, temp_ini_name,
verbose, NED_files_dir):
"""
Execute scenario simulation using OMNET++ funcionality (opp_run all OMNET++ simulation manual 11.20 Running Simulation Campaigns)
Args:
@param veins_exec_project_path: (path) Path to executable veins project.
@param max_processors: (int) The max number of cpus to use. By default, all cpus are used.
@param omnet_path: (path) Path to the OMNET++ installation.
@param scenario: Scenario to simulate
@param batch: Batch of simulations
@param sim_time: (int) The simulation time. Common for all scenarios.
@param runs: Bundle of runs (e.g. 0,1,2,3....)
@param temp_ini_name: (string) VEINs ini configuration file name
@return:
:param verbose:
"""
'-n .:../../src/veins '
# Change directory before execute simulation
os.chdir(os.path.dirname(temp_ini_name))
# Allow executions and file creation in simulations paths
folder_permissions(veins_exec_project_path)
if isNotBlank(scenario):
cmd = '{0}opp_runall -j{1} -b{2} {3} -u Cmdenv -c {4} -r {5} ' \
'-n {6} ' \
'--cmdenv-performance-display=false ' \
'--sim-time-limit={7}s ' \
'--cmdenv-redirect-output=true ' \
'--cmdenv-express-mode=true ' \
'{8}'.format(omnet_path, max_processors, batch, veins_exec_project_path, scenario, runs, NED_files_dir,
sim_time, temp_ini_name)
print(cmd)
# Execute command
if verbose:
os.system(cmd)
else:
to_log_file = subprocess.check_output(cmd, shell=True) # TO DO save in log file
[docs]def isNotBlank(myString):
"""
Check if string is empty or null.
Args:
@param myString: (string) Any string
@return: (bool)
"""
if myString and myString.strip():
return True
return False
[docs]def clear_memory():
"""Clean memory cache at the end of simulation execution"""
if os.name != 'nt': # Linux system
os.system('sync')
os.system('echo 3 > /proc/sys/vm/drop_caches')
os.system('swapoff -a && swapon -a')
# print("Memory cleaned")
[docs]def create_temp_ini_file(output_results, repetitions, veins_ini_file_name, iteration_variables_dictionary):
"""
Instantiates a temp.ini file with simulation campaign configurations.
Args:
output_results (path): The space of structure.csv to export.
repetitions (int): The number of runs for each iteration parameter.
veins_ini_file_name (path): Path to .ini file of veins project.
"""
file_name = veins_ini_file_name.strip('.ini')
temp_file_name = "{0}.temp.ini".format(file_name)
# declare variable where results will be save
var_save_result_in_ini = '*.*.appl.filename' # TO DO check in ini file a common variable
outputfile_extension = '.csv'
filename = '${configname},${iterationvarsf},${repetition}'+outputfile_extension
log_output_file = 'cmdenv-output-file' # TO DO check in ini file a common variable
with open(veins_ini_file_name, 'r') as ini_file:
with open(temp_file_name, 'w') as temp_ini_file:
for line in ini_file:
if 'repeat =' in line:
pass
elif 'repeat=' in line:
pass
elif log_output_file in line:
scenario_log_name = '{}.out'.format(filename)
temp_ini_file.write(
"{0} = {1}/../logs/{2}\n".format(log_output_file, output_results, scenario_log_name))
elif '[Config' in line:
temp_ini_file.writelines(line)
temp_ini_file.write("repeat = {0}\n".format(repetitions))
add_iteration_variables_to_scenarions_ini_file(temp_ini_file, iteration_variables_dictionary)
results_file_name = os.path.join(output_results, filename)
temp_ini_file.write('{0} = "{1}"\n'.format(var_save_result_in_ini, results_file_name))
else:
temp_ini_file.writelines(line)
return temp_file_name
[docs]def add_iteration_variables_to_scenarions_ini_file(temp_ini_file, iteration_variables_dictionary):
"""
Scenarios with same iteration variables.txt
"""
for iteration_variable in iteration_variables_dictionary:
values = '{' + ','.join(iteration_variables_dictionary[iteration_variable][0]) + '}'
temp_ini_file.write('{0} = ${1}{2}\n'.format(iteration_variable, values, iteration_variables_dictionary[iteration_variable][1]))
[docs]def ask_for_scenarios_to_simulate(sim_scenarios_list):
"""
Select scenarios to include in the simulation campaign
Args:
sim_scenarios_list(list): List of scenarios found in .ini file specified as argument
@param sim_scenarios_list:
@return:
"""
try:
scenarios_to_simulate = list(input('\n Select scenarios to simulate (e.g. 1,5,2 default=all): '))
# TO DO CHECK VALID SCENARIOS
except:
print('Wrong input!. Default value [all]')
else:
if not scenarios_to_simulate:
selected_scenarios_list = sim_scenarios_list
else:
if len(scenarios_to_simulate) > 1:
scenarios_to_simulate.remove(',')
selected_scenarios_list = [sim_scenarios_list[i] for i in list(map(int, scenarios_to_simulate))]
return selected_scenarios_list