In [1]:
Copied!
# 环境导入
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from sklearn.datasets import make_blobs, make_circles
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, silhouette_score
import warnings
warnings.filterwarnings('ignore')
# 导入 DeepQuantum
import sys
sys.path.insert(0, 'E:/02_Projects/turingQ/deepquantum/src')
import deepquantum as dq
import deepquantum.photonic as dqp
# 设置
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(42)
torch.manual_seed(42)
print('环境配置完成!')
print(f'PyTorch: {torch.__version__}')
print(f'NumPy: {np.__version__}')
# 环境导入
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from sklearn.datasets import make_blobs, make_circles
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, silhouette_score
import warnings
warnings.filterwarnings('ignore')
# 导入 DeepQuantum
import sys
sys.path.insert(0, 'E:/02_Projects/turingQ/deepquantum/src')
import deepquantum as dq
import deepquantum.photonic as dqp
# 设置
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
np.random.seed(42)
torch.manual_seed(42)
print('环境配置完成!')
print(f'PyTorch: {torch.__version__}')
print(f'NumPy: {np.__version__}')
环境配置完成! PyTorch: 2.9.1+cpu NumPy: 2.4.0
第1章:GBS 理论基础回顾¶
1.1 什么是高斯玻色采样?¶
高斯玻色采样(Gaussian Boson Sampling, GBS)是玻色采样的一种变体,使用高斯压缩态作为输入。
核心思想:
- 制备多个高斯压缩态
- 通过线性光学网络(分束器、相移器)
- 量子干涉
- 光子数测量
1.2 Hafnian 函数¶
对于 $2m \times 2m$ 对称矩阵 $A$,Hafnian 定义为:
$$ \text{Haf}(A) = \sum_{\sigma \in \text{PMP}(2m)} \prod_{k=1}^{m} a_{\sigma(2k-1),\sigma(2k)} $$
物理意义:
- 计算图的完美匹配数
- GBS 输出概率与 Hafnian 相关
- 计算复杂度:#P 完全问题
1.3 GBS vs 玻色采样¶
| 特性 | 玻色采样 | GBS |
|---|---|---|
| 输入态 | 单光子 Fock 态 | 高斯压缩态 |
| 数学函数 | Permanent | Hafnian |
| 实验难度 | 较高 | 较低 |
| 应用 | 量子优越性 | 图问题、机器学习 |
第2章:高斯态与挤压态¶
2.1 什么是高斯态?¶
高斯态是 Wigner 函数为高斯分布的量子态,包括:
- 真空态(Vacuum state)
- 相干态(Coherent state)
- 压缩态(Squeezed state)
2.2 挤压态¶
挤压算符: $$ \hat{S}(r) = \exp\left(\frac{r}{2}(\hat{a}^2 - \hat{a}^{\dagger 2})\right) $$
其中 $r$ 是挤压参数。
物理意义:
- 减小一个分量的量子噪声
- 增大另一个分量的量子噪声
- 满足海森堡不确定性原理
2.3 在 DeepQuantum 中创建高斯态¶
使用 backend='gaussian' 创建高斯态电路。
In [ ]:
Copied!
# 创建高斯态电路
nmode = 4
# 方法1:使用 Gaussian 后端
cir_gaussian = dq.QumodeCircuit(
nmode=nmode,
init_state='vac', # 真空态
backend='gaussian' # 使用 Gaussian 后端
)
# 添加挤压门
squeezing_params = [1.0, 0.8, 1.2, 0.9]
for i, r in enumerate(squeezing_params):
cir_gaussian.s(i, r) # 挤压门
# 添加分束器网络
cir_gaussian.bs([0, 1], [0.5, 0.3])
cir_gaussian.bs([2, 3], [0.4, 0.2])
cir_gaussian.bs([1, 2], [0.6, 0.1])
# 执行电路
state_gaussian = cir_gaussian()
print('高斯态电路执行完成!')
print(f'状态类型: {type(state_gaussian)}')
# 修复:Gaussian 后端返回的是列表 [cov, mean]
if isinstance(state_gaussian, list):
cov, mean = state_gaussian
print(f'✓ 正确:返回列表类型')
print(f'协方差矩阵形状: {cov.shape}')
print(f'位移向量形状: {mean.shape}')
else:
print(f'✗ 警告:返回类型不符合预期')
# 创建高斯态电路
nmode = 4
# 方法1:使用 Gaussian 后端
cir_gaussian = dq.QumodeCircuit(
nmode=nmode,
init_state='vac', # 真空态
backend='gaussian' # 使用 Gaussian 后端
)
# 添加挤压门
squeezing_params = [1.0, 0.8, 1.2, 0.9]
for i, r in enumerate(squeezing_params):
cir_gaussian.s(i, r) # 挤压门
# 添加分束器网络
cir_gaussian.bs([0, 1], [0.5, 0.3])
cir_gaussian.bs([2, 3], [0.4, 0.2])
cir_gaussian.bs([1, 2], [0.6, 0.1])
# 执行电路
state_gaussian = cir_gaussian()
print('高斯态电路执行完成!')
print(f'状态类型: {type(state_gaussian)}')
# 修复:Gaussian 后端返回的是列表 [cov, mean]
if isinstance(state_gaussian, list):
cov, mean = state_gaussian
print(f'✓ 正确:返回列表类型')
print(f'协方差矩阵形状: {cov.shape}')
print(f'位移向量形状: {mean.shape}')
else:
print(f'✗ 警告:返回类型不符合预期')
In [ ]:
Copied!
# 创建 GBS 电路(使用 Fock 后端)
nmode = 6
cutoff = 5 # Fock 空间截断
# 改进的 cutoff 估计方法
def estimate_cutoff(squeezing_params, confidence=3):
"""
根据 Chebyshev 不等式估计 cutoff
参数:
squeezing_params: 挤压参数列表
confidence: 置信因子(默认 3σ,覆盖 99.7% 的概率质量)
返回:
cutoff: 推荐的 Fock 空间截断维度
"""
# 计算平均光子数:⟨n⟩ = sinh²(r)
mean_photon = sum([np.sinh(p)**2 for p in squeezing_params])
# 计算光子数方差:Var(n) = 2sinh²(r)cosh²(r)
var_photon = sum([2 * np.sinh(p)**2 * np.cosh(p)**2 for p in squeezing_params])
std_photon = np.sqrt(var_photon)
# 使用 Chebyshev 不等式:P(|n - μ| ≥ kσ) ≤ 1/k²
# 3σ 原则覆盖 99.7% 的概率质量
cutoff = int(mean_photon + confidence * std_photon + 5)
print(f'平均光子数: {mean_photon:.2f}')
print(f'标准差: {std_photon:.2f}')
print(f'估计的 cutoff (3σ): {cutoff}')
return cutoff
squeezing_params = [1.0] * nmode
cutoff = estimate_cutoff(squeezing_params)
# 创建电路
cir_gbs = dq.QumodeCircuit(
nmode=nmode,
init_state='vac',
cutoff=cutoff,
backend='fock',
basis=True
)
# 添加挤压门
for i in range(nmode):
cir_gbs.s(i, squeezing_params[i])
# 添加分束器网络
cir_gbs.bs([0, 1], [np.pi/4, 0])
cir_gbs.bs([2, 3], [np.pi/4, 0])
cir_gbs.bs([4, 5], [np.pi/4, 0])
cir_gbs.bs([1, 2], [np.pi/6, np.pi/8])
cir_gbs.bs([3, 4], [np.pi/6, np.pi/8])
print('\\nGBS 电路创建完成!')
print(f'模式数: {nmode}')
print(f'Cutoff: {cutoff}')
print(f'\\n注意:使用 Fock 后端是对 GBS 的近似模拟。真正的 GBS 应使用 backend="gaussian"。')
# 创建 GBS 电路(使用 Fock 后端)
nmode = 6
cutoff = 5 # Fock 空间截断
# 改进的 cutoff 估计方法
def estimate_cutoff(squeezing_params, confidence=3):
"""
根据 Chebyshev 不等式估计 cutoff
参数:
squeezing_params: 挤压参数列表
confidence: 置信因子(默认 3σ,覆盖 99.7% 的概率质量)
返回:
cutoff: 推荐的 Fock 空间截断维度
"""
# 计算平均光子数:⟨n⟩ = sinh²(r)
mean_photon = sum([np.sinh(p)**2 for p in squeezing_params])
# 计算光子数方差:Var(n) = 2sinh²(r)cosh²(r)
var_photon = sum([2 * np.sinh(p)**2 * np.cosh(p)**2 for p in squeezing_params])
std_photon = np.sqrt(var_photon)
# 使用 Chebyshev 不等式:P(|n - μ| ≥ kσ) ≤ 1/k²
# 3σ 原则覆盖 99.7% 的概率质量
cutoff = int(mean_photon + confidence * std_photon + 5)
print(f'平均光子数: {mean_photon:.2f}')
print(f'标准差: {std_photon:.2f}')
print(f'估计的 cutoff (3σ): {cutoff}')
return cutoff
squeezing_params = [1.0] * nmode
cutoff = estimate_cutoff(squeezing_params)
# 创建电路
cir_gbs = dq.QumodeCircuit(
nmode=nmode,
init_state='vac',
cutoff=cutoff,
backend='fock',
basis=True
)
# 添加挤压门
for i in range(nmode):
cir_gbs.s(i, squeezing_params[i])
# 添加分束器网络
cir_gbs.bs([0, 1], [np.pi/4, 0])
cir_gbs.bs([2, 3], [np.pi/4, 0])
cir_gbs.bs([4, 5], [np.pi/4, 0])
cir_gbs.bs([1, 2], [np.pi/6, np.pi/8])
cir_gbs.bs([3, 4], [np.pi/6, np.pi/8])
print('\\nGBS 电路创建完成!')
print(f'模式数: {nmode}')
print(f'Cutoff: {cutoff}')
print(f'\\n注意:使用 Fock 后端是对 GBS 的近似模拟。真正的 GBS 应使用 backend="gaussian"。')
In [ ]:
Copied!
# 进行采样
print('开始 GBS 采样...')
samples = cir_gbs.measure(shots=1000)
print(f'采样完成!获得 {len(samples)} 个不同的结果')
print('\\n最常见的 10 个输出:')
# 修复:将 FockState 对象转换为字符串以便打印
samples_str = {str(k): v for k, v in samples.items()}
for i, (state, count) in enumerate(list(samples_str.items())[:10]):
print(f'{i+1}. {state}: {count} 次')
# 进行采样
print('开始 GBS 采样...')
samples = cir_gbs.measure(shots=1000)
print(f'采样完成!获得 {len(samples)} 个不同的结果')
print('\\n最常见的 10 个输出:')
# 修复:将 FockState 对象转换为字符串以便打印
samples_str = {str(k): v for k, v in samples.items()}
for i, (state, count) in enumerate(list(samples_str.items())[:10]):
print(f'{i+1}. {state}: {count} 次')
In [ ]:
Copied!
# 可视化采样结果
# 统计总光子数分布
photon_numbers = {}
for state, count in samples.items():
n_photons = sum(int(s) for s in state)
photon_numbers[n_photons] = photon_numbers.get(n_photons, 0) + count
# 绘制分布
plt.figure(figsize=(10, 6))
plt.bar(photon_numbers.keys(), photon_numbers.values(), alpha=0.7, color='steelblue')
plt.xlabel('总光子数', fontsize=12)
plt.ylabel('出现次数', fontsize=12)
plt.title('GBS 采样结果:总光子数分布', fontsize=14, fontweight='bold')
plt.grid(axis='y', alpha=0.3)
plt.show()
print(f'平均光子数: {sum(k*v for k,v in photon_numbers.items()) / sum(photon_numbers.values()):.2f}')
# 可视化采样结果
# 统计总光子数分布
photon_numbers = {}
for state, count in samples.items():
n_photons = sum(int(s) for s in state)
photon_numbers[n_photons] = photon_numbers.get(n_photons, 0) + count
# 绘制分布
plt.figure(figsize=(10, 6))
plt.bar(photon_numbers.keys(), photon_numbers.values(), alpha=0.7, color='steelblue')
plt.xlabel('总光子数', fontsize=12)
plt.ylabel('出现次数', fontsize=12)
plt.title('GBS 采样结果:总光子数分布', fontsize=14, fontweight='bold')
plt.grid(axis='y', alpha=0.3)
plt.show()
print(f'平均光子数: {sum(k*v for k,v in photon_numbers.items()) / sum(photon_numbers.values()):.2f}')
In [ ]:
Copied!
class PhotonicQLayer(nn.Module):
"""
光量子神经网络层
使用多种编码方式将数据编码到光量子态,通过光量子门进行变换
编码方式:
- 'angle': 角度编码(相移编码)
- 'displacement': 位移编码(需要 Gaussian 后端)
- 'photon': 光子数编码(简单但粗糙)
"""
def __init__(self, nmode, nlayer=2, cutoff=3, encoding='angle'):
super().__init__()
self.nmode = nmode
self.nlayer = nlayer
self.cutoff = cutoff
self.encoding = encoding
# 可训练参数
# 每层有 nmode 个相移参数和 nmode-1 个分束器参数
self.ps_params = nn.ParameterList([
nn.Parameter(torch.randn(nmode) * 0.1)
for _ in range(nlayer)
])
self.bs_params = nn.ParameterList([
nn.Parameter(torch.randn(nmode-1, 2) * 0.1)
for _ in range(nlayer)
])
def encode_data(self, x, cir):
"""
将数据编码到光量子电路
参数:
x: 输入数据 [nmode]
cir: 光量子电路
"""
if self.encoding == 'angle':
# 角度编码:将数据映射为相移角度
# 归一化到 [0, π] 范围
x_norm = torch.sigmoid(x) * np.pi # sigmoid 归一化到 [0, π]
for j in range(self.nmode):
cir.ps(j, x_norm[j].item())
elif self.encoding == 'photon':
# 光子数编码(简单但粗糙,仅用于演示)
# 将数据值作为初始光子数(需要归一化到整数)
data_norm = torch.clamp(torch.abs(x), 0, self.cutoff - 1)
# 注意:这种方式会丢失很多信息,不推荐用于实际应用
# 这里仅用于演示,实际应使用角度编码
pass # 初始态已在创建电路时设置
elif self.encoding == 'displacement':
# 位移编码(需要 Gaussian 后端)
# 将数据映射为位移幅度
x_norm = torch.tanh(x) * 0.5 # 归一化到 [-0.5, 0.5]
for j in range(self.nmode):
cir.d(j, x_norm[j].item())
def forward(self, x):
"""
前向传播
参数:
x: 输入数据 [batch_size, nmode]
返回:
输出 [batch_size, nmode]
"""
batch_size = x.shape[0]
outputs = []
for i in range(batch_size):
# 创建电路
if self.encoding == 'photon':
# 光子数编码:使用数据作为初始态
data = torch.clamp(torch.abs(x[i]), 0, self.cutoff - 1).int()
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state=data.tolist(),
cutoff=self.cutoff,
basis=True
)
else:
# 角度编码或位移编码:从真空态开始
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state='vac',
cutoff=self.cutoff,
basis=True
)
# 数据编码
self.encode_data(x[i], cir)
# 添加变分层
for l in range(self.nlayer):
# 相移层
for j in range(self.nmode):
theta_ps = self.ps_params[l][j].item()
cir.ps(j, theta_ps)
# 分束器层
for j in range(self.nmode - 1):
theta_bs, phi_bs = self.bs_params[l][j]
cir.bs([j, j+1], [theta_bs.item(), phi_bs.item()])
# 获取输出
result = cir(is_prob=True)
# 提取特征:使用期望光子数
features = torch.zeros(self.nmode)
for state, prob in result.items():
state_list = [int(s) for s in str(state).strip('|>').split(',')]
for j, n in enumerate(state_list):
features[j] += n * prob.item()
outputs.append(features)
return torch.stack(outputs)
# 测试光量子层
print('测试角度编码(推荐):')
pqlayer_angle = PhotonicQLayer(nmode=4, nlayer=2, cutoff=3, encoding='angle')
x_test = torch.randn(2, 4)
output_angle = pqlayer_angle(x_test)
print(f'输入形状: {x_test.shape}')
print(f'输出形状: {output_angle.shape}')
print(f'输出样例:\\n{output_angle}')
print('\\n测试光子数编码(粗糙,仅演示):')
pqlayer_photon = PhotonicQLayer(nmode=4, nlayer=2, cutoff=3, encoding='photon')
x_test_small = torch.abs(torch.randn(2, 4)) * 0.5 # 小输入以保持在 cutoff 范围
output_photon = pqlayer_photon(x_test_small)
print(f'输出样例(光子数编码):\\n{output_photon}')
class PhotonicQLayer(nn.Module):
"""
光量子神经网络层
使用多种编码方式将数据编码到光量子态,通过光量子门进行变换
编码方式:
- 'angle': 角度编码(相移编码)
- 'displacement': 位移编码(需要 Gaussian 后端)
- 'photon': 光子数编码(简单但粗糙)
"""
def __init__(self, nmode, nlayer=2, cutoff=3, encoding='angle'):
super().__init__()
self.nmode = nmode
self.nlayer = nlayer
self.cutoff = cutoff
self.encoding = encoding
# 可训练参数
# 每层有 nmode 个相移参数和 nmode-1 个分束器参数
self.ps_params = nn.ParameterList([
nn.Parameter(torch.randn(nmode) * 0.1)
for _ in range(nlayer)
])
self.bs_params = nn.ParameterList([
nn.Parameter(torch.randn(nmode-1, 2) * 0.1)
for _ in range(nlayer)
])
def encode_data(self, x, cir):
"""
将数据编码到光量子电路
参数:
x: 输入数据 [nmode]
cir: 光量子电路
"""
if self.encoding == 'angle':
# 角度编码:将数据映射为相移角度
# 归一化到 [0, π] 范围
x_norm = torch.sigmoid(x) * np.pi # sigmoid 归一化到 [0, π]
for j in range(self.nmode):
cir.ps(j, x_norm[j].item())
elif self.encoding == 'photon':
# 光子数编码(简单但粗糙,仅用于演示)
# 将数据值作为初始光子数(需要归一化到整数)
data_norm = torch.clamp(torch.abs(x), 0, self.cutoff - 1)
# 注意:这种方式会丢失很多信息,不推荐用于实际应用
# 这里仅用于演示,实际应使用角度编码
pass # 初始态已在创建电路时设置
elif self.encoding == 'displacement':
# 位移编码(需要 Gaussian 后端)
# 将数据映射为位移幅度
x_norm = torch.tanh(x) * 0.5 # 归一化到 [-0.5, 0.5]
for j in range(self.nmode):
cir.d(j, x_norm[j].item())
def forward(self, x):
"""
前向传播
参数:
x: 输入数据 [batch_size, nmode]
返回:
输出 [batch_size, nmode]
"""
batch_size = x.shape[0]
outputs = []
for i in range(batch_size):
# 创建电路
if self.encoding == 'photon':
# 光子数编码:使用数据作为初始态
data = torch.clamp(torch.abs(x[i]), 0, self.cutoff - 1).int()
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state=data.tolist(),
cutoff=self.cutoff,
basis=True
)
else:
# 角度编码或位移编码:从真空态开始
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state='vac',
cutoff=self.cutoff,
basis=True
)
# 数据编码
self.encode_data(x[i], cir)
# 添加变分层
for l in range(self.nlayer):
# 相移层
for j in range(self.nmode):
theta_ps = self.ps_params[l][j].item()
cir.ps(j, theta_ps)
# 分束器层
for j in range(self.nmode - 1):
theta_bs, phi_bs = self.bs_params[l][j]
cir.bs([j, j+1], [theta_bs.item(), phi_bs.item()])
# 获取输出
result = cir(is_prob=True)
# 提取特征:使用期望光子数
features = torch.zeros(self.nmode)
for state, prob in result.items():
state_list = [int(s) for s in str(state).strip('|>').split(',')]
for j, n in enumerate(state_list):
features[j] += n * prob.item()
outputs.append(features)
return torch.stack(outputs)
# 测试光量子层
print('测试角度编码(推荐):')
pqlayer_angle = PhotonicQLayer(nmode=4, nlayer=2, cutoff=3, encoding='angle')
x_test = torch.randn(2, 4)
output_angle = pqlayer_angle(x_test)
print(f'输入形状: {x_test.shape}')
print(f'输出形状: {output_angle.shape}')
print(f'输出样例:\\n{output_angle}')
print('\\n测试光子数编码(粗糙,仅演示):')
pqlayer_photon = PhotonicQLayer(nmode=4, nlayer=2, cutoff=3, encoding='photon')
x_test_small = torch.abs(torch.randn(2, 4)) * 0.5 # 小输入以保持在 cutoff 范围
output_photon = pqlayer_photon(x_test_small)
print(f'输出样例(光子数编码):\\n{output_photon}')
In [ ]:
Copied!
class PhotonicQuantumKernel:
"""
光量子核方法
使用光量子电路计算核矩阵
核方法原理:
K(x_i, x_j) = |⟨φ(x_i)|φ(x_j)⟩|²
其中 |φ(x)⟩ 是通过特征映射电路得到的光量子态
"""
def __init__(self, nmode=4, nlayer=2, cutoff=3):
self.nmode = nmode
self.nlayer = nlayer
self.cutoff = cutoff
def encode(self, x):
"""
将数据编码为光量子态(使用角度编码)
参数:
x: 单个数据点 [nmode]
返回:
cir: 光量子电路
"""
# 从真空态开始
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state='vac',
cutoff=self.cutoff,
basis=True
)
# 角度编码:将数据映射为相移
x_norm = torch.sigmoid(torch.tensor(x)) * np.pi
for j in range(self.nmode):
cir.ps(j, x_norm[j].item())
return cir
def feature_map(self, cir):
"""
应用特征映射电路
参数:
cir: 光量子电路
返回:
cir: 应用特征映射后的电路
"""
for _ in range(self.nlayer):
# 相移层(固定角度)
for j in range(self.nmode):
cir.ps(j, np.pi/4)
# 分束器层(固定参数)
for j in range(self.nmode - 1):
cir.bs([j, j+1], [np.pi/4, 0])
return cir
def get_quantum_state(self, cir):
"""
获取量子态(概率分布)
参数:
cir: 光量子电路
返回:
state_dict: {FockState: probability}
"""
return cir(is_prob=True)
def compute_overlap(self, state_dict1, state_dict2):
"""
计算两个量子态的内积(重叠)
参数:
state_dict1, state_dict2: 量子态字典 {FockState: probability}
返回:
overlap: |⟨ψ₁|ψ₂⟩|²
"""
overlap = 0.0
# 遍历两个态的共同基
for state, prob1 in state_dict1.items():
if state in state_dict2:
prob2 = state_dict2[state]
# 对于概率分布,重叠是相同基的概率乘积之和
overlap += prob1.item() * prob2.item()
return overlap
def compute_kernel_matrix(self, X1, X2=None):
"""
计算核矩阵
参数:
X1: 数据集1 [n_samples1, n_features]
X2: 数据集2 [n_samples2, n_features] (可选)
返回:
kernel_matrix: 核矩阵 [n_samples1, n_samples2]
"""
if X2 is None:
X2 = X1
n1, n2 = X1.shape[0], X2.shape[0]
kernel_matrix = np.zeros((n1, n2))
print(f'计算 {n1}x{n2} 核矩阵...')
# 预计算所有数据点的量子态
print('步骤 1: 编码数据和特征映射...')
states1 = []
for i in range(n1):
cir = self.encode(X1[i])
cir = self.feature_map(cir)
state = self.get_quantum_state(cir)
states1.append(state)
states2 = []
for j in range(n2):
cir = self.encode(X2[j])
cir = self.feature_map(cir)
state = self.get_quantum_state(cir)
states2.append(state)
print('步骤 2: 计算核矩阵...')
for i in range(n1):
for j in range(n2):
# 计算量子态的内积
kernel_matrix[i, j] = self.compute_overlap(states1[i], states2[j])
if (i * n2 + j + 1) % 10 == 0:
print(f'进度: {i*n2+j+1}/{n1*n2}')
return kernel_matrix
# 测试量子核
print('创建量子核...')
kernel = PhotonicQuantumKernel(nmode=4, nlayer=1, cutoff=3)
# 生成小测试数据集
X_test = torch.randn(6, 4) * 0.5 # 小数据集用于快速测试
print('\\n开始计算核矩阵...')
K = kernel.compute_kernel_matrix(X_test[:3], X_test[3:])
print(f'\\n核矩阵形状: {K.shape}')
print('核矩阵:')
print(K)
# 验证核矩阵的性质
print('\\n核矩阵性质检查:')
print(f'对角元素平均值(应接近 1): {np.trace(K) / K.shape[0]:.4f}')
print(f'是否对称: {np.allclose(K, K.T)}')
print(f'最小值: {K.min():.4f}')
print(f'最大值: {K.max():.4f}')
class PhotonicQuantumKernel:
"""
光量子核方法
使用光量子电路计算核矩阵
核方法原理:
K(x_i, x_j) = |⟨φ(x_i)|φ(x_j)⟩|²
其中 |φ(x)⟩ 是通过特征映射电路得到的光量子态
"""
def __init__(self, nmode=4, nlayer=2, cutoff=3):
self.nmode = nmode
self.nlayer = nlayer
self.cutoff = cutoff
def encode(self, x):
"""
将数据编码为光量子态(使用角度编码)
参数:
x: 单个数据点 [nmode]
返回:
cir: 光量子电路
"""
# 从真空态开始
cir = dq.QumodeCircuit(
nmode=self.nmode,
init_state='vac',
cutoff=self.cutoff,
basis=True
)
# 角度编码:将数据映射为相移
x_norm = torch.sigmoid(torch.tensor(x)) * np.pi
for j in range(self.nmode):
cir.ps(j, x_norm[j].item())
return cir
def feature_map(self, cir):
"""
应用特征映射电路
参数:
cir: 光量子电路
返回:
cir: 应用特征映射后的电路
"""
for _ in range(self.nlayer):
# 相移层(固定角度)
for j in range(self.nmode):
cir.ps(j, np.pi/4)
# 分束器层(固定参数)
for j in range(self.nmode - 1):
cir.bs([j, j+1], [np.pi/4, 0])
return cir
def get_quantum_state(self, cir):
"""
获取量子态(概率分布)
参数:
cir: 光量子电路
返回:
state_dict: {FockState: probability}
"""
return cir(is_prob=True)
def compute_overlap(self, state_dict1, state_dict2):
"""
计算两个量子态的内积(重叠)
参数:
state_dict1, state_dict2: 量子态字典 {FockState: probability}
返回:
overlap: |⟨ψ₁|ψ₂⟩|²
"""
overlap = 0.0
# 遍历两个态的共同基
for state, prob1 in state_dict1.items():
if state in state_dict2:
prob2 = state_dict2[state]
# 对于概率分布,重叠是相同基的概率乘积之和
overlap += prob1.item() * prob2.item()
return overlap
def compute_kernel_matrix(self, X1, X2=None):
"""
计算核矩阵
参数:
X1: 数据集1 [n_samples1, n_features]
X2: 数据集2 [n_samples2, n_features] (可选)
返回:
kernel_matrix: 核矩阵 [n_samples1, n_samples2]
"""
if X2 is None:
X2 = X1
n1, n2 = X1.shape[0], X2.shape[0]
kernel_matrix = np.zeros((n1, n2))
print(f'计算 {n1}x{n2} 核矩阵...')
# 预计算所有数据点的量子态
print('步骤 1: 编码数据和特征映射...')
states1 = []
for i in range(n1):
cir = self.encode(X1[i])
cir = self.feature_map(cir)
state = self.get_quantum_state(cir)
states1.append(state)
states2 = []
for j in range(n2):
cir = self.encode(X2[j])
cir = self.feature_map(cir)
state = self.get_quantum_state(cir)
states2.append(state)
print('步骤 2: 计算核矩阵...')
for i in range(n1):
for j in range(n2):
# 计算量子态的内积
kernel_matrix[i, j] = self.compute_overlap(states1[i], states2[j])
if (i * n2 + j + 1) % 10 == 0:
print(f'进度: {i*n2+j+1}/{n1*n2}')
return kernel_matrix
# 测试量子核
print('创建量子核...')
kernel = PhotonicQuantumKernel(nmode=4, nlayer=1, cutoff=3)
# 生成小测试数据集
X_test = torch.randn(6, 4) * 0.5 # 小数据集用于快速测试
print('\\n开始计算核矩阵...')
K = kernel.compute_kernel_matrix(X_test[:3], X_test[3:])
print(f'\\n核矩阵形状: {K.shape}')
print('核矩阵:')
print(K)
# 验证核矩阵的性质
print('\\n核矩阵性质检查:')
print(f'对角元素平均值(应接近 1): {np.trace(K) / K.shape[0]:.4f}')
print(f'是否对称: {np.allclose(K, K.T)}')
print(f'最小值: {K.min():.4f}')
print(f'最大值: {K.max():.4f}')
In [ ]:
Copied!
# 生成示例数据
from sklearn.datasets import make_blobs
# 生成3个聚类
X, y_true = make_blobs(
n_samples=100,
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y_true, cmap='viridis', alpha=0.6)
plt.xlabel('特征 1', fontsize=12)
plt.ylabel('特征 2', fontsize=12)
plt.title('真实聚类', fontsize=14, fontweight='bold')
plt.colorbar(label='聚类标签')
plt.grid(True, alpha=0.3)
plt.show()
print(f'数据形状: {X_scaled.shape}')
# 生成示例数据
from sklearn.datasets import make_blobs
# 生成3个聚类
X, y_true = make_blobs(
n_samples=100,
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y_true, cmap='viridis', alpha=0.6)
plt.xlabel('特征 1', fontsize=12)
plt.ylabel('特征 2', fontsize=12)
plt.title('真实聚类', fontsize=14, fontweight='bold')
plt.colorbar(label='聚类标签')
plt.grid(True, alpha=0.3)
plt.show()
print(f'数据形状: {X_scaled.shape}')
In [ ]:
Copied!
# 构建 GBS 聚类完整实现
def build_similarity_matrix(X, method='quantum_kernel', kernel=None):
"""
构建相似度矩阵
参数:
X: 数据 [n_samples, n_features]
method: 'quantum_kernel' 或 'euclidean'
kernel: 量子核对象(如果 method='quantum_kernel')
返回:
sim_mat: 相似度矩阵 [n_samples, n_samples]
"""
n = X.shape[0]
sim_mat = np.zeros((n, n))
if method == 'quantum_kernel' and kernel is not None:
print('使用量子核方法计算相似度矩阵...')
sim_mat = kernel.compute_kernel_matrix(X)
# 归一化到 [0, 1]
sim_mat = (sim_mat - sim_mat.min()) / (sim_mat.max() - sim_mat.min() + 1e-10)
else:
print('使用欧氏距离计算相似度矩阵...')
for i in range(n):
for j in range(i+1, n):
# 高斯核
dist = np.linalg.norm(X[i] - X[j])
sim = np.exp(-dist**2 / (2 * 1.0**2)) # sigma=1.0
sim_mat[i, j] = sim
sim_mat[j, i] = sim
return sim_mat
def quantum_spectral_clustering(X, n_clusters=3, nmode=4, use_quantum_kernel=True):
"""
使用量子方法的谱聚类
参数:
X: 数据 [n_samples, n_features]
n_clusters: 聚类数量
nmode: 量子模式数
use_quantum_kernel: 是否使用量子核
返回:
labels: 聚类标签
"""
from sklearn.cluster import SpectralClustering
from sklearn.metrics import silhouette_score
# 1. 构建相似度矩阵
if use_quantum_kernel:
print('创建量子核...')
kernel = PhotonicQuantumKernel(nmode=nmode, nlayer=1, cutoff=3)
sim_mat = build_similarity_matrix(X, method='quantum_kernel', kernel=kernel)
else:
sim_mat = build_similarity_matrix(X, method='euclidean')
print(f'相似度矩阵形状: {sim_mat.shape}')
# 2. 谱聚类
print(f'执行谱聚类(k={n_clusters})...')
clustering = SpectralClustering(
n_clusters=n_clusters,
affinity='precomputed',
random_state=42
)
labels = clustering.fit_predict(sim_mat)
# 3. 评估
score = silhouette_score(X, labels)
print(f'轮廓系数: {score:.4f}')
return labels, sim_mat
# 可视化聚类结果
def plot_clustering_result(X, labels, title):
"""可视化聚类结果"""
plt.figure(figsize=(10, 6))
plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis', alpha=0.6)
plt.xlabel('特征 1', fontsize=12)
plt.ylabel('特征 2', fontsize=12)
plt.title(title, fontsize=14, fontweight='bold')
plt.colorbar(label='聚类标签')
plt.grid(True, alpha=0.3)
plt.show()
# 测试聚类算法
print('='*60)
print('GBS 聚类算法测试')
print('='*60)
# 生成测试数据
print('\\n生成测试数据...')
X, y_true = make_blobs(
n_samples=50, # 减少样本数以加快计算
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print(f'数据形状: {X_scaled.shape}')
# 方法1:经典谱聚类(基线)
print('\\n' + '='*60)
print('方法 1: 经典谱聚类(基线)')
print('='*60)
labels_classical, sim_classical = quantum_spectral_clustering(
X_scaled,
n_clusters=3,
use_quantum_kernel=False
)
plot_clustering_result(X_scaled, labels_classical, '经典谱聚类结果')
# 方法2:量子核谱聚类
print('\\n' + '='*60)
print('方法 2: 量子核谱聚类')
print('='*60)
labels_quantum, sim_quantum = quantum_spectral_clustering(
X_scaled,
n_clusters=3,
nmode=4,
use_quantum_kernel=True
)
plot_clustering_result(X_scaled, labels_quantum, '量子核谱聚类结果')
# 对比结果
print('\\n' + '='*60)
print('聚类结果对比')
print('='*60)
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
ari_classical = adjusted_rand_score(y_true, labels_classical)
nmi_classical = normalized_mutual_info_score(y_true, labels_classical)
ari_quantum = adjusted_rand_score(y_true, labels_quantum)
nmi_quantum = normalized_mutual_info_score(y_true, labels_quantum)
print(f'经典方法 - ARI: {ari_classical:.4f}, NMI: {nmi_classical:.4f}')
print(f'量子方法 - ARI: {ari_quantum:.4f}, NMI: {nmi_quantum:.4f}')
print('\\n注意:由于量子计算的计算复杂性,这里使用小数据集演示。')
print('实际应用中可以考虑更高效的量子核近似方法。')
# 构建 GBS 聚类完整实现
def build_similarity_matrix(X, method='quantum_kernel', kernel=None):
"""
构建相似度矩阵
参数:
X: 数据 [n_samples, n_features]
method: 'quantum_kernel' 或 'euclidean'
kernel: 量子核对象(如果 method='quantum_kernel')
返回:
sim_mat: 相似度矩阵 [n_samples, n_samples]
"""
n = X.shape[0]
sim_mat = np.zeros((n, n))
if method == 'quantum_kernel' and kernel is not None:
print('使用量子核方法计算相似度矩阵...')
sim_mat = kernel.compute_kernel_matrix(X)
# 归一化到 [0, 1]
sim_mat = (sim_mat - sim_mat.min()) / (sim_mat.max() - sim_mat.min() + 1e-10)
else:
print('使用欧氏距离计算相似度矩阵...')
for i in range(n):
for j in range(i+1, n):
# 高斯核
dist = np.linalg.norm(X[i] - X[j])
sim = np.exp(-dist**2 / (2 * 1.0**2)) # sigma=1.0
sim_mat[i, j] = sim
sim_mat[j, i] = sim
return sim_mat
def quantum_spectral_clustering(X, n_clusters=3, nmode=4, use_quantum_kernel=True):
"""
使用量子方法的谱聚类
参数:
X: 数据 [n_samples, n_features]
n_clusters: 聚类数量
nmode: 量子模式数
use_quantum_kernel: 是否使用量子核
返回:
labels: 聚类标签
"""
from sklearn.cluster import SpectralClustering
from sklearn.metrics import silhouette_score
# 1. 构建相似度矩阵
if use_quantum_kernel:
print('创建量子核...')
kernel = PhotonicQuantumKernel(nmode=nmode, nlayer=1, cutoff=3)
sim_mat = build_similarity_matrix(X, method='quantum_kernel', kernel=kernel)
else:
sim_mat = build_similarity_matrix(X, method='euclidean')
print(f'相似度矩阵形状: {sim_mat.shape}')
# 2. 谱聚类
print(f'执行谱聚类(k={n_clusters})...')
clustering = SpectralClustering(
n_clusters=n_clusters,
affinity='precomputed',
random_state=42
)
labels = clustering.fit_predict(sim_mat)
# 3. 评估
score = silhouette_score(X, labels)
print(f'轮廓系数: {score:.4f}')
return labels, sim_mat
# 可视化聚类结果
def plot_clustering_result(X, labels, title):
"""可视化聚类结果"""
plt.figure(figsize=(10, 6))
plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis', alpha=0.6)
plt.xlabel('特征 1', fontsize=12)
plt.ylabel('特征 2', fontsize=12)
plt.title(title, fontsize=14, fontweight='bold')
plt.colorbar(label='聚类标签')
plt.grid(True, alpha=0.3)
plt.show()
# 测试聚类算法
print('='*60)
print('GBS 聚类算法测试')
print('='*60)
# 生成测试数据
print('\\n生成测试数据...')
X, y_true = make_blobs(
n_samples=50, # 减少样本数以加快计算
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print(f'数据形状: {X_scaled.shape}')
# 方法1:经典谱聚类(基线)
print('\\n' + '='*60)
print('方法 1: 经典谱聚类(基线)')
print('='*60)
labels_classical, sim_classical = quantum_spectral_clustering(
X_scaled,
n_clusters=3,
use_quantum_kernel=False
)
plot_clustering_result(X_scaled, labels_classical, '经典谱聚类结果')
# 方法2:量子核谱聚类
print('\\n' + '='*60)
print('方法 2: 量子核谱聚类')
print('='*60)
labels_quantum, sim_quantum = quantum_spectral_clustering(
X_scaled,
n_clusters=3,
nmode=4,
use_quantum_kernel=True
)
plot_clustering_result(X_scaled, labels_quantum, '量子核谱聚类结果')
# 对比结果
print('\\n' + '='*60)
print('聚类结果对比')
print('='*60)
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
ari_classical = adjusted_rand_score(y_true, labels_classical)
nmi_classical = normalized_mutual_info_score(y_true, labels_classical)
ari_quantum = adjusted_rand_score(y_true, labels_quantum)
nmi_quantum = normalized_mutual_info_score(y_true, labels_quantum)
print(f'经典方法 - ARI: {ari_classical:.4f}, NMI: {nmi_classical:.4f}')
print(f'量子方法 - ARI: {ari_quantum:.4f}, NMI: {nmi_quantum:.4f}')
print('\\n注意:由于量子计算的计算复杂性,这里使用小数据集演示。')
print('实际应用中可以考虑更高效的量子核近似方法。')
In [ ]:
Copied!
class PhotonicClassifier(nn.Module):
"""
光量子分类器
结合光量子层和经典全连接层,使用角度编码
"""
def __init__(self, nmode=4, nlayer=2, cutoff=3, num_classes=3, encoding='angle'):
super().__init__()
self.nmode = nmode
self.num_classes = num_classes
self.encoding = encoding
# 光量子层(使用角度编码)
self.photonic_layer = PhotonicQLayer(nmode, nlayer, cutoff, encoding=encoding)
# 经典全连接层
self.fc = nn.Sequential(
nn.Linear(nmode, 16),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(16, num_classes)
)
def forward(self, x):
# 光量子层
photonic_features = self.photonic_layer(x)
# 经典层
output = self.fc(photonic_features)
return output
# 创建分类器(使用角度编码)
print('创建光量子分类器(使用角度编码)...')
model = PhotonicClassifier(nmode=4, nlayer=1, cutoff=3, num_classes=3, encoding='angle')
print('光量子分类器创建完成!')
print(f'参数量: {sum(p.numel() for p in model.parameters())}')
print(f'\\n可训练参数:')
for name, param in model.named_parameters():
print(f' {name}: {param.shape}')
class PhotonicClassifier(nn.Module):
"""
光量子分类器
结合光量子层和经典全连接层,使用角度编码
"""
def __init__(self, nmode=4, nlayer=2, cutoff=3, num_classes=3, encoding='angle'):
super().__init__()
self.nmode = nmode
self.num_classes = num_classes
self.encoding = encoding
# 光量子层(使用角度编码)
self.photonic_layer = PhotonicQLayer(nmode, nlayer, cutoff, encoding=encoding)
# 经典全连接层
self.fc = nn.Sequential(
nn.Linear(nmode, 16),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(16, num_classes)
)
def forward(self, x):
# 光量子层
photonic_features = self.photonic_layer(x)
# 经典层
output = self.fc(photonic_features)
return output
# 创建分类器(使用角度编码)
print('创建光量子分类器(使用角度编码)...')
model = PhotonicClassifier(nmode=4, nlayer=1, cutoff=3, num_classes=3, encoding='angle')
print('光量子分类器创建完成!')
print(f'参数量: {sum(p.numel() for p in model.parameters())}')
print(f'\\n可训练参数:')
for name, param in model.named_parameters():
print(f' {name}: {param.shape}')
In [ ]:
Copied!
# 准备训练数据
from sklearn.model_selection import train_test_split
# 生成更大的数据集
X, y_true = make_blobs(
n_samples=150, # 增加样本数
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 将2D特征扩展到4D(添加非线性特征)
X_expanded = np.zeros((X_scaled.shape[0], 4))
X_expanded[:, :2] = X_scaled
X_expanded[:, 2] = X_scaled[:, 0] * X_scaled[:, 1] # 交互项
X_expanded[:, 3] = X_scaled[:, 0]**2 + X_scaled[:, 1]**2 # 径向特征
# 转换为张量(不缩放,使用角度编码)
X_tensor = torch.FloatTensor(X_expanded)
y_tensor = torch.LongTensor(y_true)
# 划分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(
X_tensor, y_tensor, test_size=0.2, random_state=42
)
print(f'训练集大小: {X_train.shape[0]}')
print(f'验证集大小: {X_val.shape[0]}')
# 创建模型
model = PhotonicClassifier(nmode=4, nlayer=1, cutoff=3, num_classes=3, encoding='angle')
# 训练配置
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
n_epochs = 20
# 训练循环
print('\\n' + '='*60)
print('开始训练')
print('='*60)
train_losses = []
val_losses = []
train_accs = []
val_accs = []
for epoch in range(n_epochs):
# 训练阶段
model.train()
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
# 计算训练准确率
with torch.no_grad():
_, predicted = torch.max(outputs.data, 1)
train_acc = (predicted == y_train).sum().item() / len(y_train)
# 验证阶段
model.eval()
with torch.no_grad():
val_outputs = model(X_val)
val_loss = criterion(val_outputs, y_val)
_, val_predicted = torch.max(val_outputs.data, 1)
val_acc = (val_predicted == y_val).sum().item() / len(y_val)
# 记录指标
train_losses.append(loss.item())
val_losses.append(val_loss.item())
train_accs.append(train_acc)
val_accs.append(val_acc)
# 打印进度
if (epoch + 1) % 5 == 0 or epoch == 0:
print(f'Epoch [{epoch+1}/{n_epochs}]')
print(f' Train - Loss: {loss.item():.4f}, Acc: {train_acc*100:.2f}%')
print(f' Val - Loss: {val_loss.item():.4f}, Acc: {val_acc*100:.2f}%')
print('\\n' + '='*60)
print('训练完成!')
print('='*60)
# 最终评估
print(f'\\n最终训练准确率: {train_accs[-1]*100:.2f}%')
print(f'最终验证准确率: {val_accs[-1]*100:.2f}%')
# 可视化训练过程
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 损失曲线
axes[0].plot(train_losses, label='Train Loss', marker='o')
axes[0].plot(val_losses, label='Val Loss', marker='s')
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss', fontsize=12)
axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 准确率曲线
axes[1].plot([acc*100 for acc in train_accs], label='Train Acc', marker='o')
axes[1].plot([acc*100 for acc in val_accs], label='Val Acc', marker='s')
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('Accuracy (%)', fontsize=12)
axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 可视化分类结果
model.eval()
with torch.no_grad():
# 对整个数据集进行预测
all_outputs = model(X_tensor)
_, all_predicted = torch.max(all_outputs.data, 1)
# 绘制预测结果
plt.figure(figsize=(10, 6))
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=all_predicted.numpy(),
cmap='viridis', alpha=0.6, marker='o', label='Predicted')
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y_true,
cmap='viridis', alpha=0.2, marker='x', s=100, label='True')
plt.xlabel('Feature 1', fontsize=12)
plt.ylabel('Feature 2', fontsize=12)
plt.title('Classification Results (Predicted vs True)', fontsize=14, fontweight='bold')
plt.colorbar(label='Class Label')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 计算最终指标
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
print('\\n' + '='*60)
print('分类报告')
print('='*60)
print(classification_report(y_true, all_predicted.numpy(),
target_names=[f'Class {i}' for i in range(3)]))
# 混淆矩阵
cm = confusion_matrix(y_true, all_predicted.numpy())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=[f'Class {i}' for i in range(3)],
yticklabels=[f'Class {i}' for i in range(3)])
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix', fontsize=14, fontweight='bold')
plt.show()
print('\\n注意:这是一个演示实例。实际应用中可能需要:')
print('1. 更大的数据集')
print('2. 更深的量子电路')
print('3. 超参数调优(学习率、层数、cutoff等)')
print('4. 更长的训练时间')
# 准备训练数据
from sklearn.model_selection import train_test_split
# 生成更大的数据集
X, y_true = make_blobs(
n_samples=150, # 增加样本数
centers=3,
n_features=2,
random_state=42,
cluster_std=1.5
)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 将2D特征扩展到4D(添加非线性特征)
X_expanded = np.zeros((X_scaled.shape[0], 4))
X_expanded[:, :2] = X_scaled
X_expanded[:, 2] = X_scaled[:, 0] * X_scaled[:, 1] # 交互项
X_expanded[:, 3] = X_scaled[:, 0]**2 + X_scaled[:, 1]**2 # 径向特征
# 转换为张量(不缩放,使用角度编码)
X_tensor = torch.FloatTensor(X_expanded)
y_tensor = torch.LongTensor(y_true)
# 划分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(
X_tensor, y_tensor, test_size=0.2, random_state=42
)
print(f'训练集大小: {X_train.shape[0]}')
print(f'验证集大小: {X_val.shape[0]}')
# 创建模型
model = PhotonicClassifier(nmode=4, nlayer=1, cutoff=3, num_classes=3, encoding='angle')
# 训练配置
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
n_epochs = 20
# 训练循环
print('\\n' + '='*60)
print('开始训练')
print('='*60)
train_losses = []
val_losses = []
train_accs = []
val_accs = []
for epoch in range(n_epochs):
# 训练阶段
model.train()
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
# 计算训练准确率
with torch.no_grad():
_, predicted = torch.max(outputs.data, 1)
train_acc = (predicted == y_train).sum().item() / len(y_train)
# 验证阶段
model.eval()
with torch.no_grad():
val_outputs = model(X_val)
val_loss = criterion(val_outputs, y_val)
_, val_predicted = torch.max(val_outputs.data, 1)
val_acc = (val_predicted == y_val).sum().item() / len(y_val)
# 记录指标
train_losses.append(loss.item())
val_losses.append(val_loss.item())
train_accs.append(train_acc)
val_accs.append(val_acc)
# 打印进度
if (epoch + 1) % 5 == 0 or epoch == 0:
print(f'Epoch [{epoch+1}/{n_epochs}]')
print(f' Train - Loss: {loss.item():.4f}, Acc: {train_acc*100:.2f}%')
print(f' Val - Loss: {val_loss.item():.4f}, Acc: {val_acc*100:.2f}%')
print('\\n' + '='*60)
print('训练完成!')
print('='*60)
# 最终评估
print(f'\\n最终训练准确率: {train_accs[-1]*100:.2f}%')
print(f'最终验证准确率: {val_accs[-1]*100:.2f}%')
# 可视化训练过程
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 损失曲线
axes[0].plot(train_losses, label='Train Loss', marker='o')
axes[0].plot(val_losses, label='Val Loss', marker='s')
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss', fontsize=12)
axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 准确率曲线
axes[1].plot([acc*100 for acc in train_accs], label='Train Acc', marker='o')
axes[1].plot([acc*100 for acc in val_accs], label='Val Acc', marker='s')
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('Accuracy (%)', fontsize=12)
axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 可视化分类结果
model.eval()
with torch.no_grad():
# 对整个数据集进行预测
all_outputs = model(X_tensor)
_, all_predicted = torch.max(all_outputs.data, 1)
# 绘制预测结果
plt.figure(figsize=(10, 6))
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=all_predicted.numpy(),
cmap='viridis', alpha=0.6, marker='o', label='Predicted')
plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y_true,
cmap='viridis', alpha=0.2, marker='x', s=100, label='True')
plt.xlabel('Feature 1', fontsize=12)
plt.ylabel('Feature 2', fontsize=12)
plt.title('Classification Results (Predicted vs True)', fontsize=14, fontweight='bold')
plt.colorbar(label='Class Label')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 计算最终指标
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
print('\\n' + '='*60)
print('分类报告')
print('='*60)
print(classification_report(y_true, all_predicted.numpy(),
target_names=[f'Class {i}' for i in range(3)]))
# 混淆矩阵
cm = confusion_matrix(y_true, all_predicted.numpy())
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=[f'Class {i}' for i in range(3)],
yticklabels=[f'Class {i}' for i in range(3)])
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix', fontsize=14, fontweight='bold')
plt.show()
print('\\n注意:这是一个演示实例。实际应用中可能需要:')
print('1. 更大的数据集')
print('2. 更深的量子电路')
print('3. 超参数调优(学习率、层数、cutoff等)')
print('4. 更长的训练时间')
总结¶
本教程涵盖了 GBS 和光量子机器学习的核心内容:
核心要点¶
1. GBS 理论¶
- 高斯玻色采样使用压缩态作为输入
- 输出概率与 Hafnian 函数相关
- 比传统玻色采样更容易实现
2. 光量子神经网络¶
- 使用 Fock 态编码数据
- 通过光量子门(相移器、分束器)进行变换
- 可以与经典层结合构建混合模型
3. 量子核方法¶
- 使用光量子电路计算核矩阵
- 可以用于支持向量机等算法
4. GBS 应用¶
- 图聚类:通过采样稠密子图
- 分类:结合量子核或量子神经网络
实用建议¶
选择合适的 Cutoff¶
def estimate_cutoff(squeezing_params):
mean_photon = sum([np.sinh(p)**2 for p in squeezing_params])
return int(3 * mean_photon + 5)
数据编码策略¶
- 角度编码:将数据映射为旋转角度
- 振幅编码:将数据映射为光子数
- 基态编码:直接使用基态
性能优化¶
- 使用 MPS 表示降低内存
- 批量处理提高效率
- GPU 加速(如果可用)
进一步学习¶
- 探索更深层的量子电路
- 研究量子生成模型
- 尝试真实数据集
- 了解量子优势的理论证明
参考文献¶
- Hamilton, C. S., et al. (2017). Gaussian boson sampling. Physical Review Letters.
- Bromley, T. R., et al. (2020). Applications of near-term photonic quantum computers. Quantum Science and Technology.
- Quesada, N., & Arrazola, J. M. (2020). Simulating realistic non-Gaussian boson sampling with GBS. Physical Review Research.
恭喜完成教程!
您已经学习了:
- GBS 的理论基础
- 光量子神经网络的设计
- 量子核方法的实现
- 实际应用案例
继续探索光量子机器学习的奇妙世界吧!