使用更少的参数建模时间序列

先前的Dlinear已经足够简单,且击败了一众transformer模型。

我们还能使用更少的参数吗?Dlinear使用了两个线性网络,分别周期和残差,我们能只用一个吗?

这也就是FITS所做的,我们直接在傅里叶域上做神经网络,这样能实现了周期和残差的同时建模。

FITS

FITS是DLinear的后续之作,依旧保持着简单有效的方法。获得ICLR2024。

算法

具体算法其实很清晰。

  • 首先对于长度为L的序列,作者首先进行了RIN归一化,目的是为了使序列均值为0,然后使用傅立叶变换rFFT把时域信息转到频域(复数域)。
  • 然后,使用低通滤波器(LPF)将高频分量过滤掉,这部分在代码中是通过一个cut_freq参数来确定的。这样的好处在于能够去掉噪声,减少模型参数量。
  • 之后,通过图中紫色部分的Complex-valued Linear Layer,这部分是一个上采样(线性层),相当于对频率特征做了线性变换,其长度取决于pred_len和seq_len的比例。
  • 最后,将新的频率特征进行零pad,使用傅立叶逆变换irFFT转回时域。

简而言之,其实就是在频域上的线性层。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import models.NLinear as DLinear

class Model(nn.Module):

# FITS: Frequency Interpolation Time Series Forecasting

def __init__(self, configs):
super(Model, self).__init__()
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.individual = configs.individual
self.channels = configs.enc_in

self.dominance_freq=configs.cut_freq # 720/24
self.length_ratio = (self.seq_len + self.pred_len)/self.seq_len

if self.individual:
self.freq_upsampler = nn.ModuleList()
for i in range(self.channels):
self.freq_upsampler.append(nn.Linear(self.dominance_freq, int(self.dominance_freq*self.length_ratio)).to(torch.cfloat))

else:
self.freq_upsampler = nn.Linear(self.dominance_freq, int(self.dominance_freq*self.length_ratio)).to(torch.cfloat) # complex layer for frequency upcampling]
# configs.pred_len=configs.seq_len+configs.pred_len
# #self.Dlinear=DLinear.Model(configs)
# configs.pred_len=self.pred_len


def forward(self, x):
# RIN
x_mean = torch.mean(x, dim=1, keepdim=True)
x = x - x_mean
x_var=torch.var(x, dim=1, keepdim=True)+ 1e-5
# print(x_var)
x = x / torch.sqrt(x_var)

low_specx = torch.fft.rfft(x, dim=1)
low_specx[:,self.dominance_freq:]=0 # LPF
low_specx = low_specx[:,0:self.dominance_freq,:] # LPF
# print(low_specx.permute(0,2,1))
if self.individual:
low_specxy_ = torch.zeros([low_specx.size(0),int(self.dominance_freq*self.length_ratio),low_specx.size(2)],dtype=low_specx.dtype).to(low_specx.device)
for i in range(self.channels):
low_specxy_[:,:,i]=self.freq_upsampler[i](low_specx[:,:,i].permute(0,1)).permute(0,1)
else:
low_specxy_ = self.freq_upsampler(low_specx.permute(0,2,1)).permute(0,2,1)
# print(low_specxy_)
low_specxy = torch.zeros([low_specxy_.size(0),int((self.seq_len+self.pred_len)/2+1),low_specxy_.size(2)],dtype=low_specxy_.dtype).to(low_specxy_.device)
low_specxy[:,0:low_specxy_.size(1),:]=low_specxy_ # zero padding
low_xy=torch.fft.irfft(low_specxy, dim=1)
low_xy=low_xy * self.length_ratio # energy compemsation for the length change
# dom_x=x-low_x

# dom_xy=self.Dlinear(dom_x)
# xy=(low_xy+dom_xy) * torch.sqrt(x_var) +x_mean # REVERSE RIN
xy=(low_xy) * torch.sqrt(x_var) +x_mean
return xy, low_xy* torch.sqrt(x_var)

值得注意的是,为了使用某些py库没有复数层功能,作者还提供了如何使用实数层来实现这一功能。

复数中:
$$
Y=XW
$$

$$
\begin{align}
Y_{real}=X_{real}W_{real}-X_{imag}W_{imaag}
\\
Y_{imag}=X_{real}W_{imag}+X_{imag}W_{real}
\end{align}
$$

SparseTSF

华南理工出品,ICML 2024 Oral

算法

算法还是在时域里面做。

具体步骤分为:切块,卷积提取特征,上下采样。

我们直接来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import torch
import torch.nn as nn

class Model(nn.Module):
def __init__(self, configs):
super(Model, self).__init__()

# get parameters
self.seq_len = configs.seq_len
self.pred_len = configs.pred_len
self.enc_in = configs.enc_in
self.period_len = configs.period_len
self.d_model = configs.d_model
self.model_type = configs.model_type
assert self.model_type in ['linear', 'mlp']

self.seg_num_x = self.seq_len // self.period_len
self.seg_num_y = self.pred_len // self.period_len

self.conv1d = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=1 + 2 * (self.period_len // 2),
stride=1, padding=self.period_len // 2, padding_mode="zeros", bias=False)

if self.model_type == 'linear':
self.linear = nn.Linear(self.seg_num_x, self.seg_num_y, bias=False)
elif self.model_type == 'mlp':
self.mlp = nn.Sequential(
nn.Linear(self.seg_num_x, self.d_model),
nn.ReLU(),
nn.Linear(self.d_model, self.seg_num_y)
)


def forward(self, x):
batch_size = x.shape[0]
# normalization and permute b,s,c -> b,c,s
seq_mean = torch.mean(x, dim=1).unsqueeze(1)
x = (x - seq_mean).permute(0, 2, 1)

# 1D convolution aggregation
x = self.conv1d(x.reshape(-1, 1, self.seq_len)).reshape(-1, self.enc_in, self.seq_len) + x

# downsampling: b,c,s -> bc,n,w -> bc,w,n
x = x.reshape(-1, self.seg_num_x, self.period_len).permute(0, 2, 1)

# sparse forecasting
if self.model_type == 'linear':
y = self.linear(x) # bc,w,m
elif self.model_type == 'mlp':
y = self.mlp(x)

# upsampling: bc,w,m -> bc,m,w -> b,c,s
y = y.permute(0, 2, 1).reshape(batch_size, self.enc_in, self.pred_len)

# permute and denorm
y = y.permute(0, 2, 1) + seq_mean

return y

使用更少的参数建模时间序列
https://lijianxiong.work/2025/20250110/
作者
LJX
发布于
2025年1月10日
许可协议