# QGAN_Project / vG.0.1 / optimize_qkernel_lite_base.py
# Uploaded by 1bnjmn3 using the upload-large-folder tool (commit 0f755ec, verified).
import numpy as np
import os
import time
import gc
# --- 1. ENVIRONMENT CONFIGURATION (M1 TURBO) ---
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["RAYON_NUM_THREADS"] = "4"
os.environ["QISKIT_PARALLEL"] = "TRUE"
from sklearn.svm import SVC
from sklearn.metrics import roc_auc_score
from qiskit import transpile
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.kernels import TrainableFidelityQuantumKernel
from qiskit_machine_learning.utils.loss_functions import SVCLoss
from qiskit_algorithms.optimizers import COBYLA
# --- AER V2 IMPORTS ---
from qiskit_aer.primitives import SamplerV2 as AerSampler
from qiskit_aer import AerSimulator
from qiskit_machine_learning.state_fidelities import ComputeUncompute, BaseStateFidelity
# --- BASE LITE CONFIGURATION (MODIFY THESE FOR VARIATIONS) ---
N_QUBITS = 8        # width of the feature map / ansatz
TRAIN_SIZE = 200    # Reduced from 600
TEST_SIZE = 300
MAX_ITERS = 5       # Reduced from 50 (Smoke Test)
BATCH_SIZE = 5000   # Increased from 2000 (Test RAM usage)
OPT_LEVEL = 1       # Reduced from 3 (Test compilation speed)
OUTPUT_DIR = "lite_diagnostics"
# Idiomatic, race-free directory creation: replaces the TOCTOU-prone
# `if not os.path.exists(...): os.makedirs(...)` pattern.
os.makedirs(OUTPUT_DIR, exist_ok=True)
# --- DUMMY JOB WRAPPER ---
class DummyResult:
    """Minimal result container exposing precomputed fidelity values."""

    def __init__(self, fidelities):
        # Expose the values under `.fidelities`, mirroring the attribute
        # access pattern of a real fidelity-primitive result.
        self.fidelities = fidelities
class DummyJob:
    """Job-like wrapper whose work is already done at construction time."""

    def __init__(self, fidelities):
        self._fidelities = fidelities

    def submit(self):
        # No-op: results were computed before this object existed.
        return None

    def result(self):
        # Package the stored values so callers can use the familiar
        # `job.result().fidelities` access pattern of real Aer jobs.
        return DummyResult(self._fidelities)
