File size: 18,499 Bytes
f7cde70
 
 
 
 
8f99c19
f7cde70
 
 
acead3f
ffb4f17
 
acead3f
f7cde70
 
 
 
 
 
 
 
 
 
9734238
 
 
 
 
 
 
 
 
 
f7cde70
 
9734238
 
 
 
f7cde70
 
 
 
 
 
 
 
 
 
 
da031a5
f7cde70
da031a5
f7cde70
da031a5
f7cde70
 
 
 
da031a5
 
f7cde70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9734238
f7cde70
da031a5
9734238
 
da031a5
f7cde70
 
 
da031a5
9734238
 
da031a5
f7cde70
8f99c19
 
f7cde70
8f99c19
 
9734238
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f99c19
9734238
8f99c19
 
 
 
9734238
 
f7cde70
9734238
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7cde70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb498b2
 
f7cde70
bb498b2
9734238
bb498b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9734238
bb498b2
 
 
 
 
ffb4f17
bb498b2
ffb4f17
bb498b2
 
ffb4f17
bb498b2
 
 
 
 
ffb4f17
bb498b2
 
 
ffb4f17
 
bb498b2
 
 
 
ffb4f17
 
 
bb498b2
 
ffb4f17
bb498b2
ffb4f17
 
 
 
 
 
 
bb498b2
 
ffb4f17
 
 
 
 
 
 
bb498b2
ffb4f17
bb498b2
ffb4f17
 
 
bb498b2
 
ffb4f17
bb498b2
 
 
ffb4f17
 
bb498b2
 
 
 
 
 
c8d26ac
 
 
 
 
 
 
bb498b2
fcecfa3
bb498b2
9734238
bb498b2
 
 
f7cde70
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
from contextlib import ExitStack
from dataclasses import dataclass
from typing import List

import click
import os
import gradio as gr
import pandas as pd

import traceback
import glob
import json

from parse_results import build_results


@dataclass
class PlotConfig:
    x_title: str
    y_title: str
    title: str
    percentiles: List[float] = None

def check_file_exists(path, label=""):
    if os.path.exists(path):
        print(f"βœ… {label} file exists: {path}")
        print(f"   File size: {os.path.getsize(path)} bytes")
        print(f"   Absolute path: {os.path.abspath(path)}")
    else:
        print(f"❌ {label} file NOT found: {path}")
        print(f"   Current working directory: {os.getcwd()}")
        print(f"   Directory contents: {os.listdir(os.path.dirname(path) if os.path.dirname(path) else '.')}")


