#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CUPS 打印机驱动管理器 轻量级Web服务,用于上传和安装打印机驱动 """ import os import sys import subprocess import tempfile import shutil import tarfile import zipfile from pathlib import Path from functools import wraps from flask import Flask, request, render_template_string, redirect, url_for, flash, jsonify, Response from werkzeug.utils import secure_filename app = Flask(__name__) app.secret_key = os.urandom(24) # 配置 UPLOAD_FOLDER = '/tmp/cups-drivers' MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 100MB ALLOWED_EXTENSIONS = {'deb', 'ppd', 'gz', 'tar', 'tgz', 'zip', 'rpm', 'sh', 'run'} # 管理员凭据(可通过环境变量设置) ADMIN_USERNAME = os.environ.get('DRIVER_MANAGER_USERNAME', 'admin') ADMIN_PASSWORD = os.environ.get('DRIVER_MANAGER_PASSWORD', 'admin') app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH # 确保上传目录存在 os.makedirs(UPLOAD_FOLDER, exist_ok=True) def check_auth(username, password): """验证用户名和密码""" return username == ADMIN_USERNAME and password == ADMIN_PASSWORD def authenticate(): """发送401响应""" return Response( '需要登录才能访问驱动管理器\n', 401, {'WWW-Authenticate': 'Basic realm="CUPS Driver Manager"'} ) def requires_auth(f): """认证装饰器""" @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated def allowed_file(filename): """检查文件类型是否允许""" if '.' not in filename: return False ext = filename.rsplit('.', 1)[1].lower() # 处理 .tar.gz 情况 if filename.endswith('.tar.gz'): return True return ext in ALLOWED_EXTENSIONS def get_file_type(filename): """获取文件类型""" filename_lower = filename.lower() if filename_lower.endswith('.deb'): return 'deb' elif filename_lower.endswith('.ppd') or filename_lower.endswith('.ppd.gz'): return 'ppd' elif filename_lower.endswith('.tar.gz') or filename_lower.endswith('.tgz'): return 'tar.gz' elif filename_lower.endswith('.tar'): return 'tar' elif filename_lower.endswith('.zip'): return 'zip' elif filename_lower.endswith('.rpm'): return 'rpm' elif filename_lower.endswith('.sh') or filename_lower.endswith('.run'): return 'script' else: return 'unknown' def run_command(cmd, shell=False): """执行命令并返回结果""" try: if shell: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=300) else: result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) return { 'success': result.returncode == 0, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': result.returncode } except subprocess.TimeoutExpired: return {'success': False, 'stdout': '', 'stderr': '命令执行超时', 'returncode': -1} except Exception as e: return {'success': False, 'stdout': '', 'stderr': str(e), 'returncode': -1} def install_deb(filepath): """安装 .deb 包""" results = [] # 先尝试直接安装 result = run_command(['dpkg', '-i', filepath]) results.append(('dpkg -i', result)) # 修复依赖 if not result['success']: fix_result = run_command(['apt-get', 'install', '-f', '-y']) results.append(('apt-get install -f', fix_result)) return results def install_ppd(filepath): """安装 .ppd 文件""" results = [] # PPD文件目录 ppd_dirs = [ '/usr/share/ppd/custom', '/usr/share/cups/model' ] # 确保目录存在 for ppd_dir in ppd_dirs: os.makedirs(ppd_dir, exist_ok=True) # 复制PPD文件 filename = os.path.basename(filepath) dest = os.path.join(ppd_dirs[0], filename) try: shutil.copy2(filepath, dest) os.chmod(dest, 0o644) results.append(('复制PPD文件', { 'success': True, 'stdout': f'已复制到 {dest}', 'stderr': '', 'returncode': 0 })) except Exception as e: results.append(('复制PPD文件', { 'success': False, 'stdout': '', 'stderr': str(e), 'returncode': 1 })) return results def install_tar_gz(filepath): """安装 .tar.gz 包""" results = [] # 创建临时解压目录 extract_dir = tempfile.mkdtemp(prefix='driver_') try: # 解压 with tarfile.open(filepath, 'r:gz') as tar: tar.extractall(extract_dir) results.append(('解压文件', { 'success': True, 'stdout': f'已解压到 {extract_dir}', 'stderr': '', 'returncode': 0 })) # 查找安装脚本 install_scripts = ['install.sh', 'setup.sh', 'install', 'setup'] found_script = None for root, dirs, files in os.walk(extract_dir): for script in install_scripts: if script in files: found_script = os.path.join(root, script) break if found_script: break if found_script: os.chmod(found_script, 0o755) result = run_command(found_script, shell=True) results.append(('执行安装脚本', result)) else: # 查找 Makefile for root, dirs, files in os.walk(extract_dir): if 'Makefile' in files: make_result = run_command(['make', '-C', root]) results.append(('make', make_result)) if make_result['success']: install_result = run_command(['make', '-C', root, 'install']) results.append(('make install', install_result)) break else: # 查找 PPD 文件 ppd_files = list(Path(extract_dir).rglob('*.ppd')) if ppd_files: for ppd_file in ppd_files: ppd_results = install_ppd(str(ppd_file)) results.extend(ppd_results) else: results.append(('查找安装方式', { 'success': False, 'stdout': '', 'stderr': '未找到安装脚本、Makefile或PPD文件', 'returncode': 1 })) except Exception as e: results.append(('解压文件', { 'success': False, 'stdout': '', 'stderr': str(e), 'returncode': 1 })) finally: # 清理临时目录 shutil.rmtree(extract_dir, ignore_errors=True) return results def install_zip(filepath): """安装 .zip 包""" results = [] # 创建临时解压目录 extract_dir = tempfile.mkdtemp(prefix='driver_') try: # 解压 with zipfile.ZipFile(filepath, 'r') as zip_ref: zip_ref.extractall(extract_dir) results.append(('解压文件', { 'success': True, 'stdout': f'已解压到 {extract_dir}', 'stderr': '', 'returncode': 0 })) # 查找 deb 文件 deb_files = list(Path(extract_dir).rglob('*.deb')) if deb_files: for deb_file in deb_files: deb_results = install_deb(str(deb_file)) results.extend(deb_results) return results # 查找安装脚本 install_scripts = ['install.sh', 'setup.sh', 'install', 'setup'] found_script = None for root, dirs, files in os.walk(extract_dir): for script in install_scripts: if script in files: found_script = os.path.join(root, script) break if found_script: break if found_script: os.chmod(found_script, 0o755) result = run_command(found_script, shell=True) results.append(('执行安装脚本', result)) else: # 查找 PPD 文件 ppd_files = list(Path(extract_dir).rglob('*.ppd')) if ppd_files: for ppd_file in ppd_files: ppd_results = install_ppd(str(ppd_file)) results.extend(ppd_results) else: results.append(('查找安装方式', { 'success': False, 'stdout': '', 'stderr': '未找到deb包、安装脚本或PPD文件', 'returncode': 1 })) except Exception as e: results.append(('解压文件', { 'success': False, 'stdout': '', 'stderr': str(e), 'returncode': 1 })) finally: # 清理临时目录 shutil.rmtree(extract_dir, ignore_errors=True) return results def install_rpm(filepath): """安装 .rpm 包(转换为deb后安装)""" results = [] # 检查 alien 是否安装 alien_check = run_command(['which', 'alien']) if not alien_check['success']: # 安装 alien install_result = run_command(['apt-get', 'install', '-y', 'alien']) results.append(('安装alien工具', install_result)) if not install_result['success']: return results # 使用 alien 转换 work_dir = os.path.dirname(filepath) convert_result = run_command(f'cd {work_dir} && alien -d {filepath}', shell=True) results.append(('转换RPM为DEB', convert_result)) if convert_result['success']: # 查找生成的 deb 文件 deb_files = list(Path(work_dir).glob('*.deb')) for deb_file in deb_files: deb_results = install_deb(str(deb_file)) results.extend(deb_results) os.remove(deb_file) # 清理临时deb文件 return results def install_script(filepath): """执行安装脚本""" results = [] filename = os.path.basename(filepath).lower() os.chmod(filepath, 0o755) # HP 插件需要特殊处理(非交互式安装) if 'hplip' in filename and 'plugin' in filename: # 方法1:尝试使用 hp-plugin 命令安装 hp_plugin_check = run_command(['which', 'hp-plugin']) if hp_plugin_check['success']: result = run_command(f'hp-plugin -i -p {filepath}', shell=True) results.append(('使用 hp-plugin 安装 HP 插件', result)) else: # 方法2:使用 yes 命令自动确认交互式提示 result = run_command(f'yes | {filepath}', shell=True) results.append(('执行 HP 插件安装脚本', result)) else: # 普通脚本直接执行 result = run_command(filepath, shell=True) results.append(('执行安装脚本', result)) return results def install_driver(filepath, file_type): """根据文件类型安装驱动""" if file_type == 'deb': return install_deb(filepath) elif file_type == 'ppd': return install_ppd(filepath) elif file_type in ('tar.gz', 'tar', 'tgz'): return install_tar_gz(filepath) elif file_type == 'zip': return install_zip(filepath) elif file_type == 'rpm': return install_rpm(filepath) elif file_type == 'script': return install_script(filepath) else: return [('未知类型', { 'success': False, 'stdout': '', 'stderr': f'不支持的文件类型: {file_type}', 'returncode': 1 })] # HTML模板 HTML_TEMPLATE = '''
正在安装驱动,请稍候...
暂无自定义PPD文件
{% endif %}暂无自定义模型文件
{% endif %}未找到打印机相关软件包
{% endif %}