class BatchedFidelity(BaseStateFidelity):
    """State-fidelity wrapper that evaluates circuit pairs in fixed-size
    batches to bound peak memory, delegating each batch to an underlying
    fidelity primitive (e.g. ``ComputeUncompute``).
    """

    def __init__(self, fidelity_inst, batch_size):
        # Underlying fidelity primitive that actually executes circuits.
        self._fidelity = fidelity_inst
        # Maximum number of circuit pairs submitted per inner job.
        self._batch_size = batch_size
        super().__init__()

    def create_fidelity_circuit(self, circuit_1, circuit_2):
        """Delegate fidelity-circuit construction to the wrapped primitive."""
        return self._fidelity.create_fidelity_circuit(circuit_1, circuit_2)

    def _run(self, circuits_1, circuits_2, values_1=None, values_2=None, **options):
        """Evaluate all pairs batch by batch and return a DummyJob holding
        the concatenated fidelities.
        """
        n_samples = len(circuits_1)
        results = []
        # TIMING DIAGNOSTIC
        batch_start = time.time()
        for i in range(0, n_samples, self._batch_size):
            end = min(i + self._batch_size, n_samples)
            c1_batch = circuits_1[i:end]
            c2_batch = circuits_2[i:end]
            v1_batch = values_1[i:end] if values_1 is not None else None
            v2_batch = values_2[i:end] if values_2 is not None else None
            # Run Job (synchronously: block on the batch's result).
            job = self._fidelity.run(c1_batch, c2_batch, v1_batch, v2_batch, **options)
            batch_values = job.result().fidelities
            results.append(batch_values)
            # Print progress for every 5th batch to debug speed.
            if (i // self._batch_size) % 5 == 0:
                print(f"      Batch {i}/{n_samples} processed in {time.time() - batch_start:.2f}s")
            # BUGFIX: the timer was only reset inside the `if` above, so the
            # printed duration silently accumulated up to five batches' worth
            # of time. Resetting every iteration makes the diagnostic honest.
            batch_start = time.time()
            # Drop references promptly to keep peak RAM low between batches.
            del job, batch_values
        return DummyJob(np.concatenate(results))
def main():
    """Run the base Lite diagnostic end-to-end.

    Loads the optimized QGAN dataset, subsamples a class-balanced training
    set, trains a trainable fidelity quantum kernel with COBYLA, fits a
    final SVC on the learned kernel, and prints the held-out test AUC.
    """
    print("🚀 Starting Base Lite Diagnostic...")
    print(f"   ⚙️ Config: Train={TRAIN_SIZE} | Batch={BATCH_SIZE} | Iters={MAX_ITERS} | Opt={OPT_LEVEL}")
    # 1. LOAD DATA — accept either the versioned subdirectory or the CWD.
    possible_paths = ['vG.0.1/qgan_data_optimized.npz', 'qgan_data_optimized.npz']
    data_path = next((p for p in possible_paths if os.path.exists(p)), None)
    if not data_path:
        # BUGFIX: previously returned silently, making a missing data file
        # indistinguishable from a successful no-op run.
        print(f"❌ Data file not found. Searched: {possible_paths}")
        return
    data = np.load(data_path)
    X_train_full = data['X_train']
    y_train_full = data['y_train']
    X_test_full = data['X_test']
    y_test_full = data['y_test']
    # 2. SUBSAMPLE — draw equal counts of positive and negative examples so
    # the reduced training set stays class-balanced.
    pos_idx = np.where(y_train_full == 1)[0]
    neg_idx = np.where(y_train_full == 0)[0]
    train_idx = np.concatenate([
        np.random.choice(pos_idx, TRAIN_SIZE // 2, replace=False),
        np.random.choice(neg_idx, TRAIN_SIZE // 2, replace=False)
    ])
    np.random.shuffle(train_idx)
    X_train = X_train_full[train_idx]
    y_train = y_train_full[train_idx]
    X_test = X_test_full[:TEST_SIZE]
    y_test = y_test_full[:TEST_SIZE]
    # --- 3. MEMORY FLUSH — drop the full arrays before the memory-hungry
    # simulation begins. ---
    del data, X_train_full, y_train_full, X_test_full, y_test_full
    gc.collect()
    # 4. CONFIGURE AER BACKEND (single-precision statevector, capped RAM).
    backend = AerSimulator(
        method="statevector",
        precision="single",
        max_parallel_threads=4,
        max_memory_mb=12000
    )
    aer_sampler = AerSampler.from_backend(backend)
    # shots=None → exact expectation values from the statevector, no shot noise.
    aer_sampler.options.default_shots = None
    # 5. DEFINE KERNEL — ZZ feature map composed with a trainable ansatz;
    # only the ansatz parameters are optimized by the trainer.
    print("   ⚛️ Initializing Kernel...")
    fm = ZZFeatureMap(N_QUBITS, reps=2, entanglement='linear')
    ansatz = RealAmplitudes(N_QUBITS, reps=1)
    combined_circuit = fm.compose(ansatz)
    print(f"   🔧 Transpiling (Level {OPT_LEVEL})...")
    transpiled_circuit = transpile(combined_circuit, backend=backend, optimization_level=OPT_LEVEL)
    base_fidelity = ComputeUncompute(sampler=aer_sampler)
    batched_fidelity = BatchedFidelity(base_fidelity, batch_size=BATCH_SIZE)
    quant_kernel = TrainableFidelityQuantumKernel(
        feature_map=transpiled_circuit,
        training_parameters=ansatz.parameters,
        fidelity=batched_fidelity
    )
    # 6. OPTIMIZE the ansatz parameters against the SVC loss with COBYLA.
    print("   🧠 Optimizing (COBYLA)...")
    loss_func = SVCLoss(C=1.0, gamma='auto')
    optimizer = COBYLA(maxiter=MAX_ITERS)
    from qiskit_machine_learning.kernels.algorithms import QuantumKernelTrainer
    trainer = QuantumKernelTrainer(
        quantum_kernel=quant_kernel,
        loss=loss_func,
        optimizer=optimizer,
        initial_point=[0.1] * len(ansatz.parameters)
    )
    start_time = time.time()
    results = trainer.fit(X_train, y_train)
    elapsed = time.time() - start_time
    print(f"   ✅ Complete in {elapsed:.1f}s ({(elapsed/60):.1f} mins)")
    print(f"   🎯 Final Loss: {results.optimal_value:.4f}")
    # 7. RUN FINAL SVC on the trained kernel and score held-out AUC.
    print("   🏆 Training Final SVM...")
    qsvc = SVC(kernel=results.quantum_kernel.evaluate, probability=True)
    qsvc.fit(X_train, y_train)
    test_probs = qsvc.predict_proba(X_test)[:, 1]
    test_auc = roc_auc_score(y_test, test_probs)
    print("\n" + "="*40)
    print(f"🚀 BASE LITE RESULTS (Batch={BATCH_SIZE})")
    print("="*40)
    print(f"✅ Test AUC: {test_auc:.4f}")
    print("="*40)
# Script entry point: run the diagnostic only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()