#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""CUPS printer driver manager.

A lightweight web service for uploading and installing printer drivers.
Uploaded packages are installed with the privileges of this process, and
archives/scripts may execute arbitrary code, so the service must only be
exposed to trusted administrators.
"""

import hmac
import os
import shlex
import shutil
import subprocess
import sys
import tarfile
import tempfile
import zipfile
from functools import wraps
from pathlib import Path

from flask import (
    Flask,
    request,
    render_template_string,
    redirect,
    url_for,
    flash,
    jsonify,
    Response,
)
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.secret_key = os.urandom(24)

# Configuration
UPLOAD_FOLDER = '/tmp/cups-drivers'
MAX_CONTENT_LENGTH = 100 * 1024 * 1024  # 100MB
ALLOWED_EXTENSIONS = {'deb', 'ppd', 'gz', 'tar', 'tgz', 'zip', 'rpm', 'sh', 'run'}

# Seconds a single external command may run before it is aborted
COMMAND_TIMEOUT = 300

# Installer file names probed inside extracted archives, in priority order
INSTALL_SCRIPT_NAMES = ('install.sh', 'setup.sh', 'install', 'setup')

# Substrings that mark a dpkg package as printer-related
PRINTER_PACKAGE_KEYWORDS = (
    'printer', 'cups', 'hplip', 'gutenprint', 'foomatic',
    'epson', 'canon', 'brother', 'samsung', 'pantum',
)

# Administrator credentials (can be set through environment variables)
ADMIN_USERNAME = os.environ.get('DRIVER_MANAGER_USERNAME', 'admin')
ADMIN_PASSWORD = os.environ.get('DRIVER_MANAGER_PASSWORD', 'admin')

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# Make sure the upload directory exists
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


def check_auth(username, password):
    """Return True when the supplied credentials match the administrator's.

    Both values are compared with ``hmac.compare_digest`` so the comparison
    time does not reveal how many leading characters matched.
    """
    if username is None or password is None:
        return False
    user_ok = hmac.compare_digest(
        username.encode('utf-8'), ADMIN_USERNAME.encode('utf-8'))
    password_ok = hmac.compare_digest(
        password.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8'))
    # Evaluate both comparisons before combining them.
    return user_ok and password_ok


def authenticate():
    """Build the 401 response that asks the browser for Basic credentials."""
    return Response(
        '需要登录才能访问驱动管理器\n',
        401,
        {'WWW-Authenticate': 'Basic realm="CUPS Driver Manager"'}
    )


def requires_auth(f):
    """Decorator that protects a view with HTTP Basic authentication."""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated


def allowed_file(filename):
    """Return True when the file name carries an allowed extension.

    ``.tar.gz`` needs no special case: its last extension is ``gz``, which
    is part of ``ALLOWED_EXTENSIONS``.
    """
    if '.' not in filename:
        return False
    ext = filename.rsplit('.', 1)[1].lower()
    return ext in ALLOWED_EXTENSIONS


def get_file_type(filename):
    """Map a file name to the installer type used by ``install_driver``.

    Returns one of ``'deb'``, ``'ppd'``, ``'tar.gz'``, ``'tar'``, ``'zip'``,
    ``'rpm'``, ``'script'`` or ``'unknown'``. Matching is case-insensitive.
    """
    filename_lower = filename.lower()
    if filename_lower.endswith('.deb'):
        return 'deb'
    if filename_lower.endswith(('.ppd', '.ppd.gz')):
        return 'ppd'
    if filename_lower.endswith(('.tar.gz', '.tgz')):
        return 'tar.gz'
    if filename_lower.endswith('.tar'):
        return 'tar'
    if filename_lower.endswith('.zip'):
        return 'zip'
    if filename_lower.endswith('.rpm'):
        return 'rpm'
    if filename_lower.endswith(('.sh', '.run')):
        return 'script'
    return 'unknown'


def _step_result(success, stdout='', stderr='', returncode=None):
    """Build a result dict with the same shape ``run_command`` returns."""
    if returncode is None:
        returncode = 0 if success else 1
    return {
        'success': success,
        'stdout': stdout,
        'stderr': stderr,
        'returncode': returncode
    }


def run_command(cmd, shell=False, cwd=None):
    """Run an external command and capture its outcome.

    Args:
        cmd: Argument list, or a command string when ``shell`` is true.
        shell: Run ``cmd`` through the shell.
        cwd: Optional working directory for the command.

    Returns:
        A dict with ``success``, ``stdout``, ``stderr`` and ``returncode``.
        Timeouts and start-up failures never raise; they are reported with
        ``returncode`` -1 and the error text in ``stderr``.
    """
    try:
        result = subprocess.run(
            cmd,
            shell=bool(shell),
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=COMMAND_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return _step_result(False, stderr='命令执行超时', returncode=-1)
    except Exception as e:
        return _step_result(False, stderr=str(e), returncode=-1)
    return _step_result(
        result.returncode == 0,
        stdout=result.stdout,
        stderr=result.stderr,
        returncode=result.returncode,
    )


def _run_install_script(script_path):
    """Mark a script executable, run it, and return its ``(step, result)``.

    The script is still run through the shell (so files without a shebang
    keep working), but the path is quoted so spaces or shell metacharacters
    in archive member names cannot alter the command.
    """
    os.chmod(script_path, 0o755)
    result = run_command(shlex.quote(script_path), shell=True)
    return ('执行安装脚本', result)


def _find_install_script(extract_dir):
    """Return the first installer script below ``extract_dir``, or None.

    Directories are visited in ``os.walk`` order; inside a directory the
    names in ``INSTALL_SCRIPT_NAMES`` are tried in priority order.
    """
    for root, _dirs, files in os.walk(extract_dir):
        for script in INSTALL_SCRIPT_NAMES:
            if script in files:
                return os.path.join(root, script)
    return None


def _install_ppd_files(extract_dir, not_found_message):
    """Install every ``*.ppd`` below ``extract_dir``.

    Returns the step results, or a single failed step carrying
    ``not_found_message`` when no PPD file exists.
    """
    ppd_files = list(Path(extract_dir).rglob('*.ppd'))
    if not ppd_files:
        return [('查找安装方式', _step_result(False, stderr=not_found_message))]
    results = []
    for ppd_file in ppd_files:
        results.extend(install_ppd(str(ppd_file)))
    return results


def _extract_tar(filepath, extract_dir):
    """Extract a (possibly compressed) tar archive without escaping the target.

    Raises:
        ValueError: on interpreters without extraction filters, when a
            member would be written outside ``extract_dir``.
    """
    # 'r:*' auto-detects compression, so plain .tar works as well as .tar.gz.
    with tarfile.open(filepath, 'r:*') as tar:
        if hasattr(tarfile, 'data_filter'):
            # The 'data' filter rejects absolute paths, '..' traversal and
            # links that point outside the destination.
            tar.extractall(extract_dir, filter='data')
            return
        # Older interpreters: refuse members that resolve outside the target.
        base = os.path.realpath(extract_dir)
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(base, member.name))
            if os.path.commonpath([base, target]) != base:
                raise ValueError(f'压缩包包含不安全的路径: {member.name}')
        tar.extractall(extract_dir)


def install_deb(filepath):
    """Install a .deb package, repairing dependencies if ``dpkg`` fails.

    Returns a list of ``(step, result)`` tuples.
    """
    results = []

    # Try a direct install first
    result = run_command(['dpkg', '-i', filepath])
    results.append(('dpkg -i', result))

    # Let apt pull in missing dependencies
    if not result['success']:
        fix_result = run_command(['apt-get', 'install', '-f', '-y'])
        results.append(('apt-get install -f', fix_result))

    return results


def install_ppd(filepath):
    """Copy a PPD file into the custom PPD directory.

    Returns a list with one ``(step, result)`` tuple.
    """
    results = []

    # PPD directories; the file itself is copied into the first one
    ppd_dirs = [
        '/usr/share/ppd/custom',
        '/usr/share/cups/model'
    ]

    try:
        # Make sure the directories exist
        for ppd_dir in ppd_dirs:
            os.makedirs(ppd_dir, exist_ok=True)

        # Copy the PPD file
        dest = os.path.join(ppd_dirs[0], os.path.basename(filepath))
        shutil.copy2(filepath, dest)
        os.chmod(dest, 0o644)
        results.append(('复制PPD文件', _step_result(True, stdout=f'已复制到 {dest}')))
    except Exception as e:
        results.append(('复制PPD文件', _step_result(False, stderr=str(e))))

    return results


def install_tar_gz(filepath):
    """Install a driver shipped as a tar archive (.tar, .tar.gz, .tgz).

    After extraction the first available strategy is used: an installer
    script, then ``make`` followed by ``make install``, then bundled PPD files.

    Returns a list of ``(step, result)`` tuples.
    """
    results = []

    # Temporary extraction directory
    extract_dir = tempfile.mkdtemp(prefix='driver_')

    try:
        _extract_tar(filepath, extract_dir)
        results.append(('解压文件', _step_result(True, stdout=f'已解压到 {extract_dir}')))

        found_script = _find_install_script(extract_dir)
        if found_script:
            results.append(_run_install_script(found_script))
            return results

        # Look for a Makefile
        makefile_dir = next(
            (root for root, _dirs, files in os.walk(extract_dir)
             if 'Makefile' in files),
            None,
        )
        if makefile_dir is not None:
            make_result = run_command(['make', '-C', makefile_dir])
            results.append(('make', make_result))
            if make_result['success']:
                install_result = run_command(['make', '-C', makefile_dir, 'install'])
                results.append(('make install', install_result))
            return results

        # Fall back to PPD files
        results.extend(_install_ppd_files(
            extract_dir, '未找到安装脚本、Makefile或PPD文件'))
    except Exception as e:
        results.append(('解压文件', _step_result(False, stderr=str(e))))
    finally:
        # Remove the temporary directory
        shutil.rmtree(extract_dir, ignore_errors=True)

    return results


def install_zip(filepath):
    """Install a driver shipped as a .zip archive.

    After extraction the first available strategy is used: bundled .deb
    packages, then an installer script, then bundled PPD files.

    Returns a list of ``(step, result)`` tuples.
    """
    results = []

    # Temporary extraction directory
    extract_dir = tempfile.mkdtemp(prefix='driver_')

    try:
        with zipfile.ZipFile(filepath, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        results.append(('解压文件', _step_result(True, stdout=f'已解压到 {extract_dir}')))

        # Look for .deb packages
        deb_files = list(Path(extract_dir).rglob('*.deb'))
        if deb_files:
            for deb_file in deb_files:
                results.extend(install_deb(str(deb_file)))
            return results

        found_script = _find_install_script(extract_dir)
        if found_script:
            results.append(_run_install_script(found_script))
        else:
            results.extend(_install_ppd_files(
                extract_dir, '未找到deb包、安装脚本或PPD文件'))
    except Exception as e:
        results.append(('解压文件', _step_result(False, stderr=str(e))))
    finally:
        # Remove the temporary directory
        shutil.rmtree(extract_dir, ignore_errors=True)

    return results


def _deb_mtimes(directory):
    """Snapshot modification times of the .deb files directly in ``directory``."""
    return {path: path.stat().st_mtime_ns for path in directory.glob('*.deb')}


def install_rpm(filepath):
    """Install an .rpm package by converting it to .deb with ``alien``.

    ``alien`` is installed through apt when it is missing. Only .deb files
    created or rewritten by the conversion are installed and then removed;
    unrelated packages already in the directory are left untouched.

    Returns a list of ``(step, result)`` tuples.
    """
    results = []

    # Make sure alien is available
    alien_check = run_command(['which', 'alien'])
    if not alien_check['success']:
        install_result = run_command(['apt-get', 'install', '-y', 'alien'])
        results.append(('安装alien工具', install_result))
        if not install_result['success']:
            return results

    # alien writes its output into the working directory, so run it next to
    # the package and pass an absolute path that stays valid from there.
    filepath = os.path.abspath(filepath)
    work_dir = Path(filepath).parent
    before = _deb_mtimes(work_dir)

    convert_result = run_command(['alien', '-d', filepath], cwd=str(work_dir))
    results.append(('转换RPM为DEB', convert_result))
    if not convert_result['success']:
        return results

    after = _deb_mtimes(work_dir)
    generated = sorted(
        path for path, mtime in after.items() if before.get(path) != mtime)
    if not generated:
        results.append(('转换RPM为DEB', _step_result(False, stderr='未生成DEB文件')))
        return results

    for deb_file in generated:
        results.extend(install_deb(str(deb_file)))
        try:
            os.remove(deb_file)  # drop the temporary .deb
        except OSError:
            # Best-effort cleanup; the install result is what matters.
            pass

    return results


def install_script(filepath):
    """Run an uploaded installer script (.sh / .run).

    Returns a list with one ``(step, result)`` tuple.
    """
    return [_run_install_script(filepath)]


def install_driver(filepath, file_type):
    """Install a driver file with the installer matching ``file_type``.

    Args:
        filepath: Path of the saved driver file.
        file_type: A value produced by ``get_file_type`` (``'tgz'`` is also
            accepted as an alias for the tar installer).

    Returns:
        A list of ``(step, result)`` tuples; an unsupported type yields a
        single failed step.
    """
    installers = {
        'deb': install_deb,
        'ppd': install_ppd,
        'tar.gz': install_tar_gz,
        'tar': install_tar_gz,
        'tgz': install_tar_gz,
        'zip': install_zip,
        'rpm': install_rpm,
        'script': install_script,
    }
    installer = installers.get(file_type)
    if installer is None:
        return [('未知类型', _step_result(
            False, stderr=f'不支持的文件类型: {file_type}'))]
    return installer(filepath)


# HTML templates
# NOTE(review): the template text below contains no HTML markup and no upload
# form; the tags appear to have been stripped from this copy of the file.
# Restore the markup from the original source before deploying.
HTML_TEMPLATE = ''' CUPS 驱动管理器

CUPS 打印机驱动管理器

{% with messages = get_flashed_messages(with_categories=true) %} {% if messages %} {% for category, message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %}

上传并安装驱动

📦

点击或拖拽文件到此处上传

已选择:
支持的格式: .deb .ppd .tar.gz .zip .rpm .sh

正在安装驱动,请稍候...

{% if results %}

安装结果

{% for step, result in results %}
▶ {{ step }}
{% if result.success %}
✓ 成功
{% else %}
✗ 失败 (返回码: {{ result.returncode }})
{% endif %} {% if result.stdout %}
{{ result.stdout }}
{% endif %} {% if result.stderr %}
{{ result.stderr }}
{% endif %}
{% endfor %}
{% endif %}

快速链接

'''

DRIVERS_TEMPLATE = ''' 已安装驱动 - CUPS 驱动管理器

已安装的打印机驱动

PPD 文件 (/usr/share/ppd/custom/)

{% if ppd_custom %}
    {% for ppd in ppd_custom %}
  • {{ ppd }}
  • {% endfor %}
{% else %}

暂无自定义PPD文件

{% endif %}

CUPS 模型 (/usr/share/cups/model/)

{% if cups_model %}
    {% for model in cups_model %}
  • {{ model }}
  • {% endfor %}
{% else %}

暂无自定义模型文件

{% endif %}

已安装的打印机相关软件包

{% if packages %}
    {% for pkg in packages %}
  • {{ pkg }}
  • {% endfor %}
{% else %}

未找到打印机相关软件包

{% endif %}
← 返回上传页面
'''


def _storage_name(original_name):
    """Return a safe on-disk name that keeps the upload's file type.

    ``secure_filename`` drops non-ASCII characters, so a name such as
    ``驱动.deb`` collapses to ``deb`` and loses its extension. In that case
    a neutral stem is prepended. An empty string means no usable name
    could be derived.
    """
    expected_type = get_file_type(original_name)
    safe = secure_filename(original_name)
    if not safe:
        return ''
    if '.' in safe and get_file_type(safe) == expected_type:
        return safe
    candidate = f'upload.{safe}'
    return candidate if get_file_type(candidate) == expected_type else ''


def _install_uploaded_file(file):
    """Save an uploaded driver, install it, and always clean up.

    The file is stored in a private per-request directory so concurrent
    uploads with equal names cannot overwrite each other, and so files
    generated next to it (e.g. by ``alien``) belong to this request only.

    Returns:
        The list of ``(step, result)`` tuples from ``install_driver``, or
        None when no safe storage name could be derived.
    """
    stored_name = _storage_name(file.filename)
    if not stored_name:
        return None
    # The type comes from the name the client sent, not the sanitized one.
    file_type = get_file_type(file.filename)

    upload_root = app.config['UPLOAD_FOLDER']
    # /tmp may have been cleaned since start-up
    os.makedirs(upload_root, exist_ok=True)
    work_dir = tempfile.mkdtemp(prefix='upload_', dir=upload_root)
    try:
        filepath = os.path.join(work_dir, stored_name)
        file.save(filepath)
        return install_driver(filepath, file_type)
    finally:
        # Remove the uploaded file and anything created beside it
        shutil.rmtree(work_dir, ignore_errors=True)


@app.route('/')
@requires_auth
def index():
    """Render the upload page."""
    return render_template_string(HTML_TEMPLATE, results=None)


@app.route('/upload', methods=['POST'])
@requires_auth
def upload_file():
    """Handle the upload form: install the driver and render the results."""
    file = request.files.get('driver_file')
    if file is None or not file.filename:
        flash('没有选择文件', 'danger')
        return redirect(url_for('index'))

    if not allowed_file(file.filename):
        flash('不支持的文件类型', 'danger')
        return redirect(url_for('index'))

    results = _install_uploaded_file(file)
    if results is None:
        flash('不支持的文件类型', 'danger')
        return redirect(url_for('index'))

    # Report overall success only when every step succeeded
    if all(result['success'] for _step, result in results):
        flash('驱动安装成功!', 'success')
    else:
        flash('驱动安装过程中出现错误,请查看详细信息', 'danger')

    return render_template_string(HTML_TEMPLATE, results=results)


@app.route('/drivers')
@requires_auth
def list_drivers():
    """Render the list of installed PPD files and printer packages."""
    # Custom PPD files
    ppd_custom = []
    ppd_dir = '/usr/share/ppd/custom'
    if os.path.exists(ppd_dir):
        ppd_custom = os.listdir(ppd_dir)

    # CUPS models
    cups_model = []
    model_dir = '/usr/share/cups/model'
    if os.path.exists(model_dir):
        cups_model = [
            name for name in os.listdir(model_dir)
            if name.endswith(('.ppd', '.ppd.gz'))
        ]

    # Installed printer-related packages
    packages = []
    result = run_command(['dpkg', '-l'])
    if result['success']:
        for line in result['stdout'].split('\n'):
            lowered = line.lower()
            if not any(keyword in lowered for keyword in PRINTER_PACKAGE_KEYWORDS):
                continue
            parts = line.split()
            # 'ii' marks a package that is fully installed
            if len(parts) >= 3 and parts[0] == 'ii':
                packages.append(f"{parts[1]} ({parts[2]})")

    return render_template_string(DRIVERS_TEMPLATE,
                                  ppd_custom=ppd_custom,
                                  cups_model=cups_model,
                                  packages=packages[:50])  # cap the list length


@app.route('/api/install', methods=['POST'])
@requires_auth
def api_install():
    """JSON API: install an uploaded driver and return the step results."""
    file = request.files.get('driver_file')
    if file is None or not file.filename:
        return jsonify({'success': False, 'error': '没有选择文件'})

    if not allowed_file(file.filename):
        return jsonify({'success': False, 'error': '不支持的文件类型'})

    results = _install_uploaded_file(file)
    if results is None:
        return jsonify({'success': False, 'error': '不支持的文件类型'})

    return jsonify({
        'success': all(result['success'] for _step, result in results),
        'results': [
            {
                'step': step,
                'success': result['success'],
                'stdout': result['stdout'],
                'stderr': result['stderr'],
            }
            for step, result in results
        ]
    })


def main():
    """Parse command-line options and start the web server."""
    import argparse

    global ADMIN_PASSWORD

    parser = argparse.ArgumentParser(description='CUPS 打印机驱动管理器')
    parser.add_argument('-p', '--port', type=int, default=632, help='监听端口 (默认: 632)')
    parser.add_argument('-H', '--host', default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)')
    parser.add_argument('--password', default=None, help='管理员密码')
    args = parser.parse_args()

    if args.password:
        ADMIN_PASSWORD = args.password

    print("CUPS 驱动管理器启动中...")
    print(f"访问地址: http://localhost:{args.port}/")
    # Show the effective user name; it may come from the environment.
    print(f"默认用户名: {ADMIN_USERNAME}")
    print(f"默认密码: {ADMIN_PASSWORD}")
    print("-" * 40)

    app.run(host=args.host, port=args.port, debug=False)


if __name__ == '__main__':
    main()