From f477c5fefbdcdabc8ed9fb53a0b97e2542e74c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Clasc=C3=A0?= Date: Wed, 17 Jul 2024 16:55:13 +0200 Subject: [PATCH 1/2] first implementation of buffered writing and progress bar --- list_buffer.py | 16 ++++++++++++++++ parse-nsys-stats.py | 37 ++++++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 13 deletions(-) create mode 100644 list_buffer.py diff --git a/list_buffer.py b/list_buffer.py new file mode 100644 index 0000000..06e21a3 --- /dev/null +++ b/list_buffer.py @@ -0,0 +1,16 @@ + +class ListBuffer(): + """Use lists as a storage""" + def __init__(self): + self.__io = [] + + def clear(self): + old_val = self.value() + self.__init__() + return old_val + + def value(self): + return "".join(self.__io) + + def write(self, symbol): + self.__io.append(symbol) \ No newline at end of file diff --git a/parse-nsys-stats.py b/parse-nsys-stats.py index 39f0e70..6e06728 100755 --- a/parse-nsys-stats.py +++ b/parse-nsys-stats.py @@ -4,6 +4,7 @@ import numpy as np +import math import pandas as pd import argparse import time @@ -12,6 +13,8 @@ import os import locale import sqlite3 from sqlalchemy import create_engine +from list_buffer import ListBuffer +import tqdm locale.setlocale(locale.LC_ALL, '') @@ -331,12 +334,13 @@ nvtx_startend_df["thread"] = 0 nvtx_startend_df["task"] = 0 mpi_df["thread"] = 0 mpi_df["task"] = 0 -openacc_other_df["thread"] = 0 -openacc_other_df["task"] = 0 -openacc_launch_df["thread"] = 0 -openacc_launch_df["task"] = 0 -openacc_data_df["thread"] = 0 -openacc_data_df["task"] = 0 +if t_openacc: + openacc_other_df["thread"] = 0 + openacc_other_df["task"] = 0 + openacc_launch_df["thread"] = 0 + openacc_launch_df["task"] = 0 + openacc_data_df["thread"] = 0 + openacc_data_df["task"] = 0 threads['row_name'] = "THREAD 1." + threads['task'].astype(str) + '.' 
+ threads['thread'].astype(str) @@ -789,16 +793,23 @@ prv_file.write(header) # Write events -chunk = "" +BLOCK_WRITE_SIZE = 2048 -print("-\tWriting kernel...") +num_rows = kernels_df.shape[0] +print("-\tWriting kernel... Number of rows: {}".format(num_rows)) +print(" -\tExpected size in disk: {}Gb".format((num_rows * 328)/10e9)) types = [event_type_kernels] + event_types_block_grid_values + [event_type_registers_thread, event_type_correlation] -for index, row in kernels_df.iterrows(): - values = [row["event_value"]] + [int(row['GrdX']), int(row['GrdY']), int(row['GrdZ']), int(row['BlkX']), int(row['BlkY']), int(row['BlkZ']), int(row['Reg/Trd']), row["CorrID"]] - chunk += create_combined_events_record(row.iloc[0], row.iloc[1], int(row["thread"]), int(row["task"]), types, values) -prv_file.write(chunk) -chunk = "" +lbuffer = ListBuffer() +for b in tqdm.tqdm(range(math.floor(num_rows / BLOCK_WRITE_SIZE))): + limit = min(BLOCK_WRITE_SIZE, num_rows - i) + for index in range(limit): + row = kernels_df.iloc[index + b*BLOCK_WRITE_SIZE] + values = [row["event_value"]] + [int(row['GrdX']), int(row['GrdY']), int(row['GrdZ']), int(row['BlkX']), int(row['BlkY']), int(row['BlkZ']), int(row['Reg/Trd']), row["CorrID"]] + lbuffer.write(create_combined_events_record(row.iloc[0], row.iloc[1], int(row["thread"]), int(row["task"]), types, values)) + prv_file.write(lbuffer.value()) + lbuffer.clear() +chunk = "" print("-\tWriting memory operations...") types_mem = [event_type_kernels, event_type_memcopy_size, event_type_correlation] for index, row in memops_df.iterrows(): -- GitLab From ec6dfb689f855f756601d34062049719e2ec60c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Clasc=C3=A0?= Date: Fri, 19 Jul 2024 12:22:23 +0200 Subject: [PATCH 2/2] Changes all writing loops to a generalized function with lambda for serializing the final prv line. This closes #9. 
--- EventWriter.py | 16 +++++++ parse-nsys-stats.ipynb | 2 +- parse-nsys-stats.py | 104 +++++++++++------------------------------ 3 files changed, 45 insertions(+), 77 deletions(-) create mode 100644 EventWriter.py diff --git a/EventWriter.py b/EventWriter.py new file mode 100644 index 0000000..79b6a5d --- /dev/null +++ b/EventWriter.py @@ -0,0 +1,16 @@ +import tqdm +import math +from list_buffer import ListBuffer + +BLOCK_WRITE_SIZE = 4096 + +def EventWriter(prv_file, df, name, serialization_f): + num_rows = df.shape[0] + lbuffer = ListBuffer() + for b in tqdm.tqdm(range(math.floor(num_rows / BLOCK_WRITE_SIZE)+1), desc="{} ({}Mb estimated size)".format(name, (num_rows * 180 * 2)/1e6), unit="blocks"): + limit = min(BLOCK_WRITE_SIZE, num_rows - b*BLOCK_WRITE_SIZE) + for index in range(limit): + row = df.iloc[index + b*BLOCK_WRITE_SIZE] + lbuffer.write(serialization_f(row)) + prv_file.write(lbuffer.value()) + lbuffer.clear() \ No newline at end of file diff --git a/parse-nsys-stats.ipynb b/parse-nsys-stats.ipynb index ae044a7..c680c86 100644 --- a/parse-nsys-stats.ipynb +++ b/parse-nsys-stats.ipynb @@ -4775,7 +4775,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/parse-nsys-stats.py b/parse-nsys-stats.py index 6e06728..0da5e32 100755 --- a/parse-nsys-stats.py +++ b/parse-nsys-stats.py @@ -2,9 +2,6 @@ # coding: utf-8 - -import numpy as np -import math import pandas as pd import argparse import time @@ -13,8 +10,7 @@ import os import locale import sqlite3 from sqlalchemy import create_engine -from list_buffer import ListBuffer -import tqdm +import EventWriter as ewr locale.setlocale(locale.LC_ALL, '') @@ -793,98 +789,54 @@ prv_file.write(header) # Write events -BLOCK_WRITE_SIZE = 2048 -num_rows = kernels_df.shape[0] -print("-\tWriting kernel... 
Number of rows: {}".format(num_rows)) -print(" -\tExpected size in disk: {}Gb".format((num_rows * 328)/10e9)) types = [event_type_kernels] + event_types_block_grid_values + [event_type_registers_thread, event_type_correlation] -lbuffer = ListBuffer() -for b in tqdm.tqdm(range(math.floor(num_rows / BLOCK_WRITE_SIZE))): - limit = min(BLOCK_WRITE_SIZE, num_rows - i) - for index in range(limit): - row = kernels_df.iloc[index + b*BLOCK_WRITE_SIZE] - values = [row["event_value"]] + [int(row['GrdX']), int(row['GrdY']), int(row['GrdZ']), int(row['BlkX']), int(row['BlkY']), int(row['BlkZ']), int(row['Reg/Trd']), row["CorrID"]] - lbuffer.write(create_combined_events_record(row.iloc[0], row.iloc[1], int(row["thread"]), int(row["task"]), types, values)) - prv_file.write(lbuffer.value()) - lbuffer.clear() - -chunk = "" -print("-\tWriting memory operations...") +ewr.EventWriter(prv_file, kernels_df, "Kernels", lambda r: + (create_combined_events_record(r.iloc[0], r.iloc[1], int(r["thread"]), int(r["task"]), types, [r["event_value"]] + [int(r['GrdX']), int(r['GrdY']), int(r['GrdZ']), int(r['BlkX']), int(r['BlkY']), int(r['BlkZ']), int(r['Reg/Trd']), r["CorrID"]]))) + types_mem = [event_type_kernels, event_type_memcopy_size, event_type_correlation] -for index, row in memops_df.iterrows(): - values = [row["event_value"], row["bytes_b"], row["CorrID"]] - chunk += create_combined_events_record(row.iloc[0], row.iloc[1], int(row["thread"]), int(row["task"]), types_mem, values) -prv_file.write(chunk) -chunk = "" +ewr.EventWriter(prv_file, memops_df, "Memory operations", lambda r: + (create_combined_events_record(r.iloc[0], r.iloc[1], int(r["thread"]), int(r["task"]), types_mem, [r["event_value"], r["bytes_b"], r["CorrID"]]))) if t_apicalls: - print("-\tWriting CUDA API calls...") types_api = [event_type_api, event_type_correlation] - for index, row in cuda_api_df.iterrows(): - values = [row["event_value"], row["CorrID"]] - chunk += create_combined_events_record(row.iloc[0], row.iloc[1], 
int(row["thread"]), int(row["task"]), types_api, values) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, cuda_api_df, "CUDA API calls", lambda r: + (create_combined_events_record(r.iloc[0], r.iloc[1], int(r["thread"]), int(r["task"]), types_api, [r["event_value"], r["CorrID"]]))) + if t_nvtx: - print("-\tWriting NVTX pushpop ranges...") - for index, row in nvtx_df_subset.iterrows(): - chunk += create_event_record(row.iloc[0], row.iloc[2], int(row["thread"]), int(row["task"]), event_type_nvtx, row["event_value"]) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, nvtx_df_subset, "NVTX pushpop ranges", lambda r: + (create_event_record(r.iloc[0], r.iloc[2], int(r["thread"]), int(r["task"]), event_type_nvtx, r["event_value"]))) if t_nvtx_startend: - print("-\tWriting NVTX startend ranges...") - for index, row in nvtx_startend_df.iterrows(): - chunk += create_event_record(row.iloc[0], row.iloc[2], int(row["thread"]), int(row["task"]), event_type_nvtx_startend, row["event_value"]) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, nvtx_startend_df, "NVTX startend ranges", lambda r: + (create_event_record(r.iloc[0], r.iloc[2], int(r["thread"]), int(r["task"]), event_type_nvtx_startend, r["event_value"]))) + if t_mpi: - print("-\tWriting MPI events...") - for index, row in mpi_df.iterrows(): - chunk += create_event_record(row.iloc[0], row.iloc[2], int(row["thread"]), int(row["task"]), event_type_mpi, row["event_value"]) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, mpi_df, "MPI events", lambda r: + (create_event_record(r.iloc[0], r.iloc[2], int(r["thread"]), int(r["task"]), event_type_mpi, r["event_value"]))) if t_openacc: - print("-\tWriting OpenACC events...") t_acc_d = [event_type_openacc_data, event_type_name_openacc_data, event_type_func_openacc_data, event_type_openacc_data_size] - for index, r in openacc_data_df.iterrows(): - values = [r["eventKind"], r["name_value"], r["func_value"], r["bytes"]] - 
chunk += create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_d, values) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, openacc_data_df, "OpenACC data constructs", lambda r: + (create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_d, [r["eventKind"], r["name_value"], r["func_value"], r["bytes"]]))) t_acc_l = [event_type_openacc_launch, event_type_name_openacc_launch, event_type_func_openacc_launch] - for index, r in openacc_launch_df.iterrows(): - values = [r["eventKind"], r["name_value"], r["func_value"]] - chunk += create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_l, values) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, openacc_launch_df, "OpenACC launch constructs", lambda r: + (create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_l, [r["eventKind"], r["name_value"], r["func_value"]]))) t_acc_o = [event_type_openacc, event_type_name_openacc, event_type_func_openacc] - for index, r in openacc_other_df.iterrows(): - values = [r["eventKind"], r["name_value"], r["func_value"]] - chunk += create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_o, values) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, openacc_other_df, "OpenACC other constructs", lambda r: + (create_combined_events_record(r["start"], r["end"] - r["start"], r["thread"], r["task"], t_acc_o, [r["eventKind"], r["name_value"], r["func_value"]]))) if t_metrics: - print("-\tWriting GPU metrics...") - for index, row in gpu_metrics_agg.iterrows(): - chunk += create_metrics_record(row) - prv_file.write(chunk) - chunk = "" + ewr.EventWriter(prv_file, gpu_metrics_agg, "GPU metrics", lambda r: + (create_metrics_record(r))) if t_apicalls: - print("-\tWriting correlation lines...") - for index, row in comm_kernel_df.iterrows(): - chunk += 
create_communication_record(row["task"], row["thread_call"], row["task"], row["thread_k"], (row["Start (ns)_call"]), row["Start (ns)_k"], 0, comm_tag_launch) - prv_file.write(chunk) - chunk = "" - for index, row in comm_memory_df.iterrows(): - chunk += create_communication_record(row["task"], row["thread_call"], row["task"], row["thread_mem"], (row["Start (ns)_call"]), row["Start (ns)_mem"], int(row["bytes_b"]), comm_tag_memory) - prv_file.write(chunk) + ewr.EventWriter(prv_file, comm_kernel_df, "Kernel correlation lines", lambda r: + (create_communication_record(r["task"], r["thread_call"], r["task"], r["thread_k"], (r["Start (ns)_call"]), r["Start (ns)_k"], 0, comm_tag_launch))) + ewr.EventWriter(prv_file, comm_memory_df, "Memory correlation lines", lambda r: + (create_communication_record(r["task"], r["thread_call"], r["task"], r["thread_mem"], (r["Start (ns)_call"]), r["Start (ns)_mem"], int(r["bytes_b"]), comm_tag_memory))) prv_file.close() -- GitLab