"""Tkinter desktop tool for inspecting valve vibration recordings.

The application imports a two-column CSV (time, amplitude), plots the
time-domain signal and its one-sided FFT magnitude spectrum, computes a set of
time- and frequency-domain features, and keeps a JSON history of saved
valve/sensor states together with the feature values shown at save time.
"""

import datetime
import json
import os
import tkinter as tk
from tkinter import filedialog, messagebox, ttk

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure

# History is stored relative to the current working directory.
HISTORY_FILE = "valve_history.json"

# Small constant added to denominators / log arguments to avoid division by
# zero and log(0) on all-zero signals.
_EPS = 1e-10

# Fonts that contain CJK glyphs; the plot titles and axis labels are Chinese.
_CJK_FONTS = (
    "SimHei",
    "Microsoft YaHei",
    "PingFang SC",
    "Noto Sans CJK SC",
    "WenQuanYi Micro Hei",
)


def _configure_plot_fonts():
    """Put CJK-capable fonts in front of matplotlib's sans-serif fallbacks."""
    current = list(plt.rcParams["font.sans-serif"])
    preferred = [font for font in _CJK_FONTS if font not in current]
    plt.rcParams["font.sans-serif"] = preferred + current
    # Several CJK fonts lack the Unicode minus sign; use the ASCII hyphen.
    plt.rcParams["axes.unicode_minus"] = False


