#!/usr/bin/env python3 import grpc, time, sys from collections import defaultdict ENDPOINT = "shred.ghostgeyser.pro:10800" DURATION = 30 RPC_PATH = "/shredstream.ShredstreamProxy/SubscribeEntries" def decode_varint(buf, pos): val, shift = 0, 0 while pos < len(buf): b = buf[pos]; val |= (b & 0x7F) << shift; shift += 7; pos += 1 if not (b & 0x80): return val, pos return val, pos def extract_slot(raw): if len(raw) < 2: return 0 field_tag, pos = decode_varint(raw, 0) if (field_tag & 0x07) != 0: return 0 slot, _ = decode_varint(raw, pos) return slot def main(): channel = grpc.insecure_channel(ENDPOINT, options=[ ("grpc.max_receive_message_length", 67108864), ("grpc.keepalive_time_ms", 10000), ("grpc.keepalive_timeout_ms", 5000), ("grpc.http2.min_ping_interval_without_data_ms", 5000), ]) stream = channel.unary_stream( RPC_PATH, request_serializer=lambda x: x, response_deserializer=lambda x: x, )(b"") slots = defaultdict(lambda: {"entries": 0, "bytes": 0, "first_seen": 0.0}) t0 = time.monotonic() total_entries = 0 total_bytes = 0 first_ts = None sys.stdout.write(f"\n ┌{'─'*56}┐\n") sys.stdout.write(f" │ SHREDSTREAM BENCHMARK{' '*32}│\n") sys.stdout.write(f" │ {ENDPOINT:<54}│\n") sys.stdout.write(f" └{'─'*56}┘\n\n") sys.stdout.flush() try: for raw in stream: now = time.monotonic() elapsed = now - t0 if first_ts is None: first_ts = elapsed sys.stdout.write(f" subscribe_latency: {first_ts*1000:.1f}ms\n\n") sys.stdout.flush() if elapsed > DURATION: break slot = extract_slot(raw) s = slots[slot] if s["first_seen"] == 0.0: s["first_seen"] = elapsed s["entries"] += 1 s["bytes"] += len(raw) total_entries += 1 total_bytes += len(raw) if total_entries % 250 == 0: pct = min(elapsed / DURATION, 1.0) bar = "█" * int(pct * 30) + "░" * (30 - int(pct * 30)) sys.stdout.write(f"\r {bar} {pct*100:5.1f}% {len(slots):>3} slots {total_entries:>5} entries {total_entries/elapsed:>5.0f}/s") sys.stdout.flush() except grpc.RpcError as e: sys.stderr.write(f"\n stream_error: {e.code()} {e.details()}\n") return 1 except KeyboardInterrupt: pass elapsed = time.monotonic() - t0 n_slots = len(slots) expected = elapsed / 0.4 coverage = n_slots / expected * 100 if expected > 0 else 0 slot_list = sorted(slots.items(), key=lambda x: x[1]["first_seen"]) intervals = [] for i in range(1, len(slot_list)): dt = slot_list[i][1]["first_seen"] - slot_list[i-1][1]["first_seen"] intervals.append(dt * 1000) avg_interval = sum(intervals) / len(intervals) if intervals else 0 p50 = sorted(intervals)[len(intervals)//2] if intervals else 0 p99 = sorted(intervals)[int(len(intervals)*0.99)] if intervals else 0 sys.stdout.write(f"\r{' '*80}\r") sys.stdout.write(f"\n ┌{'─'*56}┐\n") sys.stdout.write(f" │ RESULTS ({elapsed:.1f}s){' '*(43-len(f'{elapsed:.1f}'))}│\n") sys.stdout.write(f" ├{'─'*56}┤\n") sys.stdout.write(f" │ slots {n_slots:<8} ({n_slots/elapsed:.2f}/s){' '*(23-len(f'{n_slots/elapsed:.2f}'))}│\n") sys.stdout.write(f" │ entries {total_entries:<8} ({total_entries/elapsed:.0f}/s){' '*(23-len(f'{total_entries/elapsed:.0f}'))}│\n") sys.stdout.write(f" │ throughput {total_bytes/1024/1024:.1f} MB ({total_bytes/elapsed/1024:.0f} KB/s){' '*(19-len(f'{total_bytes/1024/1024:.1f} MB ({total_bytes/elapsed/1024:.0f}'))}│\n") sys.stdout.write(f" │ coverage {coverage:.0f}%{' '*(37-len(f'{coverage:.0f}'))}│\n") sys.stdout.write(f" ├{'─'*56}┤\n") sys.stdout.write(f" │ slot_interval avg={avg_interval:.1f}ms p50={p50:.1f}ms p99={p99:.1f}ms{' '*(8-len(f'{p99:.1f}'))}│\n") sys.stdout.write(f" │ subscribe_lat {first_ts*1000:.1f}ms{' '*(38-len(f'{first_ts*1000:.1f}'))}│\n") sys.stdout.write(f" │ avg_entry/slot {total_entries/n_slots:.1f}{' '*(38-len(f'{total_entries/n_slots:.1f}'))}│\n") sys.stdout.write(f" └{'─'*56}┘\n\n") sys.stdout.flush() return 0 if __name__ == "__main__": sys.exit(main())