Skip to content

Commit

Permalink
engineering_optim_trials - merge recent code optimisations on CARTESI…
Browse files Browse the repository at this point in the history
…US local branch
  • Loading branch information
CKehl committed Dec 18, 2020
2 parents 9957c3b + b2b82ec commit 42ba546
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 23 deletions.
113 changes: 93 additions & 20 deletions parcels/particleset_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import timedelta as delta
import psutil
import os
from platform import system as system_name
import matplotlib.pyplot as plt
import sys

Expand All @@ -20,10 +21,35 @@
from parcels.particle import JITParticle
from parcels.kernel_benchmark import Kernel_Benchmark, Kernel
from parcels.tools.loggers import logger
from parcels.tools.performance_logger import TimingLog, ParamLogging
from parcels.tools.performance_logger import TimingLog, ParamLogging, Asynchronous_ParamLogging

from resource import getrusage, RUSAGE_SELF

__all__ = ['ParticleSet_Benchmark']

def measure_mem():
    """Return this process's resident memory in bytes as the sum of its
    shared, text, data and lib segments (per psutil.Process.memory_info)."""
    info = psutil.Process(os.getpid()).memory_info()
    total_resident = info.shared + info.text + info.data + info.lib
    # print("psutil - res-set: {}; res-shr: {} res-text: {}, res-data: {}, res-lib: {}; res-total: {}".format(info.rss, info.shared, info.text, info.data, info.lib, total_resident))
    return total_resident

def measure_mem_rss():
    """Return the resident set size (RSS) of the current process in bytes.

    Unlike measure_mem(), this reports the kernel's RSS figure directly
    rather than summing the individual resident segments.
    """
    process = psutil.Process(os.getpid())
    pmem = process.memory_info()
    # NOTE: the previous version also computed shared+text+data+lib here
    # (copy-pasted from measure_mem) but never used it — dead code removed.
    return pmem.rss

def measure_mem_usage():
    """Return the peak resident set size of this process in bytes,
    as reported by resource.getrusage(RUSAGE_SELF)."""
    usage = getrusage(RUSAGE_SELF)
    print("RUSAGE - Max. RES set-size: {}; shr. mem size: {}; ushr. mem size: {}".format(usage.ru_maxrss, usage.ru_ixrss, usage.ru_idrss))
    # Linux reports ru_maxrss in kilobytes; other platforms (e.g. macOS) report bytes.
    return usage.ru_maxrss * 1024 if system_name() == "Linux" else usage.ru_maxrss

# Enable the background (asynchronous) memory sampling path in execute()/plot_and_log().
USE_ASYNC_MEMLOG = True
# Use the resource.getrusage()-based peak-RSS measurement instead of the psutil RSS figure.
USE_RUSE_SYNC_MEMLOG = False # can be faulty

class ParticleSet_Benchmark(ParticleSet):

Expand All @@ -37,9 +63,13 @@ def __init__(self, fieldset, pclass=JITParticle, lon=None, lat=None, depth=None,
self.plot_log = TimingLog()
self.nparticle_log = ParamLogging()
self.mem_log = ParamLogging()
self.async_mem_log = Asynchronous_ParamLogging()
self.process = psutil.Process(os.getpid())

#@profile
def set_async_memlog_interval(self, interval):
self.async_mem_log.measure_interval = interval

# @profile
def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
moviedt=None, recovery=None, output_file=None, movie_background_field=None,
verbose_progress=None, postIterationCallbacks=None, callbackdt=None):
Expand Down Expand Up @@ -169,12 +199,22 @@ def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
next_input = self.fieldset.computeTimeChunk(time, np.sign(dt))

tol = 1e-12
walltime_start = None
if verbose_progress is None:
walltime_start = time_module.time()
pbar = None
if verbose_progress:
pbar = self._create_progressbar_(_starttime, endtime)

if USE_ASYNC_MEMLOG:
self.async_mem_log.measure_func = measure_mem
mem_used_start = measure_mem()

