Prerequisites
bash
# Install required tools
pip install pqcrypto liboqs-python cryptography
sudo apt install liboqs-dev # Linux
brew install liboqs # macOS
# Clone reference implementations
git clone https://github.com/pq-crystals/kyber.git
git clone https://github.com/pq-crystals/dilithium.git
git clone https://github.com/open-quantum-safe/liboqs-python.gitLab 1: Your First ML-KEM Key Exchange
Objective
Perform a complete key encapsulation/decapsulation using ML-KEM-768.
Exercise 1.1: Basic Key Exchange
python
# lab1_mlkem_basic.py
"""
Lab 1.1: ML-KEM Key Exchange
Learn: Key generation, encapsulation, decapsulation
"""
import oqs
def mlkem_key_exchange():
"""
Simulate Alice and Bob establishing a shared secret using ML-KEM.
"""
# === ALICE: Generate keypair ===
print("=" * 60)
print("ALICE: Generating ML-KEM-768 keypair...")
kem = oqs.KeyEncapsulation("ML-KEM-768")
alice_public_key = kem.generate_keypair()
alice_secret_key = kem.export_secret_key()
print(f" Public key size: {len(alice_public_key)} bytes")
print(f" Secret key size: {len(alice_secret_key)} bytes")
print(f" Public key (first 32 bytes): {alice_public_key[:32].hex()}")
# === ALICE: Send public key to Bob ===
print("\nALICE → BOB: Sending public key over insecure channel...")
# === BOB: Encapsulate shared secret ===
print("\n" + "=" * 60)
print("BOB: Encapsulating shared secret...")
bob_kem = oqs.KeyEncapsulation("ML-KEM-768")
ciphertext, bob_shared_secret = bob_kem.encap_secret(alice_public_key)
print(f" Ciphertext size: {len(ciphertext)} bytes")
print(f" Shared secret size: {len(bob_shared_secret)} bytes")
print(f" Bob's shared secret: {bob_shared_secret.hex()}")
# === BOB: Send ciphertext to Alice ===
print("\nBOB → ALICE: Sending ciphertext over insecure channel...")
# === ALICE: Decapsulate shared secret ===
print("\n" + "=" * 60)
print("ALICE: Decapsulating shared secret...")
alice_shared_secret = kem.decap_secret(ciphertext)
print(f" Alice's shared secret: {alice_shared_secret.hex()}")
# === VERIFY ===
print("\n" + "=" * 60)
if alice_shared_secret == bob_shared_secret:
print("SUCCESS: Shared secrets match!")
print("Alice and Bob now have a shared 256-bit key for symmetric encryption.")
else:
print("FAILURE: Shared secrets do not match!")
return alice_shared_secret == bob_shared_secret
if __name__ == "__main__":
mlkem_key_exchange()Exercise 1.2: Compare with Classical ECDH
python
# lab1_compare_ecdh.py
"""
Lab 1.2: Compare ML-KEM with ECDH
Learn: Size differences, performance differences
"""
import oqs
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives import serialization
import time
def compare_key_exchange():
results = {}
# === ML-KEM-768 ===
print("Testing ML-KEM-768...")
kem = oqs.KeyEncapsulation("ML-KEM-768")
start = time.perf_counter()
for _ in range(1000):
pk = kem.generate_keypair()
keygen_time = (time.perf_counter() - start) / 1000
start = time.perf_counter()
for _ in range(1000):
ct, ss = kem.encap_secret(pk)
encap_time = (time.perf_counter() - start) / 1000
start = time.perf_counter()
for _ in range(1000):
ss = kem.decap_secret(ct)
decap_time = (time.perf_counter() - start) / 1000
results['ML-KEM-768'] = {
'public_key_size': len(pk),
'ciphertext_size': len(ct),
'secret_key_size': len(kem.export_secret_key()),
'keygen_ms': keygen_time * 1000,
'encap_ms': encap_time * 1000,
'decap_ms': decap_time * 1000,
}
# === X25519 (ECDH) ===
print("Testing X25519...")
start = time.perf_counter()
for _ in range(1000):
alice_private = x25519.X25519PrivateKey.generate()
alice_public = alice_private.public_key()
keygen_time = (time.perf_counter() - start) / 1000
bob_private = x25519.X25519PrivateKey.generate()
bob_public = bob_private.public_key()
start = time.perf_counter()
for _ in range(1000):
shared = alice_private.exchange(bob_public)
exchange_time = (time.perf_counter() - start) / 1000
pk_bytes = alice_public.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
results['X25519'] = {
'public_key_size': len(pk_bytes),
'ciphertext_size': 32, # Just the public key
'secret_key_size': 32,
'keygen_ms': keygen_time * 1000,
'encap_ms': exchange_time * 1000,
'decap_ms': exchange_time * 1000,
}
# === Print comparison ===
print("\n" + "=" * 70)
print("COMPARISON: ML-KEM-768 vs X25519 (ECDH)")
print("=" * 70)
print(f"{'Metric':<25} {'ML-KEM-768':>15} {'X25519':>15} {'Ratio':>10}")
print("-" * 70)
for metric in ['public_key_size', 'ciphertext_size', 'secret_key_size']:
mlkem = results['ML-KEM-768'][metric]
ecdh = results['X25519'][metric]
ratio = mlkem / ecdh
print(f"{metric:<25} {mlkem:>15} {ecdh:>15} {ratio:>10.1f}x")
print("-" * 70)
for metric in ['keygen_ms', 'encap_ms', 'decap_ms']:
mlkem = results['ML-KEM-768'][metric]
ecdh = results['X25519'][metric]
ratio = mlkem / ecdh
print(f"{metric:<25} {mlkem:>15.3f} {ecdh:>15.3f} {ratio:>10.1f}x")
print("\nKEY INSIGHT: ML-KEM has larger keys but is still fast!")
if __name__ == "__main__":
compare_key_exchange()Lab 2: ML-DSA Digital Signatures
Exercise 2.1: Sign and Verify
python
# lab2_mldsa_basic.py
"""
Lab 2.1: ML-DSA Digital Signatures
Learn: Key generation, signing, verification
"""
import oqs
import hashlib
def mldsa_sign_verify():
"""
Demonstrate ML-DSA signing and verification.
"""
# === Generate keypair ===
print("Generating ML-DSA-65 keypair...")
sig = oqs.Signature("ML-DSA-65")
public_key = sig.generate_keypair()
secret_key = sig.export_secret_key()
print(f" Public key size: {len(public_key)} bytes")
print(f" Secret key size: {len(secret_key)} bytes")
# === Sign a message ===
message = b"This is a critical firmware update. Verify before installing!"
print(f"\nSigning message: '{message.decode()}'")
signature = sig.sign(message)
print(f" Signature size: {len(signature)} bytes")
print(f" Signature (first 64 bytes): {signature[:64].hex()}")
# === Verify signature ===
print("\nVerifying signature...")
verifier = oqs.Signature("ML-DSA-65")
is_valid = verifier.verify(message, signature, public_key)
if is_valid:
print(" ✓ Signature is VALID")
else:
print(" ✗ Signature is INVALID")
# === Try to verify tampered message ===
print("\nTrying to verify tampered message...")
tampered_message = b"This is a malicious firmware update. Install now!"
is_valid_tampered = verifier.verify(tampered_message, signature, public_key)
if is_valid_tampered:
print(" ✗ ERROR: Tampered message accepted!")
else:
print(" ✓ Tampered message correctly REJECTED")
return True
if __name__ == "__main__":
mldsa_sign_verify()Exercise 2.2: Code Signing Simulation
python
# lab2_code_signing.py
"""
Lab 2.2: Code Signing with ML-DSA
Learn: Real-world application of PQC signatures
"""
import oqs
import hashlib
import json
import base64
from datetime import datetime
class PQCCodeSigner:
"""
Quantum-safe code signing utility.
"""
def __init__(self, algorithm="ML-DSA-65"):
self.algorithm = algorithm
self.sig = oqs.Signature(algorithm)
self.public_key = None
self.secret_key = None
def generate_keypair(self, key_id: str) -> dict:
"""Generate a new signing keypair."""
self.public_key = self.sig.generate_keypair()
self.secret_key = self.sig.export_secret_key()
return {
"key_id": key_id,
"algorithm": self.algorithm,
"created": datetime.utcnow().isoformat(),
"public_key": base64.b64encode(self.public_key).decode(),
}
def sign_artifact(self, artifact_path: str, metadata: dict = None) -> dict:
"""Sign a file and return signature manifest."""
# Read and hash the file
with open(artifact_path, 'rb') as f:
content = f.read()
file_hash = hashlib.sha256(content).hexdigest()
# Create manifest
manifest = {
"file": artifact_path,
"sha256": file_hash,
"size": len(content),
"signed_at": datetime.utcnow().isoformat(),
"algorithm": self.algorithm,
"metadata": metadata or {},
}
# Sign the manifest (not just the file!)
manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
signature = self.sig.sign(manifest_bytes)
manifest["signature"] = base64.b64encode(signature).decode()
return manifest
def verify_artifact(self, artifact_path: str, manifest: dict,
public_key_b64: str) -> dict:
"""Verify a signed artifact."""
results = {
"file": artifact_path,
"checks": {},
"valid": False,
}
# 1. Verify file exists and hash matches
try:
with open(artifact_path, 'rb') as f:
content = f.read()
file_hash = hashlib.sha256(content).hexdigest()
if file_hash == manifest["sha256"]:
results["checks"]["hash"] = "PASS"
else:
results["checks"]["hash"] = "FAIL: Hash mismatch"
return results
except FileNotFoundError:
results["checks"]["hash"] = "FAIL: File not found"
return results
# 2. Verify signature
public_key = base64.b64decode(public_key_b64)
signature = base64.b64decode(manifest["signature"])
# Reconstruct manifest without signature for verification
manifest_copy = {k: v for k, v in manifest.items() if k != "signature"}
manifest_bytes = json.dumps(manifest_copy, sort_keys=True).encode()
verifier = oqs.Signature(manifest["algorithm"])
try:
is_valid = verifier.verify(manifest_bytes, signature, public_key)
if is_valid:
results["checks"]["signature"] = "PASS"
results["valid"] = True
else:
results["checks"]["signature"] = "FAIL: Invalid signature"
except Exception as e:
results["checks"]["signature"] = f"FAIL: {str(e)}"
return results
def demo_code_signing():
"""Demonstrate the code signing workflow."""
print("=" * 60)
print("QUANTUM-SAFE CODE SIGNING DEMO")
print("=" * 60)
# Create signer
signer = PQCCodeSigner("ML-DSA-65")
# Generate keypair
print("\n[1] Generating signing keypair...")
key_info = signer.generate_keypair("ACME-SIGNING-KEY-001")
print(f" Key ID: {key_info['key_id']}")
print(f" Algorithm: {key_info['algorithm']}")
print(f" Public key size: {len(base64.b64decode(key_info['public_key']))} bytes")
# Create a test file to sign
test_file = "/tmp/firmware_v2.0.bin"
with open(test_file, 'wb') as f:
f.write(b"FIRMWARE_HEADER" + bytes(range(256)) * 100)
print(f"\n[2] Created test artifact: {test_file}")
# Sign the artifact
print("\n[3] Signing artifact...")
manifest = signer.sign_artifact(test_file, {
"version": "2.0.0",
"target": "ARM-Cortex-M4",
"release_notes": "Security update with PQC support"
})
print(f" File hash: {manifest['sha256']}")
print(f" Signature size: {len(base64.b64decode(manifest['signature']))} bytes")
# Save manifest
manifest_file = "/tmp/firmware_v2.0.manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
print(f" Manifest saved: {manifest_file}")
# Verify the artifact
print("\n[4] Verifying artifact...")
result = signer.verify_artifact(test_file, manifest, key_info['public_key'])
for check, status in result['checks'].items():
print(f" {check}: {status}")
print(f" Overall: {'✓ VALID' if result['valid'] else '✗ INVALID'}")
# Tamper with the file and try to verify
print("\n[5] Tampering with artifact and re-verifying...")
with open(test_file, 'ab') as f:
f.write(b"MALICIOUS_PAYLOAD")
result = signer.verify_artifact(test_file, manifest, key_info['public_key'])
print(f" Hash check: {result['checks']['hash']}")
print(f" Overall: {'✓ VALID' if result['valid'] else '✗ INVALID (as expected)'}")
if __name__ == "__main__":
demo_code_signing()Lab 3: Hybrid Encryption
Exercise 3.1: Hybrid TLS Key Exchange Simulation
python
# lab3_hybrid_kex.py
"""
Lab 3.1: Hybrid Key Exchange (Classical + PQC)
Learn: Defense in depth during transition period
"""
import oqs
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import os
def hybrid_key_exchange():
"""
Implement hybrid key exchange: X25519 + ML-KEM-768
Security: If EITHER algorithm is secure, the combined key is secure.
"""
print("=" * 60)
print("HYBRID KEY EXCHANGE: X25519 + ML-KEM-768")
print("=" * 60)
# ============================================================
# ALICE: Generate both classical and PQC keypairs
# ============================================================
print("\n[ALICE] Generating keypairs...")
# Classical: X25519
alice_x25519_private = x25519.X25519PrivateKey.generate()
alice_x25519_public = alice_x25519_private.public_key()
# PQC: ML-KEM-768
alice_mlkem = oqs.KeyEncapsulation("ML-KEM-768")
alice_mlkem_public = alice_mlkem.generate_keypair()
print(" X25519 public key: 32 bytes")
print(f" ML-KEM-768 public key: {len(alice_mlkem_public)} bytes")
print(f" Total public key size: {32 + len(alice_mlkem_public)} bytes")
# ============================================================
# ALICE → BOB: Send both public keys
# ============================================================
print("\n[ALICE → BOB] Sending hybrid public key...")
# ============================================================
# BOB: Generate shared secrets using both methods
# ============================================================
print("\n[BOB] Computing shared secrets...")
# Classical: X25519 key exchange
bob_x25519_private = x25519.X25519PrivateKey.generate()
bob_x25519_public = bob_x25519_private.public_key()
ss_classical = bob_x25519_private.exchange(alice_x25519_public)
print(f" X25519 shared secret: {ss_classical.hex()}")
# PQC: ML-KEM encapsulation
bob_mlkem = oqs.KeyEncapsulation("ML-KEM-768")
ciphertext, ss_pqc = bob_mlkem.encap_secret(alice_mlkem_public)
print(f" ML-KEM shared secret: {ss_pqc.hex()}")
# ============================================================
# BOB: Combine shared secrets
# ============================================================
print("\n[BOB] Combining shared secrets...")
# Method: Concatenate and derive using HKDF
combined_secret = ss_classical + ss_pqc
print(f" Combined secret size: {len(combined_secret)} bytes")
final_key_bob = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"hybrid-kex-v1",
).derive(combined_secret)
print(f" Final key (Bob): {final_key_bob.hex()}")
# ============================================================
# BOB → ALICE: Send X25519 public key and ML-KEM ciphertext
# ============================================================
print("\n[BOB → ALICE] Sending response...")
print(f" X25519 public key: 32 bytes")
print(f" ML-KEM ciphertext: {len(ciphertext)} bytes")
# ============================================================
# ALICE: Compute shared secrets
# ============================================================
print("\n[ALICE] Computing shared secrets...")
# Classical: X25519
ss_classical_alice = alice_x25519_private.exchange(bob_x25519_public)
# PQC: ML-KEM decapsulation
ss_pqc_alice = alice_mlkem.decap_secret(ciphertext)
# Combine
combined_secret_alice = ss_classical_alice + ss_pqc_alice
final_key_alice = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"hybrid-kex-v1",
).derive(combined_secret_alice)
print(f" Final key (Alice): {final_key_alice.hex()}")
# ============================================================
# VERIFY
# ============================================================
print("\n" + "=" * 60)
if final_key_alice == final_key_bob:
print("✓ SUCCESS: Hybrid key exchange complete!")
print(" Both parties now share a 256-bit key.")
print(" Security: Protected by BOTH X25519 AND ML-KEM-768")
else:
print("✗ FAILURE: Keys do not match!")
return final_key_alice == final_key_bob
if __name__ == "__main__":
hybrid_key_exchange()Lab 4: Cryptographic Inventory Scanner
Exercise 4.1: Build Your Own Scanner
python
# lab4_crypto_scanner.py
"""
Lab 4.1: Build a Cryptographic Inventory Scanner
Learn: Identify quantum-vulnerable crypto in real code
"""
import os
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict
import json
@dataclass
class CryptoFinding:
file: str
line: int
pattern: str
algorithm: str
severity: str
snippet: str
class CryptoScanner:
"""
Scan source code for quantum-vulnerable cryptography.
"""
# Patterns to detect (regex, algorithm name, severity)
PATTERNS = {
'python': [
(r'from\s+Crypto\.PublicKey\s+import\s+RSA', 'RSA', 'CRITICAL'),
(r'from\s+cryptography.*\s+import\s+rsa', 'RSA', 'CRITICAL'),
(r'RSA\.generate\s*\(', 'RSA', 'CRITICAL'),
(r'rsa\.generate_private_key\s*\(', 'RSA', 'CRITICAL'),
(r'from\s+ecdsa\s+import', 'ECDSA', 'CRITICAL'),
(r'ec\.generate_private_key\s*\(', 'ECDSA/ECDH', 'CRITICAL'),
(r'ECDH\s*\(', 'ECDH', 'CRITICAL'),
(r'Diffie-?Hellman', 'DH', 'CRITICAL'),
(r'from\s+Crypto\.Cipher\s+import\s+DES', 'DES', 'HIGH'),
(r'AES\.new\s*\([^)]*128', 'AES-128', 'MEDIUM'),
],
'java': [
(r'KeyPairGenerator\.getInstance\s*\(\s*"RSA"', 'RSA', 'CRITICAL'),
(r'KeyPairGenerator\.getInstance\s*\(\s*"EC"', 'ECDSA', 'CRITICAL'),
(r'KeyPairGenerator\.getInstance\s*\(\s*"DH"', 'DH', 'CRITICAL'),
(r'Signature\.getInstance\s*\([^)]*ECDSA', 'ECDSA', 'CRITICAL'),
(r'KeyAgreement\.getInstance\s*\(\s*"ECDH"', 'ECDH', 'CRITICAL'),
(r'Cipher\.getInstance\s*\([^)]*DES', 'DES', 'HIGH'),
],
'javascript': [
(r'crypto\.generateKeyPairSync\s*\(\s*[\'"]rsa[\'"]', 'RSA', 'CRITICAL'),
(r'crypto\.generateKeyPairSync\s*\(\s*[\'"]ec[\'"]', 'ECDSA', 'CRITICAL'),
(r'new\s+NodeRSA\s*\(', 'RSA', 'CRITICAL'),
(r'crypto\.createECDH\s*\(', 'ECDH', 'CRITICAL'),
(r'crypto\.createDiffieHellman\s*\(', 'DH', 'CRITICAL'),
],
'go': [
(r'rsa\.GenerateKey\s*\(', 'RSA', 'CRITICAL'),
(r'ecdsa\.GenerateKey\s*\(', 'ECDSA', 'CRITICAL'),
(r'ecdh\.P256\s*\(', 'ECDH', 'CRITICAL'),
(r'elliptic\.P256\s*\(', 'ECDSA/ECDH', 'CRITICAL'),
],
}
EXTENSIONS = {
'.py': 'python',
'.java': 'java',
'.js': 'javascript',
'.ts': 'javascript',
'.go': 'go',
}
def __init__(self):
self.findings: List[CryptoFinding] = []
def scan_file(self, filepath: Path) -> List[CryptoFinding]:
"""Scan a single file for crypto patterns."""
ext = filepath.suffix.lower()
if ext not in self.EXTENSIONS:
return []
language = self.EXTENSIONS[ext]
patterns = self.PATTERNS.get(language, [])
findings = []
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
for pattern, algorithm, severity in patterns:
if re.search(pattern, line, re.IGNORECASE):
findings.append(CryptoFinding(
file=str(filepath),
line=line_num,
pattern=pattern,
algorithm=algorithm,
severity=severity,
snippet=line.strip()[:100],
))
except Exception as e:
print(f"Error scanning {filepath}: {e}")
return findings
def scan_directory(self, directory: str, exclude_dirs: List[str] = None) -> List[CryptoFinding]:
"""Recursively scan a directory."""
exclude_dirs = exclude_dirs or ['node_modules', '.git', 'venv', '__pycache__']
self.findings = []
for root, dirs, files in os.walk(directory):
# Skip excluded directories
dirs[:] = [d for d in dirs if d not in exclude_dirs]
for file in files:
filepath = Path(root) / file
self.findings.extend(self.scan_file(filepath))
return self.findings
def generate_report(self) -> Dict:
"""Generate a summary report."""
report = {
'total_findings': len(self.findings),
'by_severity': {},
'by_algorithm': {},
'findings': [],
}
for finding in self.findings:
# Count by severity
report['by_severity'][finding.severity] = \
report['by_severity'].get(finding.severity, 0) + 1
# Count by algorithm
report['by_algorithm'][finding.algorithm] = \
report['by_algorithm'].get(finding.algorithm, 0) + 1
# Add finding details
report['findings'].append({
'file': finding.file,
'line': finding.line,
'algorithm': finding.algorithm,
'severity': finding.severity,
'snippet': finding.snippet,
})
return report
def demo_scanner():
"""Demonstrate the scanner on a test directory."""
# Create test files with vulnerable patterns
test_dir = "/tmp/crypto_scan_test"
os.makedirs(test_dir, exist_ok=True)
# Test Python file
with open(f"{test_dir}/auth.py", 'w') as f:
f.write("""
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES
def generate_keypair():
key = RSA.generate(2048)
return key
def encrypt_data(data, key):
cipher = AES.new(key, AES.MODE_GCM)
return cipher.encrypt(data)
""")
# Test Java file
with open(f"{test_dir}/KeyManager.java", 'w') as f:
f.write("""
import java.security.*;
public class KeyManager {
public KeyPair generateKeyPair() throws Exception {
KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
kpg.initialize(2048);
return kpg.generateKeyPair();
}
public KeyPair generateECKeyPair() throws Exception {
KeyPairGenerator kpg = KeyPairGenerator.getInstance("EC");
return kpg.generateKeyPair();
}
}
""")
# Run scanner
print("=" * 60)
print("QUANTUM-VULNERABLE CRYPTO SCANNER")
print("=" * 60)
print(f"\nScanning: {test_dir}\n")
scanner = CryptoScanner()
findings = scanner.scan_directory(test_dir)
# Print results
report = scanner.generate_report()
print(f"Total findings: {report['total_findings']}")
print("\nBy Severity:")
for severity, count in sorted(report['by_severity'].items()):
print(f" {severity}: {count}")
print("\nBy Algorithm:")
for algo, count in sorted(report['by_algorithm'].items()):
print(f" {algo}: {count}")
print("\nDetailed Findings:")
print("-" * 60)
for finding in report['findings']:
print(f"[{finding['severity']}] {finding['algorithm']}")
print(f" File: {finding['file']}:{finding['line']}")
print(f" Code: {finding['snippet']}")
print()
# Save report
report_file = f"{test_dir}/crypto_scan_report.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2)
print(f"Report saved: {report_file}")
if __name__ == "__main__":
demo_scanner()Lab 5: Performance Benchmarking
Exercise 5.1: Comprehensive Benchmark Suite
python
# lab5_benchmark.py
"""
Lab 5.1: PQC Performance Benchmarking
Learn: Understand real-world performance characteristics
"""
import oqs
import time
import statistics
from typing import Dict, List
import json
def benchmark_algorithm(algorithm: str, operation: str, iterations: int = 1000) -> Dict:
"""Benchmark a specific algorithm and operation."""
times = []
if algorithm.startswith("ML-KEM") or algorithm.startswith("Kyber"):
# KEM benchmarks
kem = oqs.KeyEncapsulation(algorithm)
if operation == "keygen":
for _ in range(iterations):
start = time.perf_counter()
pk = kem.generate_keypair()
times.append(time.perf_counter() - start)
elif operation == "encap":
pk = kem.generate_keypair()
for _ in range(iterations):
start = time.perf_counter()
ct, ss = kem.encap_secret(pk)
times.append(time.perf_counter() - start)
elif operation == "decap":
pk = kem.generate_keypair()
ct, ss = kem.encap_secret(pk)
for _ in range(iterations):
start = time.perf_counter()
ss2 = kem.decap_secret(ct)
times.append(time.perf_counter() - start)
elif algorithm.startswith("ML-DSA") or algorithm.startswith("Dilithium"):
# Signature benchmarks
sig = oqs.Signature(algorithm)
message = b"Benchmark message" * 10
if operation == "keygen":
for _ in range(iterations):
start = time.perf_counter()
pk = sig.generate_keypair()
times.append(time.perf_counter() - start)
elif operation == "sign":
pk = sig.generate_keypair()
for _ in range(iterations):
start = time.perf_counter()
signature = sig.sign(message)
times.append(time.perf_counter() - start)
elif operation == "verify":
pk = sig.generate_keypair()
signature = sig.sign(message)
verifier = oqs.Signature(algorithm)
for _ in range(iterations):
start = time.perf_counter()
verifier.verify(message, signature, pk)
times.append(time.perf_counter() - start)
return {
"algorithm": algorithm,
"operation": operation,
"iterations": iterations,
"mean_us": statistics.mean(times) * 1_000_000,
"median_us": statistics.median(times) * 1_000_000,
"stdev_us": statistics.stdev(times) * 1_000_000 if len(times) > 1 else 0,
"min_us": min(times) * 1_000_000,
"max_us": max(times) * 1_000_000,
"ops_per_sec": iterations / sum(times),
}
def run_full_benchmark():
"""Run comprehensive benchmarks on all PQC algorithms."""
print("=" * 70)
print("PQC PERFORMANCE BENCHMARKS")
print("=" * 70)
# Algorithms to benchmark
kems = ["ML-KEM-512", "ML-KEM-768", "ML-KEM-1024"]
sigs = ["ML-DSA-44", "ML-DSA-65", "ML-DSA-87"]
results = []
# KEM benchmarks
print("\n[KEY ENCAPSULATION MECHANISMS]")
print("-" * 70)
print(f"{'Algorithm':<15} {'Operation':<10} {'Mean (μs)':<12} {'Ops/sec':<12}")
print("-" * 70)
for algo in kems:
for op in ["keygen", "encap", "decap"]:
result = benchmark_algorithm(algo, op, iterations=1000)
results.append(result)
print(f"{algo:<15} {op:<10} {result['mean_us']:>10.1f} {result['ops_per_sec']:>12.0f}")
# Signature benchmarks
print("\n[DIGITAL SIGNATURES]")
print("-" * 70)
print(f"{'Algorithm':<15} {'Operation':<10} {'Mean (μs)':<12} {'Ops/sec':<12}")
print("-" * 70)
for algo in sigs:
for op in ["keygen", "sign", "verify"]:
result = benchmark_algorithm(algo, op, iterations=1000)
results.append(result)
print(f"{algo:<15} {op:<10} {result['mean_us']:>10.1f} {result['ops_per_sec']:>12.0f}")
# Size comparison
print("\n[KEY AND SIGNATURE SIZES]")
print("-" * 70)
print(f"{'Algorithm':<15} {'Public Key':<12} {'Secret Key':<12} {'Cipher/Sig':<12}")
print("-" * 70)
for algo in kems:
kem = oqs.KeyEncapsulation(algo)
pk = kem.generate_keypair()
sk = kem.export_secret_key()
ct, _ = kem.encap_secret(pk)
print(f"{algo:<15} {len(pk):>10} B {len(sk):>10} B {len(ct):>10} B")
for algo in sigs:
sig = oqs.Signature(algo)
pk = sig.generate_keypair()
sk = sig.export_secret_key()
signature = sig.sign(b"test")
print(f"{algo:<15} {len(pk):>10} B {len(sk):>10} B {len(signature):>10} B")
# Save results
with open("/tmp/pqc_benchmark_results.json", 'w') as f:
json.dump(results, f, indent=2)
print(f"\nResults saved to: /tmp/pqc_benchmark_results.json")
if __name__ == "__main__":
run_full_benchmark()Lab 6: Certificate Generation (Advanced)
Exercise 6.1: Create PQC X.509 Certificates
python
# lab6_certificates.py
"""
Lab 6.1: Generate PQC X.509 Certificates
Learn: PKI with post-quantum algorithms
NOTE: This requires pyopenssl and python-oqs with certificate support.
This is a conceptual example - real implementation requires OQS-OpenSSL.
"""
import subprocess
import os
import tempfile
def generate_pqc_certificate_openssl():
"""
Generate a self-signed certificate using ML-DSA via OQS-OpenSSL.
Prerequisites:
- OQS-OpenSSL provider installed
- OpenSSL 3.x with provider support
"""
print("=" * 60)
print("PQC CERTIFICATE GENERATION (using OQS-OpenSSL)")
print("=" * 60)
# Check if OQS provider is available
print("\nChecking for OQS provider...")
result = subprocess.run(
["openssl", "list", "-providers"],
capture_output=True, text=True
)
if "oqsprovider" not in result.stdout.lower():
print("WARNING: OQS provider not detected.")
print("Install from: https://github.com/open-quantum-safe/oqs-provider")
print("\nShowing example commands only:\n")
# Show example commands
commands = """
# Generate ML-DSA private key
openssl genpkey -provider oqsprovider -algorithm mldsa65 -out mldsa_key.pem
# Generate self-signed certificate
openssl req -provider oqsprovider -x509 -new \\
-key mldsa_key.pem \\
-out mldsa_cert.pem \\
-days 365 \\
-subj "/CN=PQC Test Certificate/O=Quantum Security Era"
# View certificate details
openssl x509 -provider oqsprovider -in mldsa_cert.pem -text -noout
# Generate Certificate Signing Request (CSR)
openssl req -provider oqsprovider -new \\
-key mldsa_key.pem \\
-out mldsa_csr.pem \\
-subj "/CN=Server/O=Company"
"""
print(commands)
return
# If provider is available, actually generate certificate
with tempfile.TemporaryDirectory() as tmpdir:
key_file = os.path.join(tmpdir, "mldsa_key.pem")
cert_file = os.path.join(tmpdir, "mldsa_cert.pem")
# Generate key
print("\n[1] Generating ML-DSA-65 private key...")
subprocess.run([
"openssl", "genpkey",
"-provider", "oqsprovider",
"-algorithm", "mldsa65",
"-out", key_file
], check=True)
# Get key info
result = subprocess.run([
"openssl", "pkey",
"-provider", "oqsprovider",
"-in", key_file,
"-text", "-noout"
], capture_output=True, text=True)
print(f"Key type: ML-DSA-65")
# Generate self-signed cert
print("\n[2] Generating self-signed certificate...")
subprocess.run([
"openssl", "req",
"-provider", "oqsprovider",
"-x509", "-new",
"-key", key_file,
"-out", cert_file,
"-days", "365",
"-subj", "/CN=PQC Test/O=Quantum Security Era/C=AU"
], check=True)
# Display certificate
print("\n[3] Certificate details:")
result = subprocess.run([
"openssl", "x509",
"-provider", "oqsprovider",
"-in", cert_file,
"-text", "-noout"
], capture_output=True, text=True)
print(result.stdout)
def explain_hybrid_certificates():
"""Explain hybrid certificate approaches."""
print("\n" + "=" * 60)
print("HYBRID CERTIFICATES FOR TRANSITION")
print("=" * 60)
explanation = """
During the PQC transition, you may need HYBRID certificates that contain
BOTH classical and PQC signatures.
APPROACH 1: Composite Signatures
--------------------------------
- Single certificate with composite signature
- Signature = Classical_Sig || PQC_Sig
- Verifier checks BOTH signatures
- Draft: draft-ietf-lamps-pq-composite-sigs
APPROACH 2: Dual Certificate Chains
------------------------------------
- Maintain separate classical and PQC hierarchies
- Server sends appropriate chain based on client capability
- More complex but more flexible
APPROACH 3: Alternative Signatures Extension
--------------------------------------------
- Primary signature in certificate is classical
- PQC signature in X.509 extension
- Backward compatible with old validators
Current Status (2025):
- Standards still being finalized
- Browser support in development
- Consider hybrid for new deployments
- Full PQC for new high-security applications
"""
print(explanation)
if __name__ == "__main__":
generate_pqc_certificate_openssl()
explain_hybrid_certificates()Challenge Labs
Challenge 1: Implement Constant-Time Comparison
Write a constant-time byte array comparison function and verify it with timing analysis.
Challenge 2: Build a PQC VPN Prototype
Use WireGuard concepts with ML-KEM for key exchange.
Challenge 3: Create a Crypto Migration Report Generator
Build a tool that scans a codebase and generates a detailed migration plan with effort estimates.
Challenge 4: Implement ML-KEM from Scratch
Using only the NIST FIPS 203 specification, implement key generation, encapsulation, and decapsulation in Python (for learning, not production).
Challenge 5: Side-Channel Analysis
Use ChipWhisperer or similar to analyze power consumption of a PQC implementation.
Lab Environment Setup
bash
#!/bin/bash
# setup_lab_environment.sh
# Create virtual environment
python3 -m venv pqc_labs
source pqc_labs/bin/activate
# Install dependencies
pip install --upgrade pip
pip install liboqs-python cryptography pycryptodome hypothesis pytest
# Clone useful repositories
git clone https://github.com/open-quantum-safe/liboqs-python.git
git clone https://github.com/pq-crystals/kyber.git
git clone https://github.com/pq-crystals/dilithium.git
# Verify installation
python -c "import oqs; print(f'liboqs version: {oqs.oqs_version()}')"
python -c "print('Available KEMs:', oqs.get_enabled_kem_mechanisms()[:5])"
python -c "print('Available Sigs:', oqs.get_enabled_sig_mechanisms()[:5])"
echo "Lab environment ready!"