summaryrefslogtreecommitdiff
path: root/examples/server
diff options
context:
space:
mode:
authorPierrick Hymbert <pierrick.hymbert@gmail.com>2024-03-27 20:26:49 +0100
committerGitHub <noreply@github.com>2024-03-27 20:26:49 +0100
commita016026a3ac16d8c9b993a3573f19b9556d67de4 (patch)
treeeb72bc0e48589c195d4523dd61511eb8f69c0dcc /examples/server
parent53c7ec53d5eca26b2c0c648605543a5fa6c12817 (diff)
server: continuous performance monitoring and PR comment (#6283)
* server: bench: init * server: bench: reduce list of GPU nodes * server: bench: fix graph, fix output artifact * ci: bench: add mermaid in case of image cannot be uploaded * ci: bench: more resilient, more metrics * ci: bench: trigger build * ci: bench: fix duration * ci: bench: fix typo * ci: bench: fix mermaid values, markdown generated * typo on the step name Co-authored-by: Xuan Son Nguyen <thichthat@gmail.com> * ci: bench: trailing spaces * ci: bench: move images in a details section * ci: bench: reduce bullet point size --------- Co-authored-by: Xuan Son Nguyen <thichthat@gmail.com>
Diffstat (limited to 'examples/server')
-rw-r--r--examples/server/bench/bench.py303
-rw-r--r--examples/server/bench/prometheus.yml9
-rw-r--r--examples/server/bench/requirements.txt2
-rw-r--r--examples/server/tests/features/steps/steps.py19
4 files changed, 324 insertions, 9 deletions
diff --git a/examples/server/bench/bench.py b/examples/server/bench/bench.py
new file mode 100644
index 00000000..ea5d3854
--- /dev/null
+++ b/examples/server/bench/bench.py
@@ -0,0 +1,303 @@
+import argparse
+import json
+import os
+import re
+import signal
+import socket
+import subprocess
+import sys
+import threading
+import time
+import traceback
+from contextlib import closing
+from datetime import datetime
+
+import matplotlib
+import matplotlib.dates
+import matplotlib.pyplot as plt
+import requests
+
+
+def main(args_in: list[str] | None = None) -> None:
+ parser = argparse.ArgumentParser(description="Start server benchmark scenario")
+ parser.add_argument("--name", type=str, help="Bench name", required=True)
+ parser.add_argument("--runner-label", type=str, help="Runner label", required=True)
+ parser.add_argument("--branch", type=str, help="Branch name", default="detached")
+ parser.add_argument("--commit", type=str, help="Commit name", default="dirty")
+ parser.add_argument("--host", type=str, help="Server listen host", default="0.0.0.0")
+ parser.add_argument("--port", type=int, help="Server listen host", default="8080")
+ parser.add_argument("--model-path-prefix", type=str, help="Prefix where to store the model files", default="models")
+ parser.add_argument("--n-prompts", type=int,
+ help="SERVER_BENCH_N_PROMPTS: total prompts to randomly select in the benchmark", required=True)
+ parser.add_argument("--max-prompt-tokens", type=int,
+ help="SERVER_BENCH_MAX_PROMPT_TOKENS: maximum prompt tokens to filter out in the dataset",
+ required=True)
+ parser.add_argument("--max-tokens", type=int,
+ help="SERVER_BENCH_MAX_CONTEXT: maximum context size of the completions request to filter out in the dataset: prompt + predicted tokens",
+ required=True)
+ parser.add_argument("--hf-repo", type=str, help="Hugging Face model repository", required=True)
+ parser.add_argument("--hf-file", type=str, help="Hugging Face model file", required=True)
+ parser.add_argument("-ngl", "--n-gpu-layers", type=int, help="layers to the GPU for computation", required=True)
+ parser.add_argument("--ctx-size", type=int, help="Set the size of the prompt context", required=True)
+ parser.add_argument("--parallel", type=int, help="Set the number of slots for process requests", required=True)
+ parser.add_argument("--batch-size", type=int, help="Set the batch size for prompt processing", required=True)
+ parser.add_argument("--ubatch-size", type=int, help="physical maximum batch size", required=True)
+ parser.add_argument("--scenario", type=str, help="Scenario to run", required=True)
+ parser.add_argument("--duration", type=str, help="Bench scenario", required=True)
+
+ args = parser.parse_args(args_in)
+
+ start_time = time.time()
+
+ # Start the server and performance scenario
+ try:
+ server_process = start_server(args)
+ except Exception:
+ print("bench: server start error :")
+ traceback.print_exc(file=sys.stdout)
+ sys.exit(1)
+
+ # start the benchmark
+ try:
+ start_benchmark(args)
+
+ iterations = 0
+ with open("results.github.env", 'w') as github_env:
+ # parse output
+ with open('k6-results.json', 'r') as bench_results:
+ # Load JSON data from file
+ data = json.load(bench_results)
+ for metric_name in data['metrics']:
+ for metric_metric in data['metrics'][metric_name]:
+ value = data['metrics'][metric_name][metric_metric]
+ if isinstance(value, float) or isinstance(value, int):
+ value = round(value, 2)
+ data['metrics'][metric_name][metric_metric]=value
+ github_env.write(
+ f"{escape_metric_name(metric_name)}_{escape_metric_name(metric_metric)}={value}\n")
+ token_seconds = data['metrics']['llamacpp_tokens_second']['avg']
+ iterations = data['root_group']['checks']['success completion']['passes']
+
+ except Exception:
+ print("bench: error :")
+ traceback.print_exc(file=sys.stdout)
+
+ # Stop the server
+ if server_process:
+ try:
+ print(f"bench: shutting down server pid={server_process.pid} ...")
+ if os.name == 'nt':
+ interrupt = signal.CTRL_C_EVENT
+ else:
+ interrupt = signal.SIGINT
+ server_process.send_signal(interrupt)
+ server_process.wait(0.5)
+
+ except subprocess.TimeoutExpired:
+ print(f"server still alive after 500ms, force-killing pid={server_process.pid} ...")
+ server_process.kill() # SIGKILL
+ server_process.wait()
+
+ while is_server_listening(args.host, args.port):
+ time.sleep(0.1)
+
+ title = (f"llama.cpp {args.name} on {args.runner_label}\n "
+ f"duration={args.duration} {iterations} iterations")
+ xlabel = (f"{args.hf_repo}/{args.hf_file}\n"
+ f"parallel={args.parallel} ctx-size={args.ctx_size} ngl={args.n_gpu_layers} batch-size={args.batch_size} ubatch-size={args.ubatch_size} pp={args.max_prompt_tokens} pp+tg={args.max_tokens}\n"
+ f"branch={args.branch} commit={args.commit}")
+
+ # Prometheus
+ end_time = time.time()
+ if is_server_listening("0.0.0.0", 9090):
+ metrics = ['prompt_tokens_seconds', 'predicted_tokens_seconds',
+ 'kv_cache_usage_ratio', 'requests_processing', 'requests_deferred']
+
+ for metric in metrics:
+ resp = requests.get(f"http://localhost:9090/api/v1/query_range",
+ params={'query': 'llamacpp:' + metric, 'start': start_time, 'end': end_time, 'step': 2})
+
+ with open(f"{metric}.json", 'w') as metric_json:
+ metric_json.write(resp.text)
+
+ if resp.status_code != 200:
+ print(f"bench: unable to extract prometheus metric {metric}: {resp.text}")
+ else:
+ metric_data = resp.json()
+ values = metric_data['data']['result'][0]['values']
+ timestamps, metric_values = zip(*values)
+ metric_values = [float(value) for value in metric_values]
+ timestamps_dt = [datetime.fromtimestamp(int(ts)) for ts in timestamps]
+ plt.figure(figsize=(16, 10), dpi=80)
+ plt.plot(timestamps_dt, metric_values, label=metric)
+ plt.xticks(rotation=0, fontsize=14, horizontalalignment='center', alpha=.7)
+ plt.yticks(fontsize=12, alpha=.7)
+
+ ylabel = f"llamacpp:{metric}"
+ plt.title(title,
+ fontsize=14, wrap=True)
+ plt.grid(axis='both', alpha=.3)
+ plt.ylabel(ylabel, fontsize=22)
+ plt.xlabel(xlabel, fontsize=14, wrap=True)
+ plt.gca().xaxis.set_major_locator(matplotlib.dates.MinuteLocator())
+ plt.gca().xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%Y-%m-%d %H:%M:%S"))
+ plt.gcf().autofmt_xdate()
+
+ # Remove borders
+ plt.gca().spines["top"].set_alpha(0.0)
+ plt.gca().spines["bottom"].set_alpha(0.3)
+ plt.gca().spines["right"].set_alpha(0.0)
+ plt.gca().spines["left"].set_alpha(0.3)
+
+ # Save the plot as a jpg image
+ plt.savefig(f'{metric}.jpg', dpi=60)
+ plt.close()
+
+ # Mermaid format in case images upload failed
+ with (open(f"{metric}.mermaid", 'w') as mermaid_f):
+ mermaid = (
+ f"""---
+config:
+ xyChart:
+ titleFontSize: 12
+ width: 900
+ height: 600
+ themeVariables:
+ xyChart:
+ titleColor: "#000000"
+---
+xychart-beta
+ title "{title}"
+ y-axis "llamacpp:{metric}"
+ x-axis "llamacpp:{metric}" {int(min(timestamps))} --> {int(max(timestamps))}
+ line [{', '.join([str(round(float(value), 2)) for value in metric_values])}]
+ """)
+ mermaid_f.write(mermaid)
+
+ # 140 chars max for commit status description
+ bench_results = {
+ "req": {
+ "p90": data['metrics']["http_req_duration"]["p(90)"],
+ "avg": data['metrics']["http_req_duration"]["avg"],
+ },
+ "pp": {
+ "p90": data['metrics']["llamacpp_prompt_tokens"]["p(90)"],
+ "avg": data['metrics']["llamacpp_prompt_tokens"]["avg"],
+ },
+ "tg": {
+ "p90": data['metrics']["llamacpp_tokens_second"]["p(90)"],
+ "avg": data['metrics']["llamacpp_tokens_second"]["avg"],
+ },
+ }
+ with open("results.github.env", 'a') as github_env:
+ github_env.write(f"BENCH_RESULTS={json.dumps(bench_results, indent=None, separators=(',', ':') )}\n")
+ github_env.write(f"BENCH_ITERATIONS={iterations}\n")
+
+ title = title.replace('\n', ' ')
+ xlabel = xlabel.replace('\n', ' ')
+ github_env.write(f"BENCH_GRAPH_TITLE={title}\n")
+ github_env.write(f"BENCH_GRAPH_XLABEL={xlabel}\n")
+
+
+def start_benchmark(args):
+ k6_path = 'k6'
+ if 'BENCH_K6_BIN_PATH' in os.environ:
+ k6_path = os.environ['BENCH_K6_BIN_PATH']
+ k6_args = [
+ 'run', args.scenario,
+ '--no-color',
+ ]
+ k6_args.extend(['--duration', args.duration])
+ k6_args.extend(['--iterations', args.n_prompts])
+ k6_args.extend(['--vus', args.parallel])
+ k6_args.extend(['--summary-export', 'k6-results.json'])
+ args = f"SERVER_BENCH_N_PROMPTS={args.n_prompts} SERVER_BENCH_MAX_PROMPT_TOKENS={args.max_prompt_tokens} SERVER_BENCH_MAX_CONTEXT={args.max_tokens} "
+ args = args + ' '.join([str(arg) for arg in [k6_path, *k6_args]])
+ print(f"bench: starting k6 with: {args}")
+ k6_completed = subprocess.run(args, shell=True, stdout=sys.stdout, stderr=sys.stderr)
+ if k6_completed.returncode != 0:
+ raise Exception("bench: unable to run k6")
+
+
+def start_server(args):
+ server_process = start_server_background(args)
+
+ attempts = 0
+ max_attempts = 20
+ if 'GITHUB_ACTIONS' in os.environ:
+ max_attempts *= 2
+
+ while not is_server_listening(args.host, args.port):
+ attempts += 1
+ if attempts > max_attempts:
+ assert False, "server not started"
+ print(f"bench: waiting for server to start ...")
+ time.sleep(0.5)
+
+ print("bench: server started.")
+ return server_process
+
+
+def start_server_background(args):
+ # Start the server
+ server_path = '../../../build/bin/server'
+ if 'LLAMA_SERVER_BIN_PATH' in os.environ:
+ server_path = os.environ['LLAMA_SERVER_BIN_PATH']
+ server_args = [
+ '--host', args.host,
+ '--port', args.port,
+ ]
+ model_file = args.model_path_prefix + os.path.sep + args.hf_file
+ model_dir = os.path.dirname(model_file)
+ if not os.path.exists(model_dir):
+ os.makedirs(model_dir)
+ server_args.extend(['--model', model_file])
+ server_args.extend(['--hf-repo', args.hf_repo])
+ server_args.extend(['--hf-file', args.hf_file])
+ server_args.extend(['--n-gpu-layers', args.n_gpu_layers])
+ server_args.extend(['--ctx-size', args.ctx_size])
+ server_args.extend(['--parallel', args.parallel])
+ server_args.extend(['--batch-size', args.batch_size])
+ server_args.extend(['--ubatch-size', args.ubatch_size])
+ server_args.extend(['--n-predict', args.max_tokens * 2])
+ server_args.extend(['--defrag-thold', "0.1"])
+ server_args.append('--cont-batching')
+ server_args.append('--metrics')
+ server_args.extend(['--log-format', "text"])
+ args = [str(arg) for arg in [server_path, *server_args]]
+ print(f"bench: starting server with: {' '.join(args)}")
+ pkwargs = {
+ 'stdout': subprocess.PIPE,
+ 'stderr': subprocess.PIPE
+ }
+ server_process = subprocess.Popen(
+ args,
+ **pkwargs)
+
+ def server_log(in_stream, out_stream):
+ for line in iter(in_stream.readline, b''):
+ print(line.decode('utf-8'), end='', file=out_stream)
+
+ thread_stdout = threading.Thread(target=server_log, args=(server_process.stdout, sys.stdout))
+ thread_stdout.start()
+ thread_stderr = threading.Thread(target=server_log, args=(server_process.stderr, sys.stderr))
+ thread_stderr.start()
+
+ return server_process
+
+
+def is_server_listening(server_fqdn, server_port):
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+ result = sock.connect_ex((server_fqdn, server_port))
+ _is_server_listening = result == 0
+ if _is_server_listening:
+ print(f"server is listening on {server_fqdn}:{server_port}...")
+ return _is_server_listening
+
+
+def escape_metric_name(metric_name):
+ return re.sub('[^A-Z0-9]', '_', metric_name.upper())
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/server/bench/prometheus.yml b/examples/server/bench/prometheus.yml
new file mode 100644
index 00000000..b15ee524
--- /dev/null
+++ b/examples/server/bench/prometheus.yml
@@ -0,0 +1,9 @@
+global:
+ scrape_interval: 10s
+ external_labels:
+ llamacpp: 'server'
+
+scrape_configs:
+ - job_name: 'llama.cpp server'
+ static_configs:
+ - targets: ['localhost:8080']
diff --git a/examples/server/bench/requirements.txt b/examples/server/bench/requirements.txt
new file mode 100644
index 00000000..66ed226e
--- /dev/null
+++ b/examples/server/bench/requirements.txt
@@ -0,0 +1,2 @@
+matplotlib
+requests
diff --git a/examples/server/tests/features/steps/steps.py b/examples/server/tests/features/steps/steps.py
index 86c3339d..9a6cf7d6 100644
--- a/examples/server/tests/features/steps/steps.py
+++ b/examples/server/tests/features/steps/steps.py
@@ -1114,7 +1114,10 @@ def start_server_background(context):
server_args.append('--verbose')
if 'SERVER_LOG_FORMAT_JSON' not in os.environ:
server_args.extend(['--log-format', "text"])
- print(f"starting server with: {context.server_path} {server_args}")
+
+ args = [str(arg) for arg in [context.server_path, *server_args]]
+ print(f"bench: starting server with: {' '.join(args)}")
+
flags = 0
if 'nt' == os.name:
flags |= subprocess.DETACHED_PROCESS
@@ -1130,16 +1133,14 @@ def start_server_background(context):
[str(arg) for arg in [context.server_path, *server_args]],
**pkwargs)
- def log_stdout(process):
- for line in iter(process.stdout.readline, b''):
- print(line.decode('utf-8'), end='')
- thread_stdout = threading.Thread(target=log_stdout, args=(context.server_process,))
+ def server_log(in_stream, out_stream):
+ for line in iter(in_stream.readline, b''):
+ print(line.decode('utf-8'), end='', file=out_stream)
+
+ thread_stdout = threading.Thread(target=server_log, args=(context.server_process.stdout, sys.stdout))
thread_stdout.start()
- def log_stderr(process):
- for line in iter(process.stderr.readline, b''):
- print(line.decode('utf-8'), end='', file=sys.stderr)
- thread_stderr = threading.Thread(target=log_stderr, args=(context.server_process,))
+ thread_stderr = threading.Thread(target=server_log, args=(context.server_process.stderr, sys.stderr))
thread_stderr.start()
print(f"server pid={context.server_process.pid}, behave pid={os.getpid()}")