while (time < endtime and dt > 0) or (time > endtime and dt < 0) or dt == 0:
self.total_log.start_timing()
if USE_ASYNC_MEMLOG:
self.async_mem_log.measure_start_value = mem_used_start
self.async_mem_log.start_partial_measurement()
if verbose_progress is None and time_module.time() - walltime_start > 10:
# Showing progressbar if runtime > 10 seconds
if output_file:
Expand All @@ -193,22 +233,27 @@ def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
self.kernel.execute(self, endtime=time, dt=dt, recovery=recovery, output_file=output_file, execute_once=execute_once)
if abs(time-next_prelease) < tol:
# creating new particles equals a memory-io operation
if isinstance(self.kernel, Kernel_Benchmark):
self.mem_io_log.start_timing()
if not isinstance(self.kernel, Kernel_Benchmark):
self.compute_log.stop_timing()
self.compute_log.accumulate_timing()

self.mem_io_log.start_timing()
pset_new = ParticleSet(fieldset=self.fieldset, time=time, lon=self.repeatlon,
lat=self.repeatlat, depth=self.repeatdepth,
pclass=self.repeatpclass, lonlatdepth_dtype=self.lonlatdepth_dtype,
partitions=False, pid_orig=self.repeatpid, **self.repeatkwargs)
for p in pset_new:
p.dt = dt
self.add(pset_new)
if isinstance(self.kernel, Kernel_Benchmark):
self.mem_io_log.stop_timing()
self.mem_io_log.accumulate_timing()
self.mem_io_log.stop_timing()
self.mem_io_log.accumulate_timing()
next_prelease += self.repeatdt * np.sign(dt)
if not isinstance(self.kernel, Kernel_Benchmark):
self.compute_log.stop_timing()
else:
if not isinstance(self.kernel, Kernel_Benchmark):
self.compute_log.stop_timing()
else:
pass
if isinstance(self.kernel, Kernel_Benchmark):
self.compute_log.add_aux_measure(self.kernel.compute_timings.sum())
self.kernel.compute_timings.reset()
self.io_log.add_aux_measure(self.kernel.io_timings.sum())
Expand All @@ -233,9 +278,13 @@ def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
next_movie += moviedt * np.sign(dt)
# ==== insert post-process here to also allow for memory clean-up via external func ==== #
if abs(time-next_callback) < tol:
# ==== assuming post-processing functions are memory-bound rather than compute-bound ... ==== #
self.mem_io_log.start_timing()
if postIterationCallbacks is not None:
for extFunc in postIterationCallbacks:
extFunc()
self.mem_io_log.stop_timing()
self.mem_io_log.accumulate_timing()
next_callback += callbackdt * np.sign(dt)
if time != endtime: # ==== IO ==== #
self.io_log.start_timing()
Expand All @@ -252,8 +301,13 @@ def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
self.total_log.stop_timing()
self.total_log.accumulate_timing()
mem_B_used_total = 0
mem_B_used_total = self.process.memory_info().rss
if USE_RUSE_SYNC_MEMLOG:
mem_B_used_total = measure_mem_usage()
else:
mem_B_used_total = measure_mem_rss()
self.mem_log.advance_iteration(mem_B_used_total)
if USE_ASYNC_MEMLOG:
self.async_mem_log.stop_partial_measurement() # does 'advance_iteration' internally

