HPC101 Lab2 report

实验目标

借助 Numpy 实现一个支持批量处理的向量化的双线性插值

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import numpy as np

def bilinear_interp_vectorized(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""
This is the vectorized implementation of bilinear interpolation.
- a is a ND array with shape [N, H1, W1, C], dtype = int64
- b is a ND array with shape [N, H2, W2, 2], dtype = float64
- return a ND array with shape [N, H2, W2, C], dtype = int64
"""
N, H1, W1, C=a.shape
N1,H2, W2, _=b.shape
assert N == N1

res=np.empty((N,H2,W2,C),dtype=np.int64)

# 获得左上方与右下方最近的整点坐标
pos1=np.floor(b).astype(np.int64)
pos2=np.ceil(b).astype(np.int64)

# 获取与整点的水平距离与竖直距离
delta1=b-pos1
delta2=pos2-b

# 获取左边的行向量与右边的列向量
Dx=np.concatenate((delta2[...,np.newaxis,0],delta1[...,np.newaxis,0]),axis=3)
Dx=Dx[...,np.newaxis,:,np.newaxis]
Dy=np.concatenate((delta2[...,np.newaxis,1],delta1[...,np.newaxis,1]),axis=3)
Dy=Dy[...,np.newaxis,np.newaxis,:]


# 获取中间的 2*2 矩阵
index_n=np.arange(N).repeat(H2*W2,axis=0)
index_x1=pos1[...,0].flatten()
index_y1=pos1[...,1].flatten()
index_x2=pos2[...,0].flatten()
index_y2=pos2[...,1].flatten()

# 通过 fancy index 取得周围 4 个点对应的值
val_11=a[index_n,index_x1,index_y1,:].reshape(N,H2,W2,-1)
val_12=a[index_n,index_x1,index_y2,:].reshape(N,H2,W2,-1)
val_21=a[index_n,index_x2,index_y1,:].reshape(N,H2,W2,-1)
val_22=a[index_n,index_x2,index_y2,:].reshape(N,H2,W2,-1)

# 将获得的值合并为对应的 2*2 矩阵
val_1=np.concatenate((val_11[...,np.newaxis],val_21[...,np.newaxis]),axis=4)
val_2=np.concatenate((val_12[...,np.newaxis],val_22[...,np.newaxis]),axis=4)
val=np.concatenate((val_1[...,np.newaxis,:],val_2[...,np.newaxis,:]),axis=4)

# 执行矩阵乘法
res=Dy@val@Dx
# 调整矩阵形状与数据类型
res=res.reshape(N,H2,W2,C).astype(np.int64)
return res

思路

在该线性插值公式中,由于我们取的基准点是整点,并且是包含插值点的最小正方形的四个顶点,可知 $x2-x1=1$ ,$y2-y1=1$ ,因而公式的第一项恒为 1,故我们只需要求出后面三个矩阵乘积的大小。

这个函数要返回的数据是四维数组 $res[N][H2][W2][C]$ ,对于每个不同的 $(n,h2,w2,c)$ , 维数为 2 且不一定相同,需要用六维的数组存储。而虽然 维数都是1,并且与通道序号 $C$ 无关,可以用四维数组存储,但我们为了保证最后的矩阵乘法可以运行,需要保证存储三个矩阵的数组的维数相同,因此决定统一用六维的数组存储这些矩阵。

首先我们应该求出插值点 $(x,y)$ 附近整点的坐标,易知这四个点的坐标分别为 $(\lfloor x \rfloor,\lfloor y\rfloor),(\lfloor x\rfloor,\lceil y\rceil),(\lceil x\rceil,\lfloor x\rfloor),(\lceil x\rceil,\lceil y\rceil)$ ,所以我们可以通过 numpy 的 floor 与 ceil 函数对 b 中数据批量处理得到 $pos1[N][H2][W2][2]$ (存有 $x_1$ 与 $y_1$)和 $pos2[N][H2][W2][2]$ (存有 $x_2$ 与 $y_2$)。然后作差得 $delta1=b-pos1,delta2=pos2-b$ ,再合并成分别存储有行向量与列向量的六维矩阵 $Dx$ 与 $Dy$ 。

接着我们需要求中间的系数矩阵,矩阵中的数值来源于 $a[N][H1][W1][C]$ ,为了与行向量和列向量一一对应,我们需要建立索引数组来把其中的数据按照与 $Dx$ ,$Dy$ 相同的数据顺序取出来。对于每层图像,有 $H2 \cdot W2$ 个插值点,所以构建 index_n=np.arange(N).repeat(H2*W2,axis=0),而每层的基准点坐标则直接从 $pos1$ 与 $pos2$ 的切片中取得,将数据合并,构建六维数组。

最后执行矩阵乘法,并调整矩阵形状与数据类型,返回结果。

测试:正确性与加速比

image-20231027225424915

测试编号 正确性 基准代码运行时间/s 向量化代码运行时间/s 加速比
1 Y 147.7335 5.6584 26.1085
2 Y 146.1727 5.5062 26.5470
3 Y 145.4642 5.8223 24.9838
4 Y 152.4930 5.6819 26.8386
avg 100% / / 26.1195

调优

使用 time 测试一下每个部分所用的时间,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def bilinear_interp_vectorized(a: np.ndarray, b: np.ndarray) -> np.ndarray:
N, H1, W1, C=a.shape
H2=b.shape[1]
W2=b.shape[2]

t=[]

t.append(time.time()) # 0
res=np.empty((N,H2,W2,C),dtype=np.int64)

t.append(time.time()) # 1
pos1=np.floor(b).astype(np.int64)
pos2=np.ceil(b).astype(np.int64)

t.append(time.time()) # 2
delta1=b-pos1
delta2=pos2-b

t.append(time.time()) # 3
Dx=np.concatenate((delta2[...,np.newaxis,0],delta1[...,np.newaxis,0]),axis=3)
Dx=Dx[...,np.newaxis,:,np.newaxis]
Dy=np.concatenate((delta2[...,np.newaxis,1],delta1[...,np.newaxis,1]),axis=3)
Dy=Dy[...,np.newaxis,np.newaxis,:]

t.append(time.time()) # 4
index_n=np.arange(N).repeat(H2*W2,axis=0)
index_x1=pos1[...,0].flatten()
index_y1=pos1[...,1].flatten()
index_x2=pos2[...,0].flatten()
index_y2=pos2[...,1].flatten()

t.append(time.time()) # 5
val_11=a[index_n,index_x1,index_y1,:].reshape(N,H2,W2,-1)
val_12=a[index_n,index_x1,index_y2,:].reshape(N,H2,W2,-1)
val_21=a[index_n,index_x2,index_y1,:].reshape(N,H2,W2,-1)
val_22=a[index_n,index_x2,index_y2,:].reshape(N,H2,W2,-1)

t.append(time.time()) # 6
val_1=np.concatenate((val_11[...,np.newaxis],val_21[...,np.newaxis]),axis=4)
val_2=np.concatenate((val_12[...,np.newaxis],val_22[...,np.newaxis]),axis=4)
val=np.concatenate((val_1[...,np.newaxis,:],val_2[...,np.newaxis,:]),axis=4)

t.append(time.time()) # 7
res=Dy@val@Dx
t.append(time.time()) # 8
res=res.reshape(N,H2,W2,C).astype(np.int64)
t.append(time.time())

for i in range(9):
print("%d: %.6fs"%(i, t[i+1]-t[i]))


return res
id test 1 test 2 test 3
0 0.000000s 0.000000s 0.000000s
1 0.169998s 0.175999s 0.180998s
2 0.065002s 0.066999s 0.066001s
3 0.076056s 0.078063s 0.076253s
4 0.084945s 0.083939s 0.085745s
5 1.203999s 1.212999s 1.389002s
6 0.428073s 0.423998s 0.436043s
7 3.228925s 3.353000s 3.326474s
8 0.074074s 0.068001s 0.065000s

显然,第5步的 fancy index 和第7步的矩阵乘法耗时最长

想法:可以尝试减少数组维数,把前几维数据在执行矩阵乘法前横向展开,减少执行矩阵乘法时数据的深度,也许能加快矩阵乘法的速度

重新编写代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def bilinear_interp_vectorized(a: np.ndarray, b: np.ndarray) -> np.ndarray:
N, H1, W1, C=a.shape
H2=b.shape[1]
W2=b.shape[2]

t=[]

t.append(time.time()) # 0
res=np.empty((N,H2,W2,C),dtype=np.int64)

t.append(time.time()) # 1
b1=b.reshape(-1,2)
pos1=np.floor(b1).astype(np.int64)
pos2=np.ceil(b1).astype(np.int64)

t.append(time.time()) # 2
delta1=b1-pos1
delta2=pos2-b1

t.append(time.time()) # 3
Dx=np.concatenate((delta2[...,np.newaxis,0],delta1[...,np.newaxis,0]),axis=1)
Dx=Dx[...,np.newaxis,:,np.newaxis]
Dy=np.concatenate((delta2[...,np.newaxis,1],delta1[...,np.newaxis,1]),axis=1)
Dy=Dy[...,np.newaxis,np.newaxis,:]

t.append(time.time()) # 4
index_n=np.arange(N).repeat(H2*W2,axis=0)
index_x1=pos1[...,0].flatten()
index_y1=pos1[...,1].flatten()
index_x2=pos2[...,0].flatten()
index_y2=pos2[...,1].flatten()

t.append(time.time()) # 5
val_11=a[index_n,index_x1,index_y1,:].reshape(-1,C)
val_12=a[index_n,index_x1,index_y2,:].reshape(-1,C)
val_21=a[index_n,index_x2,index_y1,:].reshape(-1,C)
val_22=a[index_n,index_x2,index_y2,:].reshape(-1,C)

t.append(time.time()) # 6
val_1=np.concatenate((val_11[...,np.newaxis],val_21[...,np.newaxis]),axis=2)
val_2=np.concatenate((val_12[...,np.newaxis],val_22[...,np.newaxis]),axis=2)
val=np.concatenate((val_1[...,np.newaxis,:],val_2[...,np.newaxis,:]),axis=2)

t.append(time.time()) # 7
res=Dy@val@Dx
t.append(time.time()) # 8
res=res.reshape(N,H2,W2,C).astype(np.int64)
t.append(time.time())

for i in range(9):
print("%d: %.6fs"%(i, t[i+1]-t[i]))


return res

修改 main.py ,与原来的版本进行对比:

编号 未压缩版本耗时 压缩版本耗时 整体加速比 矩阵乘法加速比
1 6.1774s 6.0661s 1.018 1.015
2 5.7063s 5.6795s 1.005 0.997
3 5.6909s 5.7629s 0.988 0.982
4 6.1859s 5.7574s 1.074 1.047
5 6.6703s 5.7098s 1.168 1.160
6 5.8854s 5.6585s 1.040 1.046

测试结果显示,整体优化并不明显,而矩阵乘法的速度提升更不明显,所以执行矩阵乘法时数据的深度与矩阵乘法的速度关系不是很大。(why?)