#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Drag-and-drop SFTP upload tool.

A small PyQt5 window that fetches SFTP credentials from an HTTP API, probes
the server for a writable directory, and uploads dropped files one at a time
over SFTP (paramiko) while showing progress and a short log.
"""

import json
import os
import sys
import time
from collections import deque
from datetime import datetime

import paramiko
import requests
from PyQt5.QtCore import Qt, QThread, QTimer, pyqtSignal
from PyQt5.QtGui import QDragEnterEvent, QDropEvent, QFont
from PyQt5.QtWidgets import (QApplication, QMainWindow, QLabel, QVBoxLayout,
                             QWidget, QProgressBar, QTextEdit, QPushButton,
                             QMessageBox)

APP_TITLE = '玩玩云上传工具 v2.0'

# Remote directories probed for write access, in priority order.
CANDIDATE_UPLOAD_DIRS = ['/', '/upload', '/uploads', '/files', '/home', '/tmp']

# Drop-area appearance; only the background colour differs between states.
_DROP_AREA_STYLE_TEMPLATE = """
    QLabel {{
        font-size: 50px;
        color: #667eea;
        border: 3px dashed #667eea;
        border-radius: 10px;
        background-color: {background};
        padding: 40px;
    }}
"""
DROP_AREA_STYLE_IDLE = _DROP_AREA_STYLE_TEMPLATE.format(background='#f5f7fa')
DROP_AREA_STYLE_HOVER = _DROP_AREA_STYLE_TEMPLATE.format(background='#e8ecf7')


def _open_sftp(sftp_config):
    """Open an SFTP session described by *sftp_config*.

    Args:
        sftp_config: Mapping with ``host``, ``port``, ``username`` and
            ``password`` keys.

    Returns:
        A ``(transport, sftp)`` pair; the caller must close both.

    Raises:
        Exception: Whatever paramiko raises while connecting. The transport
            is closed before the exception propagates, so nothing leaks.
    """
    transport = paramiko.Transport((sftp_config['host'], sftp_config['port']))
    try:
        transport.connect(
            username=sftp_config['username'],
            password=sftp_config['password'],
        )
        sftp = paramiko.SFTPClient.from_transport(transport)
    except Exception:
        transport.close()
        raise
    return transport, sftp


def _close_quietly(*resources):
    """Close every non-None resource, ignoring errors raised by ``close()``."""
    for resource in resources:
        if resource is None:
            continue
        try:
            resource.close()
        except Exception:
            # A failing close must not mask the real outcome of the operation.
            pass


def _join_remote(directory, name):
    """Join a remote directory and a file name with exactly one separator."""
    if directory.endswith('/'):
        return f'{directory}{name}'
    return f'{directory}/{name}'


class TestDirectoryThread(QThread):
    """Worker thread that checks whether a remote directory is writable.

    Emits ``result(True, directory)`` when a probe file could be created in
    the directory, or ``result(False, error_text)`` otherwise.
    """

    result = pyqtSignal(bool, str)  # success flag, directory path or error text

    def __init__(self, test_dir, sftp_config):
        super().__init__()
        self.test_dir = test_dir
        self.sftp_config = sftp_config

    def run(self):
        """Connect, try to create and delete a probe file, then report."""
        try:
            transport, sftp = _open_sftp(self.sftp_config)
        except Exception as e:
            self.result.emit(False, str(e))
            return

        # The process id keeps the probe name from clashing with other clients.
        test_file = _join_remote(self.test_dir, f'.wwy_test_{os.getpid()}')
        error = None
        try:
            with sftp.open(test_file, 'w') as f:
                f.write('test')
            try:
                sftp.remove(test_file)
            except Exception:
                # Removing the probe is best-effort; the directory is writable.
                pass
        except Exception as e:
            error = str(e)
        finally:
            _close_quietly(sftp, transport)

        # Emit only after cleanup so the thread is about to finish when the
        # receiver reacts to the signal.
        if error is None:
            self.result.emit(True, self.test_dir)
        else:
            self.result.emit(False, error)


class UploadThread(QThread):
    """Worker thread that uploads one local file to a remote directory.

    Signals:
        progress(int, str): Percentage (0-100) and a status message.
        finished(bool, str): Success flag and a final message.
    """

    progress = pyqtSignal(int, str)  # percentage, status message
    # NOTE(review): this name shadows QThread's built-in ``finished`` signal.
    # It is kept because callers connect to it with the (bool, str) signature.
    finished = pyqtSignal(bool, str)  # success flag, message

    def __init__(self, sftp_config, file_path, remote_dir):
        super().__init__()
        self.sftp_config = sftp_config
        self.file_path = file_path
        self.remote_dir = remote_dir

    def run(self):
        """Upload ``file_path`` via a temporary remote name, then rename it."""
        transport = None
        sftp = None
        temp_remote_path = None
        filename = os.path.basename(self.file_path)
        try:
            self.progress.emit(10, '正在连接服务器...')
            transport, sftp = _open_sftp(self.sftp_config)
            self.progress.emit(30, '连接成功,开始上传...')

            remote_path = _join_remote(self.remote_dir, filename)
            # Upload under a temporary name first (the original comment says
            # this avoids ".fuse_hidden" problems on the server side).
            temp_remote_path = (
                f'{remote_path}.uploading_{int(time.time() * 1000)}'
            )

            def callback(transferred, total):
                percent = int((transferred / total) * 100) if total > 0 else 0
                # The transfer itself maps onto the 30%..90% progress range.
                progress_value = 30 + int(percent * 0.6)
                size_mb = transferred / (1024 * 1024)
                self.progress.emit(progress_value, f'上传中: {filename} ({size_mb:.2f} MB / {total/(1024*1024):.2f} MB)')

            # Step 1: upload to the temporary file.
            sftp.put(self.file_path, temp_remote_path, callback=callback)

            # Step 2: remove an existing target, if any.
            try:
                sftp.stat(remote_path)
                sftp.remove(remote_path)
            except OSError:
                # Usually "no such file". If the removal itself failed, the
                # rename below reports the real problem.
                pass

            # Step 3: move the temporary file into place.
            sftp.rename(temp_remote_path, remote_path)
            temp_remote_path = None  # nothing left to clean up
        except Exception as e:
            if sftp is not None and temp_remote_path is not None:
                # Best-effort removal of a partial upload.
                try:
                    sftp.remove(temp_remote_path)
                except Exception:
                    pass
            outcome = (False, f'上传失败: {str(e)}')
        else:
            outcome = (True, f'文件 {filename} 上传成功!')
        finally:
            _close_quietly(sftp, transport)

        if outcome[0]:
            self.progress.emit(100, '上传完成!')
        self.finished.emit(*outcome)


class UploadWindow(QMainWindow):
    """Main window: accepts dropped files/folders and uploads them in order."""

    def __init__(self):
        super().__init__()
        self.config = self.load_config()
        self.sftp_config = None
        self.remote_dir = '/'  # default upload directory
        self.upload_queue = deque()  # local paths waiting to be uploaded
        self.is_uploading = False
        # Worker references are kept so a thread is never garbage-collected
        # while it is still running.
        self.test_thread = None
        self.upload_thread = None
        self.initUI()
        self.get_sftp_config()

    def load_config(self):
        """Load ``config.json`` from the program's directory.

        Returns:
            The parsed JSON document.

        Shows an error dialog and exits with status 1 when the file is
        missing or cannot be parsed.
        """
        try:
            if getattr(sys, 'frozen', False):
                # Frozen (e.g. PyInstaller) build: look next to the executable.
                base_path = os.path.dirname(sys.executable)
            else:
                # Development: look next to this source file.
                base_path = os.path.dirname(os.path.abspath(__file__))

            config_path = os.path.join(base_path, 'config.json')

            if not os.path.exists(config_path):
                QMessageBox.critical(None, '错误', f'找不到配置文件: {config_path}\n\n请确保config.json与程序在同一目录下!')
                # SystemExit is not an Exception subclass, so it is not caught
                # by the handler below.
                sys.exit(1)

            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            QMessageBox.critical(None, '错误', f'加载配置失败:\n{str(e)}')
            sys.exit(1)

    def get_sftp_config(self):
        """Fetch the SFTP configuration from the API, then probe directories.

        NOTE(review): this is a blocking HTTP call (10 s timeout) made from
        the constructor, i.e. on the GUI thread.
        """
        try:
            response = requests.post(
                f"{self.config['api_base_url']}/api/upload/get-config",
                json={'api_key': self.config['api_key']},
                timeout=10
            )

            if response.status_code != 200:
                self.show_error(f'服务器错误: {response.status_code}')
                return

            data = response.json()
            if data.get('success'):
                self.sftp_config = data['sftp_config']
                # Automatically find and select a writable upload directory.
                self.test_and_set_upload_directory()
            else:
                self.show_error(data.get('message', '获取配置失败'))
        except Exception as e:
            self.show_error(f'无法连接到服务器: {str(e)}')

    def _set_status(self, *paragraphs):
        """Show *paragraphs* in the status label.

        NOTE(review): in the reviewed source these literals were broken
        across blank physical lines (a syntax error), which suggests markup
        around each paragraph was lost. The visible text and blank lines are
        preserved here verbatim; restore the markup if it is known.
        """
        self.status_label.setText(
            ''.join(f'\n\n{paragraph}\n\n' for paragraph in paragraphs)
        )

    def test_and_set_upload_directory(self):
        """Start probing the candidate directories for write access."""
        if not self.sftp_config:
            return

        self.log('开始测试上传目录...')
        self._set_status(APP_TITLE, '正在测试上传目录...')

        self.test_dirs = list(CANDIDATE_UPLOAD_DIRS)
        self.current_test_index = 0
        self.test_next_directory()

    def test_next_directory(self):
        """Probe the next candidate directory, or give up when none remain."""
        if self.current_test_index >= len(self.test_dirs):
            self.log('✗ 所有目录都无法写入,请检查SFTP权限')
            self.show_error('无法找到可写入的目录,请检查SFTP权限')
            return

        test_dir = self.test_dirs[self.current_test_index]
        self.log(f'测试目录: {test_dir}')

        if self.test_thread is not None:
            # The previous probe has already emitted its result; make sure
            # the thread has fully stopped before dropping the reference.
            self.test_thread.wait()

        self.test_thread = TestDirectoryThread(test_dir, self.sftp_config)
        self.test_thread.result.connect(self.on_test_result)
        self.test_thread.start()

    def on_test_result(self, success, message):
        """Handle a probe result: adopt the directory or try the next one."""
        if success:
            self.remote_dir = message
            self.log(f'✓ 已设置上传目录为: {message}')
            # .get avoids an unhandled KeyError inside a Qt slot when the
            # config file has no "username" entry.
            username = self.config.get('username', '')
            self._set_status(
                APP_TITLE,
                f'✓ 已连接 - 用户: {username}',
                '拖拽文件/文件夹到此处上传',
                f'上传目录: {self.remote_dir}',
            )
        else:
            self.log(f'✗ 目录 {self.test_dirs[self.current_test_index]} 不可写: {message}')
            self.current_test_index += 1
            self.test_next_directory()

    def show_error(self, message):
        """Display an error message in the status label."""
        self._set_status(
            APP_TITLE,
            f'✗ 错误: {message}',
            '请检查网络连接或联系管理员',
        )

    def initUI(self):
        """Build the window: status, drop area, queue, progress and log."""
        self.setWindowTitle(APP_TITLE)
        self.setGeometry(300, 300, 500, 450)

        # Accept drag-and-drop on the whole window.
        self.setAcceptDrops(True)

        central_widget = QWidget()
        self.setCentralWidget(central_widget)

        layout = QVBoxLayout()

        # Status label
        self.status_label = QLabel('正在连接服务器...')
        self.status_label.setAlignment(Qt.AlignCenter)
        self.status_label.setFont(QFont('Arial', 11))
        self.status_label.setWordWrap(True)
        self.status_label.setStyleSheet('padding: 20px;')
        layout.addWidget(self.status_label)

        # Drop hint area
        self.drop_area = QLabel('📁\n\n支持多文件和文件夹')
        self.drop_area.setAlignment(Qt.AlignCenter)
        self.drop_area.setStyleSheet(DROP_AREA_STYLE_IDLE)
        layout.addWidget(self.drop_area)

        # Queue status label
        self.queue_label = QLabel('队列: 0 个文件等待上传')
        self.queue_label.setAlignment(Qt.AlignCenter)
        self.queue_label.setStyleSheet('color: #2c3e50; font-weight: bold; padding: 5px;')
        layout.addWidget(self.queue_label)

        # Progress bar (hidden until an upload starts)
        self.progress_bar = QProgressBar()
        self.progress_bar.setValue(0)
        self.progress_bar.setVisible(False)
        self.progress_bar.setStyleSheet("""
            QProgressBar {
                border: 2px solid #e0e0e0;
                border-radius: 5px;
                text-align: center;
                height: 25px;
            }
            QProgressBar::chunk {
                background-color: #667eea;
            }
        """)
        layout.addWidget(self.progress_bar)

        # Progress message (hidden until an upload starts)
        self.progress_label = QLabel('')
        self.progress_label.setAlignment(Qt.AlignCenter)
        self.progress_label.setVisible(False)
        layout.addWidget(self.progress_label)

        # Log area
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self.log_text.setMaximumHeight(100)
        self.log_text.setStyleSheet("""
            QTextEdit {
                background-color: #f9f9f9;
                border: 1px solid #e0e0e0;
                border-radius: 5px;
                padding: 5px;
                font-family: 'Courier New', monospace;
                font-size: 11px;
            }
        """)
        layout.addWidget(self.log_text)

        central_widget.setLayout(layout)

        self.log('程序已启动 - 版本 v2.0')

    def log(self, message):
        """Append a timestamped line to the log and scroll to the bottom."""
        self.log_text.append(f'[{self.get_time()}] {message}')
        scroll_bar = self.log_text.verticalScrollBar()
        scroll_bar.setValue(scroll_bar.maximum())

    def get_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime('%H:%M:%S')

    def dragEnterEvent(self, event: QDragEnterEvent):
        """Accept drags that carry URLs and highlight the drop area."""
        if event.mimeData().hasUrls():
            event.acceptProposedAction()
            self.drop_area.setStyleSheet(DROP_AREA_STYLE_HOVER)

    def dragLeaveEvent(self, event):
        """Restore the drop area's idle appearance."""
        self.drop_area.setStyleSheet(DROP_AREA_STYLE_IDLE)

    def dropEvent(self, event: QDropEvent):
        """Queue every dropped file (folders are scanned recursively)."""
        self.drop_area.setStyleSheet(DROP_AREA_STYLE_IDLE)

        if not self.sftp_config:
            self.log('错误: 未获取到SFTP配置')
            return

        paths = [url.toLocalFile() for url in event.mimeData().urls()]

        all_files = []
        for path in paths:
            if os.path.isfile(path):
                all_files.append(path)
            elif os.path.isdir(path):
                self.log(f'扫描文件夹: {os.path.basename(path)}')
                folder_files = self.scan_folder(path)
                all_files.extend(folder_files)
                self.log(f'找到 {len(folder_files)} 个文件')

        if all_files:
            self.upload_queue.extend(all_files)
            self.update_queue_label()
            self.log(f'添加 {len(all_files)} 个文件到上传队列')

            if not self.is_uploading:
                self.process_upload_queue()

    def scan_folder(self, folder_path):
        """Return the paths of all files below *folder_path*, recursively.

        NOTE(review): uploads use only each file's base name, so files with
        the same name in different sub-folders end up at the same remote path.
        """
        files = []
        try:
            for root, _dirs, filenames in os.walk(folder_path):
                files.extend(
                    os.path.join(root, filename) for filename in filenames
                )
        except Exception as e:
            self.log(f'扫描文件夹失败: {str(e)}')
        return files

    def update_queue_label(self):
        """Show how many files are still waiting in the queue."""
        count = len(self.upload_queue)
        self.queue_label.setText(f'队列: {count} 个文件等待上传')

    def process_upload_queue(self):
        """Start uploading the next queued file, or finish when none remain."""
        if not self.upload_queue:
            self.is_uploading = False
            self.update_queue_label()
            self.log('✓ 所有文件上传完成!')
            return

        self.is_uploading = True
        file_path = self.upload_queue.popleft()
        self.update_queue_label()
        self.upload_file(file_path)

    def upload_file(self, file_path):
        """Start a worker thread that uploads *file_path*."""
        self.log(f'开始上传: {os.path.basename(file_path)}')

        # Show the progress widgets. The style is reset so the colour of the
        # previous result does not carry over to this upload.
        self.progress_bar.setVisible(True)
        self.progress_bar.setValue(0)
        self.progress_label.setVisible(True)
        self.progress_label.setStyleSheet('')
        self.progress_label.setText('准备上传...')

        if self.upload_thread is not None:
            # Never drop the reference to a thread that may still be running.
            self.upload_thread.wait()

        self.upload_thread = UploadThread(self.sftp_config, file_path, self.remote_dir)
        self.upload_thread.progress.connect(self.on_progress)
        self.upload_thread.finished.connect(self.on_finished)
        self.upload_thread.start()

    def on_progress(self, value, message):
        """Update the progress bar and its message."""
        self.progress_bar.setValue(value)
        self.progress_label.setText(message)

    def on_finished(self, success, message):
        """Report the result of one upload and schedule the next one."""
        self.log(message)

        if success:
            self.progress_label.setText('✓ ' + message)
            self.progress_label.setStyleSheet('color: green; font-weight: bold;')
        else:
            self.progress_label.setText('✗ ' + message)
            self.progress_label.setStyleSheet('color: red; font-weight: bold;')

        # Continue with the queue after a one-second pause.
        QTimer.singleShot(1000, self.process_upload_queue)


def main():
    """Create the application, show the upload window, run the event loop."""
    app = QApplication(sys.argv)
    window = UploadWindow()
    window.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()