self.compute_log.advance_iteration()
self.io_log.advance_iteration()
Expand Down Expand Up @@ -288,7 +342,7 @@ def Kernel(self, pyfunc, c_include="", delete_cfiles=True):
:param delete_cfiles: Boolean whether to delete the C-files after compilation in JIT mode (default is True)
"""
return Kernel_Benchmark(self.fieldset, self.ptype, pyfunc=pyfunc, c_include=c_include,
delete_cfiles=delete_cfiles)
delete_cfiles=delete_cfiles)

def plot_and_log(self, total_times = None, compute_times = None, io_times = None, plot_times = None, memory_used = None, nparticles = None, target_N = 1, imageFilePath = "", odir = os.getcwd(), xlim_range=None, ylim_range=None):
# == do something with the log-arrays == #
Expand Down Expand Up @@ -322,6 +376,10 @@ def plot_and_log(self, total_times = None, compute_times = None, io_times = None
if not isinstance(nparticles, np.ndarray):
nparticles = np.array(nparticles, dtype=np.int32)

memory_used_async = None
if USE_ASYNC_MEMLOG:
memory_used_async = np.array(self.async_mem_log.get_params(), dtype=np.int64)

t_scaler = 1. * 10./1.0
npart_scaler = 1.0 / 1000.0
mem_scaler = 1.0 / (1024 * 1024 * 1024)
Expand All @@ -331,12 +389,17 @@ def plot_and_log(self, total_times = None, compute_times = None, io_times = None
plot_drawt = (plot_times * t_scaler).tolist()
plot_npart = (nparticles * npart_scaler).tolist()
plot_mem = []
if memory_used is not None and len(memory_used)>1:
if memory_used is not None and len(memory_used) > 1:
plot_mem = (memory_used * mem_scaler).tolist()

plot_mem_async = None
if USE_ASYNC_MEMLOG:
plot_mem_async = (memory_used_async * mem_scaler).tolist()

do_iot_plot = True
do_drawt_plot = True
do_drawt_plot = False
do_mem_plot = True
do_mem_plot_async = True
do_npart_plot = True
assert (len(plot_t) == len(plot_ct))
if len(plot_t) != len(plot_iot):
Expand All @@ -361,7 +424,10 @@ def plot_and_log(self, total_times = None, compute_times = None, io_times = None
if do_drawt_plot:
ax.plot(x, plot_drawt, 'o-', label="draw-time spent [100ms]")
if (memory_used is not None) and do_mem_plot:
ax.plot(x, plot_mem, '.-', label="memory_used (cumulative) [1 GB]")
ax.plot(x, plot_mem, '--', label="memory_used (cumulative) [1 GB]")
if USE_ASYNC_MEMLOG:
if (memory_used_async is not None) and do_mem_plot_async:
ax.plot(x, plot_mem_async, ':', label="memory_used [async] (cum.) [1GB]")
if do_npart_plot:
ax.plot(x, plot_npart, '-', label="sim. particles [# 1000]")
if xlim_range is not None:
Expand All @@ -387,15 +453,22 @@ def plot_and_log(self, total_times = None, compute_times = None, io_times = None
ncores = 1
if MPI:
mpi_comm = MPI.COMM_WORLD
ncores = mpi_comm.Get_size()
ncores = mpi_comm.Get_size()
header_string = "target_N, start_N, final_N, avg_N, ncores, avg_kt_total[s], avg_kt_compute[s], avg_kt_io[s], avg_kt_plot[s], cum_t_total[s], cum_t_compute[s], com_t_io[s], cum_t_plot[s], max_mem[MB]\n"
f.write(header_string)
data_string = "{}, {}, {}, {}, {}, ".format(target_N, nparticles_t0, nparticles_tN, nparticles.mean(), ncores)
data_string+= "{:2.10f}, {:2.10f}, {:2.10f}, {:2.10f}, ".format(total_times.mean(), compute_times.mean(), io_times.mean(), plot_times.mean())
max_mem = 0
if memory_used is not None and len(memory_used)>1:
data_string += "{:2.10f}, {:2.10f}, {:2.10f}, {:2.10f}, ".format(total_times.mean(), compute_times.mean(), io_times.mean(), plot_times.mean())
max_mem_sync = 0
if memory_used is not None and len(memory_used) > 1:
memory_used = np.floor(memory_used / (1024*1024))
memory_used = memory_used.astype(dtype=np.uint32)
max_mem = memory_used.max()
data_string+= "{:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {}".format(total_times.sum(), compute_times.sum(), io_times.sum(), plot_times.sum(), max_mem)
max_mem_sync = memory_used.max()
max_mem_async = 0
if USE_ASYNC_MEMLOG:
if memory_used_async is not None and len(memory_used_async) > 1:
memory_used_async = np.floor(memory_used_async / (1024*1024))
memory_used_async = memory_used_async.astype(dtype=np.int64)
max_mem_async = memory_used_async.max()
max_mem = max(max_mem_sync, max_mem_async)
data_string += "{:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {}".format(total_times.sum(), compute_times.sum(), io_times.sum(), plot_times.sum(), max_mem)
f.write(data_string)
Loading

0 comments on commit 42ba546

Please sign in to comment.