class ValveMonitoringSystem:
    """Main window of the valve fault detection tool.

    Attributes set in ``__init__``:
        time_data / amplitude_data: imported samples (1-D float arrays).
        freq_data / fft_data: one-sided spectrum of ``amplitude_data``;
            empty until computed and reset whenever new data is imported.
        valve_status / sensor_status / valve_id: values captured by
            ``save_status``.
        history_data: list of saved record dicts (see ``save_status``).
    """

    # Display order of the feature rows; the names double as dictionary keys
    # in the saved history records.
    TIME_FEATURES = (
        "均值", "方差", "均方根", "峰值",
        "峰峰值", "波形因子", "脉冲因子", "裕度因子",
    )
    FREQ_FEATURES = (
        "重心频率", "均方频率", "频率方差", "频率标准差",
        "幅度最大值", "主要频率", "频率幅值方差", "频谱熵",
    )

    def __init__(self, root):
        """Build the UI inside *root* and load any existing history file."""
        self.root = root
        self.root.title("阀门故障检测系统")
        self.root.geometry("1200x800")

        # Initial (empty) data state.
        self.time_data = np.array([])
        self.amplitude_data = np.array([])
        self.freq_data = np.array([])
        self.fft_data = np.array([])
        self.valve_status = "正常"
        self.sensor_status = "正常"
        self.valve_id = "V-2024-001"
        self.history_data = []

        self.setup_ui()
        self.load_history()

    # ------------------------------------------------------------------ UI

    def setup_ui(self):
        """Create the control panel, chart area, feature panel and status bar."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Let the chart column/row absorb all extra space on resize.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(1, weight=1)

        title_label = ttk.Label(
            main_frame, text="阀门故障检测系统", font=("Arial", 16, "bold")
        )
        title_label.grid(row=0, column=0, columnspan=3, pady=(0, 10))

        self._build_control_panel(main_frame)
        self._build_chart_area(main_frame)
        self._build_feature_panel(main_frame)
        self._build_status_bar()

    def _build_control_panel(self, parent):
        """Create the left-hand panel with valve info fields and action buttons."""
        control_frame = ttk.LabelFrame(parent, text="控制面板", padding="10")
        control_frame.grid(
            row=1, column=0, sticky=(tk.N, tk.S, tk.W), padx=(0, 10)
        )

        ttk.Label(control_frame, text="阀门编号:").grid(
            row=0, column=0, sticky=tk.W, pady=2
        )
        self.valve_id_entry = ttk.Entry(control_frame, width=15)
        self.valve_id_entry.grid(row=0, column=1, pady=2)
        self.valve_id_entry.insert(0, self.valve_id)

        ttk.Label(control_frame, text="运行状态:").grid(
            row=1, column=0, sticky=tk.W, pady=2
        )
        self.status_var = tk.StringVar(value=self.valve_status)
        status_combo = ttk.Combobox(
            control_frame,
            textvariable=self.status_var,
            values=["正常", "轻微故障", "严重故障", "关闭"],
            width=13,
        )
        status_combo.grid(row=1, column=1, pady=2)

        ttk.Label(control_frame, text="传感器状态:").grid(
            row=2, column=0, sticky=tk.W, pady=2
        )
        self.sensor_var = tk.StringVar(value=self.sensor_status)
        sensor_combo = ttk.Combobox(
            control_frame,
            textvariable=self.sensor_var,
            values=["正常", "异常"],
            width=13,
        )
        sensor_combo.grid(row=2, column=1, pady=2)

        # (label, callback, vertical padding) for rows 3..7.
        buttons = (
            ("导入数据", self.import_data, 10),
            ("生成曲线", self.generate_plots, 5),
            ("计算特征", self.calculate_features, 5),
            ("保存状态", self.save_status, 5),
            ("查看历史", self.view_history, 5),
        )
        for row, (label, callback, pady) in enumerate(buttons, start=3):
            ttk.Button(control_frame, text=label, command=callback).grid(
                row=row, column=0, columnspan=2, pady=pady
            )

    def _build_chart_area(self, parent):
        """Create the embedded matplotlib figure with two stacked axes."""
        chart_frame = ttk.Frame(parent)
        chart_frame.grid(row=1, column=1, sticky=(tk.N, tk.S, tk.E, tk.W))

        _configure_plot_fonts()
        # A plain Figure (not pyplot.subplots) is used on purpose: pyplot
        # would register the figure with its own figure manager and hidden
        # Tk window, which can keep the process alive after the main window
        # is closed.
        self.fig = Figure(figsize=(8, 6))
        self.ax1, self.ax2 = self.fig.subplots(2, 1)
        self.fig.subplots_adjust(hspace=0.5)

        self.canvas = FigureCanvasTkAgg(self.fig, master=chart_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def _build_feature_panel(self, parent):
        """Create the right-hand panel listing time/frequency feature values."""
        info_frame = ttk.LabelFrame(parent, text="特征信息", padding="10")
        info_frame.grid(
            row=1, column=2, sticky=(tk.N, tk.S, tk.E), padx=(10, 0)
        )

        self.time_feature_vars = self._add_feature_rows(
            info_frame, "时域特征", self.TIME_FEATURES,
            header_row=0, header_pady=(0, 10),
        )
        self.freq_feature_vars = self._add_feature_rows(
            info_frame, "频域特征", self.FREQ_FEATURES,
            header_row=9, header_pady=(10, 10),
        )

    @staticmethod
    def _add_feature_rows(parent, title, names, header_row, header_pady):
        """Add a bold header plus one "name: value" row per feature.

        Returns a dict mapping each feature name to the ``tk.StringVar`` that
        backs its value label (initialised to ``"0.0"``).
        """
        ttk.Label(parent, text=title, font=("Arial", 10, "bold")).grid(
            row=header_row, column=0, columnspan=2, pady=header_pady
        )
        variables = {}
        for row, name in enumerate(names, start=header_row + 1):
            ttk.Label(parent, text=name + ":").grid(
                row=row, column=0, sticky=tk.W, pady=2
            )
            var = tk.StringVar(value="0.0")
            ttk.Label(parent, textvariable=var).grid(
                row=row, column=1, sticky=tk.W, pady=2
            )
            variables[name] = var
        return variables

    def _build_status_bar(self):
        """Create the status bar at the bottom of the root window."""
        status_bar = ttk.Frame(self.root)
        status_bar.grid(row=2, column=0, sticky=(tk.W, tk.E))
        self.status_label = ttk.Label(
            status_bar, text="就绪", relief=tk.SUNKEN, anchor=tk.W
        )
        self.status_label.pack(fill=tk.X)

    # -------------------------------------------------------- signal maths

    @staticmethod
    def _spectrum(time_data, amplitude_data):
        """Return ``(freq, magnitude)`` of the one-sided FFT spectrum.

        The sampling interval is taken from the first two time stamps; with
        fewer than two samples an interval of 1 is assumed. Only the first
        ``n // 2`` bins (non-negative frequencies) are returned.

        Raises:
            ValueError: if the first two time stamps are not strictly
                increasing (the sampling rate would be infinite or negative).
        """
        n = len(amplitude_data)
        if n == 0:
            return np.array([]), np.array([])
        if n > 1:
            dt = float(time_data[1] - time_data[0])
            if not np.isfinite(dt) or dt <= 0:
                raise ValueError("时间列的采样间隔无效(前两个时间点必须递增)")
        else:
            dt = 1.0
        half = n // 2
        freq = np.fft.fftfreq(n, dt)[:half]
        magnitude = np.abs(np.fft.fft(amplitude_data))[:half]
        return freq, magnitude

    def _update_spectrum(self):
        """Recompute ``freq_data``/``fft_data`` from the imported samples."""
        self.freq_data, self.fft_data = self._spectrum(
            self.time_data, self.amplitude_data
        )

    @staticmethod
    def _time_domain_features(amplitude):
        """Return the time-domain features of *amplitude* keyed by display name."""
        amplitude = np.asarray(amplitude, dtype=float)
        abs_amplitude = np.abs(amplitude)
        mean_abs = np.mean(abs_amplitude)
        rms = np.sqrt(np.mean(amplitude ** 2))
        peak = np.max(abs_amplitude)
        # Clearance (margin) factor is defined with the squared mean of the
        # square roots of |x|, not the squared mean of |x|.
        sqrt_mean_squared = np.mean(np.sqrt(abs_amplitude)) ** 2
        return {
            "均值": np.mean(amplitude),
            "方差": np.var(amplitude),
            "均方根": rms,
            "峰值": peak,
            "峰峰值": np.max(amplitude) - np.min(amplitude),
            "波形因子": rms / (mean_abs + _EPS),
            "脉冲因子": peak / (mean_abs + _EPS),
            "裕度因子": peak / (sqrt_mean_squared + _EPS),
        }

    @staticmethod
    def _frequency_domain_features(freq, magnitude):
        """Return the frequency-domain features keyed by display name.

        *freq* and *magnitude* must be non-empty arrays of equal length.
        """
        total = np.sum(magnitude) + _EPS
        centroid = np.sum(freq * magnitude) / total
        variance = np.sum((freq - centroid) ** 2 * magnitude) / total
        normalised = magnitude / total
        return {
            "重心频率": centroid,
            "均方频率": np.sum(freq ** 2 * magnitude) / total,
            "频率方差": variance,
            "频率标准差": np.sqrt(variance),
            "幅度最大值": np.max(magnitude),
            "主要频率": freq[np.argmax(magnitude)],
            "频率幅值方差": np.var(magnitude),
            "频谱熵": -np.sum(normalised * np.log(normalised + _EPS)),
        }

    # ------------------------------------------------------------- actions

    def import_data(self):
        """Ask for a CSV file and load its first two columns as time/amplitude.

        Rows whose time or amplitude cell is not numeric are dropped. Any
        previously computed spectrum is discarded so that later feature
        calculations cannot use data from an earlier file.
        """
        file_path = filedialog.askopenfilename(
            title="选择数据文件",
            filetypes=[
                ("CSV文件", "*.csv"),
                ("文本文件", "*.txt"),
                ("所有文件", "*.*"),
            ],
        )
        if not file_path:
            return

        try:
            df = pd.read_csv(file_path)
            if len(df.columns) < 2:
                messagebox.showerror(
                    "错误", "数据文件需要至少包含两列(时间和振幅)"
                )
                return
            samples = (
                df.iloc[:, :2].apply(pd.to_numeric, errors="coerce").dropna()
            )
        except Exception as e:
            # UI boundary: report any read/parse failure to the user instead
            # of letting it escape into the Tk callback machinery.
            messagebox.showerror("错误", f"读取文件时出错: {str(e)}")
            return

        if samples.empty:
            messagebox.showerror("错误", "数据文件中没有有效的数值数据")
            return

        self.time_data = samples.iloc[:, 0].to_numpy(dtype=float)
        self.amplitude_data = samples.iloc[:, 1].to_numpy(dtype=float)
        # Invalidate the spectrum of the previously loaded data set.
        self.freq_data = np.array([])
        self.fft_data = np.array([])

        self.status_label.config(
            text=f"已导入数据: {os.path.basename(file_path)}"
        )
        messagebox.showinfo("成功", f"成功导入{len(self.time_data)}个数据点")

    def generate_plots(self):
        """Draw the time-domain curve and the FFT magnitude spectrum."""
        if len(self.time_data) == 0 or len(self.amplitude_data) == 0:
            messagebox.showwarning("警告", "请先导入数据")
            return

        try:
            self._update_spectrum()
        except ValueError as exc:
            messagebox.showerror("错误", str(exc))
            return

        # Time-domain plot.
        self.ax1.clear()
        self.ax1.plot(self.time_data, self.amplitude_data, 'b-')
        self.ax1.set_title('振幅时域曲线')
        self.ax1.set_xlabel('时间 (s)')
        self.ax1.set_ylabel('振幅')
        self.ax1.grid(True)

        # Frequency-domain plot (non-negative frequencies only).
        self.ax2.clear()
        self.ax2.plot(self.freq_data, self.fft_data, 'r-')
        self.ax2.set_title('频域波形')
        self.ax2.set_xlabel('频率 (Hz)')
        self.ax2.set_ylabel('幅度')
        self.ax2.grid(True)

        self.canvas.draw()
        self.status_label.config(text="已生成时域和频域曲线")

    def calculate_features(self):
        """Compute and display the time- and frequency-domain features.

        The spectrum is computed on demand when it is not available yet, so
        the frequency features do not depend on the plots having been
        generated first.
        """
        if len(self.amplitude_data) == 0:
            messagebox.showwarning("警告", "请先导入数据并生成曲线")
            return

        time_features = self._time_domain_features(self.amplitude_data)
        for name, value in time_features.items():
            self.time_feature_vars[name].set(f"{value:.4f}")

        if len(self.fft_data) == 0:
            try:
                self._update_spectrum()
            except ValueError as exc:
                messagebox.showerror("错误", str(exc))

        if len(self.fft_data) == 0:
            # Too few samples or an invalid time axis: clear the frequency
            # values rather than leave numbers from an earlier data set.
            for var in self.freq_feature_vars.values():
                var.set("0.0")
            self.status_label.config(text="已计算时域特征(无法计算频域特征)")
            return

        freq_features = self._frequency_domain_features(
            self.freq_data, self.fft_data
        )
        for name, value in freq_features.items():
            self.freq_feature_vars[name].set(f"{value:.4f}")

        self.status_label.config(text="已计算时域和频域特征")

    # ------------------------------------------------------------- history

    def save_status(self):
        """Append the current valve state and displayed features to the history.

        The record is added to ``history_data`` and the whole history is then
        rewritten to ``HISTORY_FILE`` as UTF-8 JSON.
        """
        self.valve_id = self.valve_id_entry.get()
        self.valve_status = self.status_var.get()
        self.sensor_status = self.sensor_var.get()

        record = {
            "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "valve_id": self.valve_id,
            "status": self.valve_status,
            "sensor_status": self.sensor_status,
            "time_features": {
                k: v.get() for k, v in self.time_feature_vars.items()
            },
            "freq_features": {
                k: v.get() for k, v in self.freq_feature_vars.items()
            },
        }
        self.history_data.append(record)

        try:
            # Explicit UTF-8 keeps the Chinese keys/values readable and makes
            # the file independent of the platform's locale encoding.
            with open(HISTORY_FILE, "w", encoding="utf-8") as f:
                json.dump(self.history_data, f, indent=2, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            messagebox.showerror("错误", f"保存历史记录时出错: {str(e)}")
            return

        self.status_label.config(text="状态已保存")
        messagebox.showinfo("成功", "阀门状态和特征值已保存到历史记录")

    def load_history(self):
        """Load ``history_data`` from ``HISTORY_FILE``.

        A missing, unreadable or malformed file results in an empty history;
        entries that are not JSON objects are ignored.
        """
        try:
            with open(HISTORY_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            self.history_data = []
            return

        if isinstance(loaded, list):
            self.history_data = [r for r in loaded if isinstance(r, dict)]
        else:
            self.history_data = []

    @staticmethod
    def _format_record(record):
        """Return the multi-line detail text shown for one history record."""
        lines = [
            f"时间: {record.get('timestamp', '')}",
            f"阀门编号: {record.get('valve_id', '')}",
            f"运行状态: {record.get('status', '')}",
            f"传感器状态: {record.get('sensor_status', '')}",
            "",
            "时域特征:",
        ]
        lines.extend(
            f"  {feature}: {value}"
            for feature, value in record.get("time_features", {}).items()
        )
        lines.append("")
        lines.append("频域特征:")
        lines.extend(
            f"  {feature}: {value}"
            for feature, value in record.get("freq_features", {}).items()
        )
        return "\n".join(lines) + "\n"

    def view_history(self):
        """Open a window listing the 50 most recent history records."""
        history_window = tk.Toplevel(self.root)
        history_window.title("历史记录")
        history_window.geometry("800x500")

        columns = ("时间", "阀门编号", "状态", "传感器状态")
        tree = ttk.Treeview(history_window, columns=columns, show="headings")
        for col in columns:
            tree.heading(col, text=col)
            tree.column(col, width=150)

        # Map each tree row to its record so that "details" opens exactly the
        # selected entry even when several records share a timestamp.
        records_by_item = {}
        for record in self.history_data[-50:]:
            item_id = tree.insert(
                "",
                "end",
                values=(
                    record.get("timestamp", ""),
                    record.get("valve_id", ""),
                    record.get("status", ""),
                    record.get("sensor_status", ""),
                ),
            )
            records_by_item[item_id] = record

        scrollbar = ttk.Scrollbar(
            history_window, orient=tk.VERTICAL, command=tree.yview
        )
        tree.configure(yscrollcommand=scrollbar.set)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        tree.pack(fill=tk.BOTH, expand=True)

        def show_details():
            """Show all stored fields of the focused record in a new window."""
            record = records_by_item.get(tree.focus())
            if record is None:
                return

            detail_window = tk.Toplevel(history_window)
            detail_window.title(f"详细信息 - {record.get('timestamp', '')}")
            detail_window.geometry("600x400")

            text_widget = tk.Text(detail_window, wrap=tk.WORD)
            text_widget.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
            text_widget.insert(tk.END, self._format_record(record))
            # Read-only view.
            text_widget.config(state=tk.DISABLED)

        detail_button = ttk.Button(
            history_window, text="查看详情", command=show_details
        )
        detail_button.pack(pady=10)


def main():
    """Create the Tk root window and run the application."""
    root = tk.Tk()
    ValveMonitoringSystem(root)
    root.mainloop()


if __name__ == "__main__":
    main()