Forecasting heartbeat time-series data with an LSTM model (Python code, ipynb environment)

Module versions used:

matplotlib==3.7.1
numpy==1.24.4
pandas==1.5.3
scikit_learn==1.2.2
scipy==1.10.1
seaborn==0.12.2
statsmodels==0.14.0
torch==2.0.1
wfdb==4.1.2
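
To reproduce the environment, the pinned versions can be installed in one step (a sketch; swap in the torch build that matches your CUDA setup if needed):

pip install matplotlib==3.7.1 numpy==1.24.4 pandas==1.5.3 scikit-learn==1.2.2 scipy==1.10.1 seaborn==0.12.2 statsmodels==0.14.0 torch==2.0.1 wfdb==4.1.2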

Main code:

import pandas as pd
import matplotlib.pyplot as plt
import wfdb
# Full code: mbd.pub/o/bread/mbd-ZpWUmZ1x
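
If the ECG-ID database has not been downloaded yet, it can be fetched from PhysioNet with wfdb's download helper (a sketch; 'ecgiddb' is assumed to be the PhysioNet slug for this database, and the target directory should match the path used below):

# One-time download of the ECG-ID database from PhysioNet
# (the 'ecgiddb' slug and the local directory are assumptions - adjust as needed)
wfdb.dl_database('ecgiddb', dl_dir='file_resource/ecg-id-database-1.0.0')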

# Specify the path to your downloaded data
path_to_data = 'file_resource/ecg-id-database-1.0.0/Person_03'

# The record name is the filename without the extension
record_name = 'rec_1'

# Use the 'rdrecord' function to read the ECG data
record = wfdb.rdrecord(f'{path_to_data}/{record_name}')

# Plot the ECG data (channel 1 is the filtered ECG signal in ECG-ID records)
plt.figure(figsize=(10, 4))
plt.plot(record.p_signal[:, 1])
plt.title('ECG Signal')
plt.xlabel('Time (samples)')
plt.ylabel('Amplitude')
plt.show()

# Save the filtered channel to CSV and keep the first 10,000 samples for modeling
pd.DataFrame(record.p_signal[:, 1], columns=["hr"]).to_csv("./P3_rec_1.csv")
hr2 = pd.DataFrame(record.p_signal[:, 1], columns=["hr"])[0:10000]




from torch import nn
import numpy as np
import torch

# Run on GPU when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
df = hr2.copy()

target_sensor = "hr"
features = ["hr"]
batch_size = 32
forecast_lead = 5   # predict the value 5 samples ahead
forcast_step = 1    # number of steps emitted per forward pass

# Build the supervised target by shifting the series forward
target = f"{target_sensor}_lead{forecast_lead}"
df[target] = df[target_sensor].shift(-forecast_lead)
df = df.iloc[:-forecast_lead]

df_train = df.loc[:8000].copy()
df_test = df.loc[8000 - forecast_lead:].copy()

print("Train/test size ratio:", len(df_train) / len(df_test))

target_mean = df_train[target].mean()
target_stdev = df_train[target].std()

# Standardize every column with *training-set* statistics so that no
# information from the test period leaks into preprocessing
for c in df_train.columns:
    mean = df_train[c].mean()
    stdev = df_train[c].std()

    df_train[c] = (df_train[c] - mean) / stdev
    df_test[c] = (df_test[c] - mean) / stdev
    
    
    
import torch
from torch.utils.data import Dataset

# Parse the series into sliding windows:
#   X = the last `sequence_head` observations (left-padded at the start),
#   y = the next `sequence_length` target values
class SequenceDataset(Dataset):
    def __init__(self, dataframe, target, features, sequence_head=10, sequence_length=5):
        self.features = features
        self.target = target
        self.sequence_head = sequence_head
        self.sequence_length = sequence_length
        self.y = torch.tensor(dataframe[target].values).float()
        self.X = torch.tensor(dataframe[features].values).float()

    def __len__(self):
        return self.X.shape[0] - self.sequence_length + 1

    def __getitem__(self, i):
        if i >= self.sequence_head - 1:
            # Full window of history is available
            i_start = i - self.sequence_head + 1
            x = self.X[i_start:(i + 1), :]
        else:
            # Not enough history yet: pad by repeating the first observation
            padding = self.X[0].repeat(self.sequence_head - i - 1, 1)
            x = self.X[0:(i + 1), :]
            x = torch.cat((padding, x), 0)

        return x.to(device), self.y[i:i + self.sequence_length].to(device)
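
A quick sanity check of the windowing and padding logic (a minimal sketch on a toy frame; the column names mirror the real ones but the values are throwaway):

# Toy frame: 20 samples, target shifted 5 steps ahead
toy = pd.DataFrame({"hr": range(20), "hr_lead5": range(5, 25)})
ds = SequenceDataset(toy, target="hr_lead5", features=["hr"], sequence_head=5, sequence_length=1)
x0, y0 = ds[0]             # first window is left-padded by repeating row 0
print(x0.shape, y0.shape)  # torch.Size([5, 1]) torch.Size([1])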



train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_head=forecast_lead,
    sequence_length=forcast_step,
)
    
    
from torch.utils.data import DataLoader
torch.manual_seed(99)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

# Smoke test: pull a few batches to make sure the loader iterates cleanly
for i, (X, y) in enumerate(train_loader):
    if i > 3:
        break
    
torch.manual_seed(101)

# Define the train/test datasets and loaders
train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_head=forecast_lead,
    sequence_length=forcast_step,
)

test_dataset = SequenceDataset(
    df_test,
    target=target,
    features=["hr"],
    sequence_head=forecast_lead,
    sequence_length=forcast_step,
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

X, y = next(iter(train_loader))

print("Features shape:", X.shape)
print("Target shape:", y.shape)
    
    
from torch import nn

# Define the LSTM model
class ShallowRegressionLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_directions = 1  # unidirectional LSTM
        self.batch_size = batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_seq):
        batch_size = input_seq.shape[0]
        # Random (rather than zero) initial states, as in the original code
        h_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, batch_size, self.hidden_size).to(device)
        output, _ = self.lstm(input_seq, (h_0, c_0))
        pred = self.linear(output)
        pred = pred[:, -1, :]  # keep only the prediction at the last time step
        return pred
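
A quick shape check of the LSTM head (a sketch with throwaway sizes, separate from the training pipeline):

m = ShallowRegressionLSTM(input_size=1, hidden_size=8, num_layers=1, output_size=1, batch_size=4).to(device)
dummy = torch.randn(4, 5, 1).to(device)  # (batch, window length, features)
print(m(dummy).shape)                    # torch.Size([4, 1]): one forecast step per sample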



class ShallowRegressionRNN(nn.Module):
    def __init__(self, num_sensors, hidden_units):
        super().__init__()
        self.num_sensors = num_sensors  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 1

        self.rnn = nn.RNN(
            input_size=num_sensors,
            hidden_size=hidden_units,
            batch_first=True,
            num_layers=self.num_layers
        )

        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).requires_grad_().to(device)

        # hn is the final hidden state, shape (num_layers, batch, hidden)
        _, hn = self.rnn(x, h0)
        out = self.linear(hn).flatten()

        return out
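
Note the design difference between the two heads: the LSTM above applies the linear layer to the full output sequence and keeps the last step, while this RNN maps the final hidden state hn directly to one scalar per sample, so it always emits a one-step forecast regardless of forcast_step.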
    
    
# Instantiate the model
num_hidden_units = 512
model_lstm = ShallowRegressionLSTM(
    input_size=len(features),
    hidden_size=num_hidden_units,
    num_layers=1,
    output_size=forcast_step,
    batch_size=batch_size,
)

# RMSE loss built on top of nn.MSELoss
class RMSELoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()

    def forward(self, yhat, y):
        return torch.sqrt(self.mse(yhat, y))

loss_function = RMSELoss()
def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    total_loss = 0
    model.to(device)
    model.train()

    for X, y in data_loader:
        output = model(X)
        loss = loss_function(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")

# Evaluate the average loss over a data loader
def test_model(data_loader, model, loss_function):
    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    return avg_loss


print("Untrained test\n--------")
# test_model(test_loader, model, loss_function)

avg_loss = 1
model_lstm.to(device)    
    
    
learning_rate = 5e-4

# Train the model, checkpointing whenever the test loss improves
optimizer = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
for ix_epoch in range(150):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model_lstm, loss_function, optimizer=optimizer)
    temp = test_model(test_loader, model_lstm, loss_function)
    if temp < avg_loss:
        avg_loss = temp
        torch.save(model_lstm.state_dict(), "model_lstm_%s_%s.pt" % (forecast_lead, forcast_step))
    print()
    
    
    
# Reload the best checkpoint
model_lstm.load_state_dict(torch.load("model_lstm_%s_%s.pt" % (forecast_lead, forcast_step)))
    
# Run the trained model over a loader and collect its predictions
def predict(data_loader, model):
    output = torch.tensor([]).to(device)
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            y_star = model(X)
            output = torch.cat((output, y_star), 0)

    return output


train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
pre = predict(train_eval_loader, model_lstm).cpu().numpy()
print(pre.shape)
df_train[ystar_col] = pre
df_test[ystar_col] = predict(test_loader, model_lstm).cpu().numpy()

df_out = pd.concat((df_train, df_test))[[target, ystar_col]]

# Undo the standardization so the forecast is on the original scale
for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean

print(df_out)
    
    
# Recursive multi-step forecasting: once a seed window has been collected
# from real data, each prediction is fed back in as the next input
def predict_window(data_loader, model, forecast_step=2000):
    output = torch.tensor([]).to(device)
    model.eval()
    count = 0
    with torch.no_grad():
        # Seed the window with the first few one-step predictions
        for X, _ in data_loader:
            y_star = model(X)
            output = torch.cat((output, y_star), 0)
            count += 1
            if count > forecast_lead:
                break
        # From here on, predict from the model's own previous outputs
        for i in range(forecast_step - 1):
            y_star = model(output[output.shape[0] - forecast_lead:].reshape(1, forecast_lead, 1))
            output = torch.cat((output, y_star), 0)

    return output

res = predict_window(test_loader, model_lstm).cpu().numpy()
print(res)
plt.plot(res)
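
Because each predicted value is fed back in as an input, errors compound with the horizon, so a recursive forecast like this typically drifts well before the default 2000 steps.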



fig, ax = plt.subplots(figsize=(12, 6))
df_out[8000:].plot(ax=ax)
ax.set_title("LSTM model forecast")
ax.set_ylabel("ECG")
ax.set_xlabel("Time")
plt.show()

# Compute the error metrics and the AIC
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error

def calculate_aic(y_true, y_pred, num_params):
    # AIC for a Gaussian error model: n * ln(MSE) + 2k
    mse = mean_squared_error(y_true, y_pred)
    aic = len(y_true) * np.log(mse) + 2 * num_params
    return aic
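
For example, 10,000 residuals with an MSE of 0.01 and a single parameter give AIC = 10000 * ln(0.01) + 2 ≈ -46049.7; lower (more negative) values indicate a better trade-off between fit and complexity on the same data.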

mse = mean_squared_error(df_out[target], df_out[ystar_col])
mae = mean_absolute_error(df_out[target], df_out[ystar_col])
r2 = r2_score(df_out[target], df_out[ystar_col])
mape = mean_absolute_percentage_error(df_out[target], df_out[ystar_col])
print(f"R2: {r2:.6f}")
print(f"MAPE: {mape:.6f}")
print(f"MAE: {mae:.6f}")
print(f"RMSE: {np.sqrt(mse):.6f}")
print(f"MSE: {mse:.6f}")
print(f"AIC: {calculate_aic(df_out[target], df_out[ystar_col], 1):.6f}")


# Now train the simple RNN baseline with an MSE loss
num_hidden_units = 128
loss_function = nn.MSELoss()
model = ShallowRegressionRNN(num_sensors=len(features), hidden_units=num_hidden_units)
model.to(device)
avg_loss = 1  # reset the best-loss tracker for the RNN

learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for ix_epoch in range(500):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model, loss_function, optimizer=optimizer)
    temp = test_model(test_loader, model, loss_function)
    if temp < avg_loss:
        avg_loss = temp
        torch.save(model.state_dict(), "model_RNN.pt")
    print()

# Load the best RNN checkpoint
model.load_state_dict(torch.load("model_RNN.pt"))
print(avg_loss)


res = predict_window(test_loader, model).cpu().numpy()
print(len(res))
plt.plot(res)



train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).cpu().numpy()
df_test[ystar_col] = predict(test_loader, model).cpu().numpy()

df_out = pd.concat((df_train, df_test))[[target, ystar_col]]

# Undo the standardization, as in the LSTM branch above
for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean

print(df_out)


fig, ax = plt.subplots(figsize=(12, 6))
df_out[8000:].plot(ax=ax)
ax.set_title("RNN model forecast")
ax.set_ylabel("ECG")
ax.set_xlabel("Time")
plt.show()


mse = mean_squared_error(df_out[target], df_out[ystar_col])
mae = mean_absolute_error(df_out[target], df_out[ystar_col])
r2 = r2_score(df_out[target], df_out[ystar_col])
mape = mean_absolute_percentage_error(df_out[target], df_out[ystar_col])
print(f"R2: {r2:.6f}")
print(f"MAPE: {mape:.6f}")
print(f"MAE: {mae:.6f} ")
print(f"RMSE: {np.sqrt(mse):.6f}")
print(f"mse: {mse:.6f}")
print(f"AIC: {calculate_aic(df_out[target], df_out[ystar_col], 1):.6f}")
# aic_res = calaic(df_out[target], df_out[ystar_col], df_out.shape[1])
# print(f"AIC: {aic_res:.6f}")

The author holds a PhD in engineering and serves as a peer reviewer for journals including Mechanical Systems and Signal Processing. Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time-series analysis, equipment defect detection, equipment anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).