def run(from_results_dir, datasource, port):
    print(f"πŸ’‘ Debug - from_results_dir: {from_results_dir}")
    print(f"πŸ’‘ Debug - datasource: {datasource}")
    print(f"πŸ’‘ Debug - current directory: {os.getcwd()}")

    css = '''
    .summary span {
        font-size: 10px;
        padding-top:0;
        padding-bottom:0;
    }
    '''

    summary_desc = '''
    ## Summary
    This table shows the average of the metrics for each model and QPS rate.

    The metrics are:
    * Inter token latency: Time to generate a new output token for each user querying the system.
      It translates as the β€œspeed” perceived by the end-user. We aim for at least 300 words per minute (average reading speed), so ITL<150ms
    * Time to First Token: Time the user has to wait before seeing the first token of its answer.
      Lower waiting time are essential for real-time interactions, less so for offline workloads.
    * End-to-end latency: The overall time the system took to generate the full response to the user.
    * Throughput: The number of tokens per second the system can generate across all requests
    * Successful requests: The number of requests the system was able to honor in the benchmark timeframe
    * Error rate: The percentage of requests that ended up in error, as the system could not process them in time or failed to process them.

    '''

    df_bench = pd.DataFrame()
    line_plots_bench = []
    column_mappings = {'inter_token_latency_ms_p90': 'ITL P90 (ms)', 'time_to_first_token_ms_p90': 'TTFT P90 (ms)',
                       'e2e_latency_ms_p90': 'E2E P90 (ms)', 'token_throughput_secs': 'Throughput (tokens/s)',
                       'successful_requests': 'Successful requests', 'error_rate': 'Error rate (%)', 'model': 'Model',
                       'rate': 'QPS', 'run_id': 'Run ID'}
    default_df = pd.DataFrame.from_dict(
        {"rate": [1, 2], "inter_token_latency_ms_p90": [10, 20],
         "version": ["default", "default"],
         "model": ["default", "default"]})

    def load_demo(model_bench, percentiles):
        return update_bench(model_bench, percentiles)

    def update_bench(model, percentiles):
        res = []
        for plot in line_plots_bench:
            if plot['config'].percentiles:
                k = plot['metric'] + '_' + str(percentiles)
                df_bench[plot['metric']] = df_bench[k] if k in df_bench.columns else 0
            res.append(df_bench[(df_bench['model'] == model)])

        return res + [summary_table()]

    def summary_table() -> pd.DataFrame:
        data = df_bench.groupby(['model', 'run_id', 'rate']).agg(
            {'inter_token_latency_ms_p90': 'mean', 'time_to_first_token_ms_p90': 'mean',
             'e2e_latency_ms_p90': 'mean', 'token_throughput_secs': 'mean',
             'successful_requests': 'mean', 'error_rate': 'mean'}).reset_index()
        data = data[
            ['run_id', 'model', 'rate', 'inter_token_latency_ms_p90', 'time_to_first_token_ms_p90',
             'e2e_latency_ms_p90',
             'token_throughput_secs']]
        for metric in ['inter_token_latency_ms_p90', 'time_to_first_token_ms_p90', 'e2e_latency_ms_p90',
                       'token_throughput_secs']:
            data[metric] = data[metric].apply(lambda x: f"{x:.2f}")
        data = data.rename(
            columns=column_mappings)
        return data

    def load_bench_results(source) -> pd.DataFrame:
        data = pd.read_parquet(source)
        # remove warmup and throughput
        data = data[(data['id'] != 'warmup') & (data['id'] != 'throughput')]
        # only keep constant rate
        data = data[data['executor_type'] == 'ConstantArrivalRate']
        return data

    def select_region(selection: gr.SelectData, model):
        min_w, max_w = selection.index
        data = df_bench[(df_bench['model'] == model) & (df_bench['rate'] >= min_w) & (
                df_bench['rate'] <= max_w)]
        res = []
        for plot in line_plots_bench:
            # find the y values for the selected region
            metric = plot["metric"]
            y_min = data[metric].min()
            y_max = data[metric].max()
            res.append(gr.LinePlot(x_lim=[min_w, max_w], y_lim=[y_min, y_max]))
        return res

    def reset_region():
        res = []
        for _ in line_plots_bench:
            res.append(gr.LinePlot(x_lim=None, y_lim=None))
        return res

    def load_datasource(datasource, fn):
        print(f"πŸ’‘ Debug - load_datasource called with: {datasource}")
        if datasource.startswith('file://'):
            local_path = datasource[len('file://'):]
            print(f"πŸ’‘ Debug - Extracted local path: {local_path}")
            check_file_exists(local_path, "Local")
            return fn(local_path)
        elif datasource.startswith('s3://'):
            return fn(datasource)
        else:
            # If no scheme is provided, assume it's a local path.
            print(f"πŸ’‘ Debug - Using path as-is: {datasource}")
            check_file_exists(datasource, "Direct")
            return fn(datasource)

    parquet_file_to_load = None

    if from_results_dir is not None:
        # If from_results_dir is specified, results are built into 'benchmarks.parquet'
        # within that directory.
        output_filename = 'benchmarks.parquet'
        print(f"πŸ’‘ Debug - Building results from directory: {from_results_dir}")

        # Check if results directory exists
        check_file_exists(from_results_dir, "Results directory")

        # Create absolute path for results directory
        abs_results_dir = os.path.abspath(from_results_dir)
        print(f"πŸ’‘ Debug - Absolute results directory: {abs_results_dir}")

        # Create the results directory if it doesn't exist
        if not os.path.exists(abs_results_dir):
            print(f"πŸ’‘ Debug - Creating results directory: {abs_results_dir}")
            os.makedirs(abs_results_dir, exist_ok=True)

        # Call build_results with absolute paths
        full_output_path = os.path.join(abs_results_dir, output_filename)
        print(f"πŸ’‘ Debug - Expected output path: {full_output_path}")

        build_results(abs_results_dir, output_filename, None)

        # Check if the file was created
        check_file_exists(full_output_path, "Generated parquet")

        # The file to load is now in from_results_dir/output_filename
        parquet_file_to_load = full_output_path
    else:
        # If not building from results_dir, use the provided datasource directly.
        parquet_file_to_load = datasource

    print(f"πŸ’‘ Debug - Final parquet_file_to_load: {parquet_file_to_load}")

    # Load data
    try:
        df_bench = load_datasource(parquet_file_to_load, load_bench_results)
        print(f"βœ… Successfully loaded data with {len(df_bench)} rows")
    except Exception as e:
        print(f"❌ Error loading data: {str(e)}")
        print(f"Stack trace: {traceback.format_exc()}")
        # Create a minimal DataFrame to prevent further errors
        df_bench = pd.DataFrame({
            "model": ["error"],
            "run_id": ["error"],
            "rate": [0],
            "inter_token_latency_ms_p90": [0],
            "time_to_first_token_ms_p90": [0],
            "e2e_latency_ms_p90": [0],
            "token_throughput_secs": [0],
            "successful_requests": [0],
            "error_rate": [0]
        })

    # Define metrics
    metrics = {
        "inter_token_latency_ms": PlotConfig(title="Inter Token Latency (lower is better)", x_title="QPS",
                                             y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "time_to_first_token_ms": PlotConfig(title="TTFT (lower is better)", x_title="QPS",
                                             y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "e2e_latency_ms": PlotConfig(title="End to End Latency (lower is better)", x_title="QPS",
                                     y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "token_throughput_secs": PlotConfig(title="Request Output Throughput (higher is better)", x_title="QPS",
                                            y_title="Tokens/s"),
        "successful_requests": PlotConfig(title="Successful requests (higher is better)", x_title="QPS",
                                          y_title="Count"),
        "error_rate": PlotConfig(title="Error rate", x_title="QPS", y_title="%"),
        "prompt_tokens": PlotConfig(title="Prompt tokens", x_title="QPS", y_title="Count"),
        "decoded_tokens": PlotConfig(title="Decoded tokens", x_title="QPS", y_title="Count")
    }

    models = df_bench["model"].unique()
    run_ids = df_bench["run_id"].unique()

    # get all available percentiles
    percentiles = set()
    for k, v in metrics.items():
        if v.percentiles:
            percentiles.update(v.percentiles)
    percentiles = map(lambda p: f'p{int(float(p) * 100)}', percentiles)
    percentiles = sorted(list(percentiles))
    percentiles.append('avg')
    with gr.Blocks(css=css, title="Inference Benchmarker") as demo:
        with gr.Row():
            gr.Markdown("# Inference-benchmarker πŸ€—\n## Benchmarks results")
        with gr.Row():
            gr.Markdown(summary_desc)
        with gr.Row():
            table = gr.DataFrame(
                pd.DataFrame(),
                elem_classes=["summary"],
            )
        with gr.Row():
            details_desc = gr.Markdown("## Details")
        with gr.Row():
            model = gr.Dropdown(list(models), label="Select model", value=models[0])
        with gr.Row():
            percentiles_bench = gr.Radio(percentiles, label="", value="avg")
        i = 0
        with ExitStack() as stack:
            for k, v in metrics.items():
                if i % 2 == 0:
                    stack.close()
                    gs = stack.enter_context(gr.Row())
                line_plots_bench.append(
                    {"component": gr.LinePlot(default_df, label=f'{v.title}', x="rate", y=k,
                                              y_title=v.y_title, x_title=v.x_title,
                                              color="run_id"
                                              ),
                     "model": model.value,
                     "metric": k,
                     "config": v
                     },
                )
                i += 1

        for component in [model, percentiles_bench]:
            component.change(update_bench, [model, percentiles_bench],
                             [item["component"] for item in line_plots_bench] + [table])
        gr.on([plot["component"].select for plot in line_plots_bench], select_region, [model],
              outputs=[item["component"] for item in line_plots_bench])
        gr.on([plot["component"].double_click for plot in line_plots_bench], reset_region, None,
              outputs=[item["component"] for item in line_plots_bench])
        demo.load(load_demo, [model, percentiles_bench],
                  [item["component"] for item in line_plots_bench] + [table])

    demo.launch(server_port=port, server_name="0.0.0.0")


@click.command()
@click.option('--from-results-dir', 'cli_from_results_dir', default=None, help='Load inference-benchmarker results from this directory. Overrides DASHBOARD_FROM_RESULTS_DIR.')
@click.option('--datasource', 'cli_datasource', default='file://benchmarks.parquet', help='Load this Parquet file directly if not building from a results directory.')
@click.option('--port', default=7860, help='Port to run the dashboard')
def main(cli_from_results_dir, cli_datasource, port):
    print("===== Starting Application =====")
    # print(f"Environment variables: {os.environ}") # Already in user's code or logs

    # Determine the directory from which to process JSON results
    # Priority: 1. CLI option, 2. Env Var, 3. Default to 'results' dir
    processing_dir = cli_from_results_dir

    if processing_dir is None:
        env_var_value = os.environ.get('DASHBOARD_FROM_RESULTS_DIR')
        if env_var_value:
            print(f"Using environment variable DASHBOARD_FROM_RESULTS_DIR='{env_var_value}' for processing.")
            processing_dir = env_var_value
        elif os.path.exists('results') and os.path.isdir('results'):
            print(f"No --from-results-dir option or DASHBOARD_FROM_RESULTS_DIR env var. Defaulting to 'results' directory for processing as it exists.")
            processing_dir = 'results'
        else:
            print(f"No directory specified for processing (no --from-results-dir, no DASHBOARD_FROM_RESULTS_DIR env var, and 'results' dir not found).")
            # processing_dir remains None

    path_to_load_by_run_function = None # This will be the path to the .parquet file

    if processing_dir:
        # A directory for processing JSONs has been determined.
        # Use the existing logic to build/fallback and generate benchmarks.parquet.
        output_filename = 'benchmarks.parquet'
        abs_processing_dir = os.path.abspath(processing_dir)

        print(f"πŸ’‘ Debug - Will process JSONs from directory: {abs_processing_dir}")
        check_file_exists(abs_processing_dir, "Source directory for JSONs")

        # Ensure the directory exists (it might be 'results' or user-provided)
        # build_results might expect the output directory to exist.
        if not os.path.exists(abs_processing_dir):
            print(f"πŸ’‘ Debug - Creating directory for processing/output: {abs_processing_dir}")
            os.makedirs(abs_processing_dir, exist_ok=True)

        # The generated parquet file will be placed inside the abs_processing_dir
        generated_parquet_filepath = os.path.join(abs_processing_dir, output_filename)
        print(f"πŸ’‘ Debug - Expected path for generated parquet file: {generated_parquet_filepath}")

        try:
            build_results(abs_processing_dir, output_filename, None) # output_filename is relative to abs_processing_dir
            print("βœ… Build results completed using build_results.")
        except Exception as e_build:
            print(f"❌ Error in build_results: {str(e_build)}")
            print(f"Stack trace: {traceback.format_exc()}")
            print("⚠️ Attempting fallback method: direct JSON processing")
            try:
                json_files = glob.glob(os.path.join(abs_processing_dir, "*.json"))
                print(f"Found {len(json_files)} JSON files for fallback: {json_files}")
                if not json_files:
                    raise FileNotFoundError("Fallback: No JSON files found in results directory")

                combined_data = []
                for json_file in json_files:
                    try:
                        with open(json_file, 'r') as f:
                            data = json.load(f)
                        filename = os.path.basename(json_file)
                        model_name_parts = filename.split('_')
                        model_name = f"{model_name_parts[0]}_{model_name_parts[1]}" if len(model_name_parts) > 1 else model_name_parts[0]

                        if 'benchmarks' in data:
                            for benchmark in data['benchmarks']:
                                benchmark['model'] = model_name
                                benchmark['run_id'] = os.path.splitext(filename)[0]
                                combined_data.append(benchmark)
                        else:
                            print(f"⚠️ Fallback: No 'benchmarks' key in {json_file}")
                    except Exception as json_err:
                        print(f"❌ Fallback: Error processing {json_file}: {str(json_err)}")

                if combined_data:
                    df_direct = pd.DataFrame(combined_data)
                    df_direct.to_parquet(generated_parquet_filepath)
                    print(f"βœ… Created parquet file via fallback method: {generated_parquet_filepath}")
                else:
                    raise ValueError("Fallback: No data could be extracted from JSON files")
            except Exception as e_fallback:
                print(f"❌ Fallback method failed: {str(e_fallback)}")
                print(f"Stack trace: {traceback.format_exc()}")

        # After attempting to build/generate, check if the file exists
        check_file_exists(generated_parquet_filepath, "Parquet file after build/fallback attempts")
        if os.path.exists(generated_parquet_filepath):
            path_to_load_by_run_function = generated_parquet_filepath
        else:
            print(f"❌ CRITICAL: Failed to generate or find parquet file at '{generated_parquet_filepath}' after all attempts.")
            # path_to_load_by_run_function remains None here, will be handled below.

    # If path_to_load_by_run_function is still None at this point
    # (either because processing_dir was not set, or all generation attempts failed),
    # default to the original cli_datasource.
    if path_to_load_by_run_function is None:
        print(f"⚠️ Defaulting to cli_datasource '{cli_datasource}' as parquet generation failed or was skipped.")
        path_to_load_by_run_function = cli_datasource

    print(f"πŸ’‘ Final path to be loaded by run() function: '{path_to_load_by_run_function}'")

    # Call run(). The first argument (from_results_dir for run()) is None because main handles processing.
    # The second argument (datasource for run()) is the actual file path to load.
    run(None, path_to_load_by_run_function, port)


if __name__ == '__main__':
    main(auto_envvar_prefix='DASHBOARD')