53 KiB
APOPHIS WebSocket Extension — Technical Specification
1. Overview
This specification extends APOPHIS v1.0 with first-class WebSocket contract testing. WebSockets differ fundamentally from HTTP:
| Dimension | HTTP | WebSocket |
|---|---|---|
| Connection | Ephemeral, per-request | Persistent, bidirectional |
| Protocol | Request/response pairs | Message-oriented streams |
| Metadata | Status codes, headers, body | No status codes; headers only at handshake |
| Lifecycle | Stateless (per request) | Stateful (connection-scoped) |
| Errors | HTTP status codes | Connection close codes + application-level errors |
APOPHIS must treat WebSocket routes as first-class citizens in the contract/stateful testing pipeline without breaking existing HTTP-only workflows.
2. Goals
- Annotate WebSocket routes in Fastify using
@fastify/websocket(or equivalent) with APOPHIS contract annotations. - Define message-schema contracts (what messages may be sent/received) and state-machine contracts (valid transitions between connection states).
- Provide a dedicated
ws-runner.tsfor WebSocket contract validation. - Validate message sequences (e.g.,
AUTH→READY→SUBSCRIBE→DATA). - Support stateful testing for connection lifecycle, reconnection, and error handling.
- Extend APOSTL with
ws_messageandws_stateoperations. - Integrate seamlessly with existing
contract()andstateful()methods.
3. Non-Goals
- WebSocket performance/load testing (out of scope; use
autocannonork6). - Browser-based WebSocket testing (APOPHIS is server-side only).
- Raw TCP/WebSocket frame inspection (we operate at the message level).
4. Architecture
4.1 High-Level Flow
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Fastify Route │────▶│ WS Contract │────▶│ ws-runner.ts │
│ (with ws opts) │ │ Extraction │ │ (validation) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Message Schema │ │ APOSTL Eval │
│ (x-ws-messages) │ │ (ws_message, │
└──────────────────┘ │ ws_state) │
└─────────────────┘
4.2 File Additions & Modifications
| File | Action | Purpose |
|---|---|---|
src/types.ts |
Modify | Add WS-specific types to public API |
src/domain/contract.ts |
Modify | Extract WS contracts from route schema |
src/domain/discovery.ts |
Modify | Discover WebSocket routes alongside HTTP |
src/formula/parser.ts |
Modify | Add ws_message, ws_state to APOSTL grammar |
src/formula/evaluator.ts |
Modify | Evaluate ws_message and ws_state nodes |
src/test/ws-runner.ts |
Create | Dedicated WebSocket test runner |
src/test/ws-client.ts |
Create | Thin WebSocket client wrapper for testing |
src/domain/ws-contract-validation.ts |
Create | WS-specific validation logic |
src/plugin/index.ts |
Modify | Wire WS runner into contract() / stateful() |
src/domain/category.ts |
Modify | Categorize WS routes (default: observer for GET upgrades) |
5. Type Changes (src/types.ts)
5.1 New Types
// ============================================================================
// WebSocket: Message Types
// ============================================================================
export interface WebSocketMessage {
readonly direction: 'incoming' | 'outgoing'
readonly type: string // e.g., 'auth', 'ready', 'data', 'error'
readonly payload: unknown
readonly timestamp: number
}
export interface WebSocketConnection {
readonly id: string
readonly url: string
readonly headers: Record<string, string>
readonly state: WebSocketState
readonly messages: ReadonlyArray<WebSocketMessage>
readonly closeCode?: number
readonly closeReason?: string
}
export type WebSocketState =
| 'connecting'
| 'open'
| 'authenticating'
| 'ready'
| 'subscribed'
| 'closing'
| 'closed'
| 'error'
// ============================================================================
// WebSocket: Contract Annotations
// ============================================================================
export interface WebSocketMessageSchema {
readonly type: string
readonly direction: 'incoming' | 'outgoing'
readonly schema: Record<string, unknown> // JSON Schema for payload
readonly required?: boolean
}
export interface WebSocketStateTransition {
readonly from: WebSocketState
readonly to: WebSocketState
readonly trigger: string // message type that triggers transition
readonly guard?: string // APOSTL guard formula
}
export interface WebSocketContract {
readonly path: string
readonly method: 'GET' // WS always upgrades from GET
readonly category: OperationCategory
readonly messages: WebSocketMessageSchema[]
readonly transitions: WebSocketStateTransition[]
readonly requires: string[] // APOSTL preconditions (e.g., auth token in headers)
readonly ensures: string[] // APOSTL postconditions on final state
readonly invariants: string[] // APOSTL invariants over message sequence
readonly validateRuntime: boolean
}
// ============================================================================
// WebSocket: EvalContext Extension
// ============================================================================
export interface WebSocketEvalContext {
readonly connection: WebSocketConnection
readonly message: WebSocketMessage
readonly state: WebSocketState
readonly previousMessage?: WebSocketMessage
}
// ============================================================================
// WebSocket: Test Results
// ============================================================================
export interface WebSocketTestResult {
readonly ok: boolean
readonly name: string
readonly id: number
readonly connectionId: string
readonly directive?: string
readonly diagnostics?: WebSocketTestDiagnostics
}
export interface WebSocketTestDiagnostics {
readonly error?: string
readonly violation?: ContractViolation
readonly expectedSequence?: string[]
readonly actualSequence?: string[]
readonly stateTransition?: {
readonly from: WebSocketState
readonly to: WebSocketState
readonly expectedTrigger: string
readonly actualTrigger: string
}
}
5.2 Modified Types
// In RouteContract, add optional ws field
export interface RouteContract {
path: string
method: string
category: OperationCategory
requires: string[]
ensures: string[]
invariants: string[]
regexPatterns: Record<string, string>
validateRuntime: boolean
schema?: Record<string, unknown>
ws?: WebSocketContract // <-- NEW: present if route is a WebSocket upgrade
}
// In EvalContext, add optional ws field
export interface EvalContext {
readonly request: { /* ... */ }
readonly response: { /* ... */ }
readonly previous?: EvalContext
readonly ws?: WebSocketEvalContext // <-- NEW: populated during WS testing
}
// In OperationHeader, add ws operations
export type OperationHeader =
| 'request_body' | 'response_body' | 'response_code'
| 'request_headers' | 'response_headers' | 'query_params'
| 'cookies' | 'response_time'
| 'ws_message' | 'ws_state' // <-- NEW
6. Fastify WebSocket Route Annotation
6.1 Route Definition (Example)
Using @fastify/websocket:
import fastify from 'fastify'
import websocket from '@fastify/websocket'
import apophis from 'apophis-fastify'
const app = fastify()
await app.register(websocket)
await app.register(apophis)
// WebSocket route with APOPHIS contract annotations
app.get('/ws/events', {
websocket: true,
schema: {
// Standard HTTP schema for the upgrade request
querystring: {
type: 'object',
properties: {
stream: { type: 'string', enum: ['live', 'snapshot'] }
}
},
// WebSocket-specific contract annotations
'x-ws-messages': [
{
type: 'auth',
direction: 'outgoing',
schema: {
type: 'object',
properties: {
token: { type: 'string' }
},
required: ['token']
},
required: true
},
{
type: 'ready',
direction: 'incoming',
schema: {
type: 'object',
properties: {
status: { type: 'string', const: 'ready' }
}
}
},
{
type: 'subscribe',
direction: 'outgoing',
schema: {
type: 'object',
properties: {
channels: { type: 'array', items: { type: 'string' } }
}
}
},
{
type: 'data',
direction: 'incoming',
schema: {
type: 'object',
properties: {
channel: { type: 'string' },
payload: { type: 'object' }
}
}
}
],
'x-ws-transitions': [
{ from: 'open', to: 'authenticating', trigger: 'auth' },
{ from: 'authenticating', to: 'ready', trigger: 'ready' },
{ from: 'ready', to: 'subscribed', trigger: 'subscribe' },
{ from: 'subscribed', to: 'subscribed', trigger: 'data' }
],
'x-requires': [
'request_headers(this).authorization != null'
],
'x-ensures': [
'ws_state(this) == "ready"',
'previous(ws_message(this).type) == "auth" && ws_message(this).type == "ready"'
],
'x-invariants': [
'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"'
]
}
}, (connection, req) => {
// Handler implementation
connection.socket.on('message', (raw) => {
const msg = JSON.parse(raw.toString())
// ... handle messages
})
})
6.2 Schema Annotation Reference
| Annotation | Type | Description |
|---|---|---|
x-ws-messages |
WebSocketMessageSchema[] |
Defines valid message types, directions, and payload schemas |
x-ws-transitions |
WebSocketStateTransition[] |
Defines valid state machine transitions |
x-requires |
string[] |
APOSTL preconditions evaluated on the HTTP upgrade request |
x-ensures |
string[] |
APOSTL postconditions evaluated after connection close or per-message |
x-invariants |
string[] |
APOSTL invariants checked after every message |
x-category |
string |
Override category (default: observer for WS upgrades) |
x-validate-runtime |
boolean |
Enable runtime validation of WS contracts |
7. Contract Extraction (src/domain/contract.ts)
7.1 Modified extractContract
export const extractContract = (
path: string,
method: string,
schema: Record<string, unknown> | undefined,
isWebSocket: boolean = false // <-- NEW parameter
): RouteContract => {
const s = schema ?? {}
// ... existing cache logic ...
const override = typeof s['x-category'] === 'string' ? s['x-category'] : undefined
// For WS routes, default to 'observer' unless overridden
const category = isWebSocket
? (override as OperationCategory ?? 'observer')
: inferCategory(path, method, override)
// ... existing requires/ensures extraction ...
const contract: RouteContract = {
path,
method: method.toUpperCase(),
category,
requires,
ensures,
invariants: EMPTY_INVARIANTS,
regexPatterns: {},
validateRuntime,
schema: s,
}
// If WebSocket, extract WS-specific contract
if (isWebSocket) {
contract.ws = extractWebSocketContract(s)
}
// ... cache and return ...
return contract
}
7.2 New extractWebSocketContract
const extractWebSocketContract = (
schema: Record<string, unknown>
): WebSocketContract | undefined => {
const messages = schema['x-ws-messages']
const transitions = schema['x-ws-transitions']
if (!Array.isArray(messages) || messages.length === 0) {
return undefined
}
const requires = schema['x-requires']
const ensures = schema['x-ensures']
const invariants = schema['x-invariants']
const validateRuntime = schema['x-validate-runtime'] !== false
return {
path: '', // populated by caller
method: 'GET',
category: 'observer',
messages: messages as WebSocketMessageSchema[],
transitions: (transitions as WebSocketStateTransition[]) ?? [],
requires: Array.isArray(requires) ? requires as string[] : [],
ensures: Array.isArray(ensures) ? ensures as string[] : [],
invariants: Array.isArray(invariants) ? invariants as string[] : [],
validateRuntime,
}
}
8. Route Discovery (src/domain/discovery.ts)
8.1 Modified captureRoute
export const captureRoute = (
instance: object,
route: CapturedRoute & { websocket?: boolean } // <-- NEW: websocket flag
): void => {
const existing = capturedRoutes.get(instance) ?? []
existing.push(route)
capturedRoutes.set(instance, existing)
}
8.2 Modified discoverRoutes
export const discoverRoutes = (
instance: {
routes?: Array<{
method: string
url: string
schema?: Record<string, unknown>
websocket?: boolean
}>
}
): RouteContract[] => {
const captured = capturedRoutes.get(instance)
if (captured && captured.length > 0) {
return captured.map((route) =>
extractContract(route.url, route.method, route.schema, route.websocket ?? false)
)
}
if (Array.isArray(instance.routes) && instance.routes.length > 0) {
return instance.routes.map((route) =>
extractContract(route.url, route.method, route.schema, route.websocket ?? false)
)
}
return []
}
8.3 Plugin Hook Modification (src/plugin/index.ts:114-138)
fastify.addHook('onRoute', (routeOptions) => {
const method = Array.isArray(routeOptions.method)
? routeOptions.method.join(',')
: routeOptions.method
const schema = routeOptions.schema as Record<string, unknown> | undefined
const prefix = (routeOptions as unknown as Record<string, unknown>).prefix as string | undefined
const url = prefix && !routeOptions.url.startsWith(prefix)
? `${prefix}${routeOptions.url}`
: routeOptions.url
// Detect WebSocket routes
const isWebSocket = (routeOptions as unknown as Record<string, unknown>).websocket === true
captureRoute(fastify, {
method,
url,
schema,
prefix,
websocket: isWebSocket, // <-- NEW
})
const contract = extractContract(url, method, schema, isWebSocket)
if (contract.validateRuntime && (contract.requires.length > 0 || contract.ensures.length > 0 || contract.ws !== undefined)) {
const config = routeOptions.config as Record<string, unknown> || {}
config.apophisContract = contract
routeOptions.config = config as typeof routeOptions.config
}
})
9. APOSTL Formula Extensions
9.1 New Operations
| Operation | Syntax | Returns | Description |
|---|---|---|---|
ws_message(this) |
ws_message(this) |
WebSocketMessage |
Current message being evaluated |
ws_message(this).type |
ws_message(this).type |
string |
Type of current message |
ws_message(this).payload |
ws_message(this).payload |
unknown |
Payload of current message |
ws_message(this).direction |
ws_message(this).direction |
'incoming' | 'outgoing' |
Direction of current message |
ws_state(this) |
ws_state(this) |
WebSocketState |
Current connection state |
previous(ws_message(this)) |
previous(ws_message(this)) |
WebSocketMessage |
Previous message in sequence |
9.2 Parser Changes (src/formula/parser.ts)
Add ws_message and ws_state to VALID_HEADERS:
const VALID_HEADERS: OperationHeader[] = [
'request_body', 'response_body', 'response_code',
'request_headers', 'response_headers', 'query_params', 'cookies', 'response_time',
'ws_message', 'ws_state' // <-- NEW
]
Add manual char-code parsing for ws_message (10 chars) and ws_state (8 chars) in parseOperation:
// In parseOperation, add after cookies check:
if (!header && p + 10 <= len) {
const c0 = input.charCodeAt(p)
const c1 = input.charCodeAt(p + 1)
// ... check for 'ws_message' ...
if (c0 === 119 && c1 === 115) {
// ws_ prefix
const c3 = input.charCodeAt(p + 3)
if (c3 === 109) {
// ws_message
header = 'ws_message'
headerLen = 10
} else if (c3 === 115) {
// ws_state
header = 'ws_state'
headerLen = 8
}
}
}
9.3 Evaluator Changes (src/formula/evaluator.ts)
Add cases to resolveOperation:
function resolveOperation(node: Extract<FormulaNode, { type: 'operation' }>, ctx: EvalContext): unknown {
const { header, parameter, accessor } = node
let target: unknown
switch (header) {
// ... existing cases ...
case 'ws_message':
target = ctx.ws?.message ?? null
break
case 'ws_state':
target = ctx.ws?.state ?? null
break
default:
throw new Error(`Unknown operation header: ${header}`)
}
// ... existing accessor logic ...
}
10. WebSocket Test Runner (src/test/ws-runner.ts)
10.1 Runner Interface
export interface WebSocketTestRunner {
run(
fastify: FastifyInjectInstance,
config: TestConfig,
scopeRegistry?: ScopeRegistry
): Promise<TestSuite>
}
10.2 Pseudocode
/**
* WebSocket Contract Test Runner
* Validates WS routes for:
* 1. Message schema compliance (payload matches JSON Schema)
* 2. State machine transitions (valid sequence of states)
* 3. APOSTL preconditions (on upgrade request)
* 4. APOSTL postconditions (on connection close or per-message)
* 5. APOSTL invariants (over entire message sequence)
*/
export const runWebSocketTests = async (
fastify: FastifyInjectInstance,
config: TestConfig,
scopeRegistry?: ScopeRegistry
): Promise<TestSuite> => {
const startTime = Date.now()
const depth = resolveDepth(config.depth ?? 'standard')
// 1. Discover WS routes
const allRoutes = discoverRoutes(fastify)
const wsRoutes = allRoutes.filter(r => r.ws !== undefined)
if (wsRoutes.length === 0) {
return {
tests: [],
summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 },
routes: [],
}
}
const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {}
const results: TestResult[] = []
let testId = 0
// 2. For each WS route, run test sequences
for (const route of wsRoutes) {
const wsContract = route.ws!
const commandsPerRoute = Math.max(1, Math.floor(depth.contractRuns / Math.max(wsRoutes.length, 1)))
for (let run = 0; run < commandsPerRoute; run++) {
testId++
const connectionId = `ws-${testId}`
const name = `WS ${route.method} ${route.path} (#${testId})`
// 3. Establish WebSocket connection
let connection: WebSocketConnection
try {
connection = await establishConnection(fastify, route, scopeHeaders)
} catch (err) {
results.push({
ok: false,
name,
id: testId,
diagnostics: { error: `Connection failed: ${err instanceof Error ? err.message : String(err)}` }
})
continue
}
// 4. Validate upgrade request preconditions
const upgradeCtx = buildUpgradeContext(connection, route)
const preResult = validatePostconditions(wsContract.requires, upgradeCtx, {
method: route.method,
path: route.path,
})
if (!preResult.success) {
results.push({
ok: false,
name,
id: testId,
diagnostics: {
error: `Precondition failed: ${preResult.error}`,
violation: preResult.violation,
}
})
await closeConnection(connection)
continue
}
// 5. Run message sequence
const sequenceResult = await runMessageSequence(
connection,
wsContract,
route,
testId,
scopeHeaders
)
results.push(...sequenceResult.results)
// 6. Validate postconditions on final state
const finalCtx = buildFinalContext(connection, route)
const postResult = validatePostconditions(wsContract.ensures, finalCtx, {
method: route.method,
path: route.path,
})
if (!postResult.success) {
testId++
results.push({
ok: false,
name: `POST ${name}`,
id: testId,
diagnostics: {
error: `Postcondition failed: ${postResult.error}`,
violation: postResult.violation,
}
})
}
// 7. Check invariants over entire sequence
const invariantResults = checkWebSocketInvariants(wsContract.invariants, connection, route)
for (const inv of invariantResults) {
if (!inv.success) {
testId++
results.push({
ok: false,
name: `INVARIANT: ${inv.name}`,
id: testId,
diagnostics: { error: inv.error }
})
}
}
await closeConnection(connection)
}
}
// 8. Build TestSuite
const passed = results.filter(r => r.ok && r.directive === undefined).length
const failed = results.filter(r => !r.ok).length
const skipped = results.filter(r => r.directive !== undefined).length
return {
tests: results,
summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 },
routes: allRoutes.map(r => ({
path: r.path,
method: r.method,
status: r.ws !== undefined ? 'tested' : 'no-contract',
})),
}
}
10.3 Message Sequence Validation
interface SequenceResult {
results: TestResult[]
connection: WebSocketConnection
}
const runMessageSequence = async (
connection: WebSocketConnection,
wsContract: WebSocketContract,
route: RouteContract,
baseTestId: number,
scopeHeaders: Record<string, string>
): Promise<SequenceResult> => {
const results: TestResult[] = []
let currentState: WebSocketState = 'open'
let testId = baseTestId
// Generate a valid message sequence based on state machine
const sequence = generateValidSequence(wsContract.transitions)
for (const expectedMsg of sequence) {
testId++
const msgName = `MSG ${connection.id} ${expectedMsg.type} (#${testId})`
// Send or receive message
let actualMsg: WebSocketMessage
try {
if (expectedMsg.direction === 'outgoing') {
actualMsg = await sendMessage(connection, expectedMsg)
} else {
actualMsg = await receiveMessage(connection, expectedMsg, 5000)
}
} catch (err) {
results.push({
ok: false,
name: msgName,
id: testId,
diagnostics: {
error: `Message exchange failed: ${err instanceof Error ? err.message : String(err)}`,
expectedSequence: sequence.map(m => m.type),
}
})
break
}
// Validate message schema
const schemaValidation = validateMessageSchema(actualMsg, wsContract.messages)
if (!schemaValidation.valid) {
results.push({
ok: false,
name: msgName,
id: testId,
diagnostics: {
error: `Schema violation: ${schemaValidation.error}`,
violation: schemaValidation.violation,
}
})
continue
}
// Validate state transition
const transition = wsContract.transitions.find(t =>
t.from === currentState && t.trigger === actualMsg.type
)
if (!transition) {
results.push({
ok: false,
name: msgName,
id: testId,
diagnostics: {
error: `Invalid state transition from ${currentState} via ${actualMsg.type}`,
stateTransition: {
from: currentState,
to: currentState,
expectedTrigger: expectedMsg.type,
actualTrigger: actualMsg.type,
}
}
})
continue
}
// Update state
currentState = transition.to
// Evaluate APOSTL formulas for this message
const msgCtx = buildMessageContext(connection, actualMsg, currentState)
const postResult = validatePostconditions(wsContract.ensures, msgCtx, {
method: route.method,
path: route.path,
})
if (!postResult.success) {
results.push({
ok: false,
name: msgName,
id: testId,
diagnostics: {
error: `Postcondition failed: ${postResult.error}`,
violation: postResult.violation,
}
})
continue
}
results.push({ ok: true, name: msgName, id: testId })
}
return { results, connection }
}
10.4 Sequence Generation
const generateValidSequence = (
transitions: WebSocketStateTransition[]
): Array<{ type: string; direction: 'incoming' | 'outgoing' }> => {
// Build adjacency list
const adj = new Map<WebSocketState, WebSocketStateTransition[]>()
for (const t of transitions) {
const existing = adj.get(t.from) ?? []
existing.push(t)
adj.set(t.from, existing)
}
// BFS/DFS to find a path from 'open' to a terminal state
const sequence: Array<{ type: string; direction: 'incoming' | 'outgoing' }> = []
let current: WebSocketState = 'open'
const visited = new Set<string>()
while (true) {
const options = adj.get(current) ?? []
const unvisited = options.filter(t => !visited.has(`${t.from}-${t.to}-${t.trigger}`))
if (unvisited.length === 0) break
const choice = unvisited[0]!
visited.add(`${choice.from}-${choice.to}-${choice.trigger}`)
// Determine direction based on typical patterns
// (outgoing = client→server, incoming = server→client)
const direction: 'incoming' | 'outgoing' =
['auth', 'subscribe', 'ping'].includes(choice.trigger) ? 'outgoing' : 'incoming'
sequence.push({ type: choice.trigger, direction })
current = choice.to
// Prevent infinite loops
if (sequence.length > 50) break
}
return sequence
}
11. WebSocket Client (src/test/ws-client.ts)
Thin wrapper around ws or native WebSocket for testing:
import WebSocket from 'ws'
export const establishConnection = async (
fastify: FastifyInjectInstance,
route: RouteContract,
headers: Record<string, string>
): Promise<WebSocketConnection> => {
// Fastify inject doesn't support WS; we need the actual HTTP server
const address = fastify.server?.address()
if (!address || typeof address === 'string') {
throw new Error('Fastify server must be listening for WebSocket tests')
}
const url = `ws://localhost:${address.port}${route.path}`
const ws = new WebSocket(url, { headers })
return new Promise((resolve, reject) => {
const connection: WebSocketConnection = {
id: `ws-${Date.now()}-${Math.random().toString(36).slice(2)}`,
url,
headers,
state: 'connecting',
messages: [],
}
ws.on('open', () => {
connection.state = 'open'
resolve(connection)
})
ws.on('error', (err) => {
connection.state = 'error'
reject(err)
})
ws.on('message', (data) => {
const msg: WebSocketMessage = {
direction: 'incoming',
type: inferMessageType(data),
payload: parsePayload(data),
timestamp: Date.now(),
}
connection.messages = [...connection.messages, msg]
})
ws.on('close', (code, reason) => {
connection.state = 'closed'
connection.closeCode = code
connection.closeReason = reason.toString()
})
// Attach ws instance to connection for send/close operations
(connection as any)._ws = ws
})
}
export const sendMessage = async (
connection: WebSocketConnection,
msg: { type: string; payload?: unknown }
): Promise<WebSocketMessage> => {
const ws = (connection as any)._ws as WebSocket
const payload = JSON.stringify({ type: msg.type, ...msg.payload })
ws.send(payload)
const sent: WebSocketMessage = {
direction: 'outgoing',
type: msg.type,
payload: msg.payload,
timestamp: Date.now(),
}
connection.messages = [...connection.messages, sent]
return sent
}
export const receiveMessage = async (
connection: WebSocketConnection,
expected: { type: string },
timeoutMs: number
): Promise<WebSocketMessage> => {
return new Promise((resolve, reject) => {
const startTime = Date.now()
const check = () => {
const lastMsg = connection.messages[connection.messages.length - 1]
if (lastMsg && lastMsg.direction === 'incoming' && lastMsg.type === expected.type) {
resolve(lastMsg)
return
}
if (Date.now() - startTime > timeoutMs) {
reject(new Error(`Timeout waiting for message type: ${expected.type}`))
return
}
setTimeout(check, 10)
}
check()
})
}
export const closeConnection = async (
connection: WebSocketConnection
): Promise<void> => {
const ws = (connection as any)._ws as WebSocket
ws.close()
return new Promise((resolve) => {
ws.on('close', () => resolve())
})
}
12. Stateful Testing for WebSockets
12.1 Connection Lifecycle States
connecting → open → authenticating → ready → subscribed → closing → closed
↓ ↓ ↓ ↓
error error error error
12.2 Reconnection Testing
const testReconnection = async (
route: RouteContract,
scopeHeaders: Record<string, string>
): Promise<TestResult[]> => {
const results: TestResult[] = []
// Test 1: Clean reconnect after normal close
const conn1 = await establishConnection(fastify, route, scopeHeaders)
await runAuthSequence(conn1)
await closeConnection(conn1)
const conn2 = await establishConnection(fastify, route, scopeHeaders)
const reconnectOk = conn2.state === 'open'
results.push({
ok: reconnectOk,
name: `Reconnection after clean close`,
id: 1,
diagnostics: reconnectOk ? undefined : { error: 'Reconnection failed after clean close' }
})
// Test 2: Reconnect after error
const conn3 = await establishConnection(fastify, route, scopeHeaders)
await sendInvalidMessage(conn3) // Force error
await closeConnection(conn3)
const conn4 = await establishConnection(fastify, route, scopeHeaders)
const errorReconnectOk = conn4.state === 'open'
results.push({
ok: errorReconnectOk,
name: `Reconnection after error`,
id: 2,
diagnostics: errorReconnectOk ? undefined : { error: 'Reconnection failed after error' }
})
return results
}
12.3 Error Handling Testing
const testErrorHandling = async (
route: RouteContract,
wsContract: WebSocketContract,
scopeHeaders: Record<string, string>
): Promise<TestResult[]> => {
const results: TestResult[] = []
// Test: Invalid message type
const conn = await establishConnection(fastify, route, scopeHeaders)
await sendMessage(conn, { type: 'invalid_type_xyz', payload: {} })
const errorMsg = await waitForMessage(conn, 'error', 1000)
const handledCorrectly = errorMsg !== null && errorMsg.payload !== undefined
results.push({
ok: handledCorrectly,
name: `Error handling for invalid message type`,
id: 1,
diagnostics: handledCorrectly ? undefined : {
error: 'Server did not respond with error message for invalid type'
}
})
// Test: Message without required auth
const conn2 = await establishConnection(fastify, route, {})
await sendMessage(conn2, { type: 'subscribe', payload: { channels: ['test'] } })
const authError = await waitForMessage(conn2, 'error', 1000)
const authHandled = authError !== null
results.push({
ok: authHandled,
name: `Error handling for unauthenticated subscribe`,
id: 2,
diagnostics: authHandled ? undefined : {
error: 'Server allowed subscribe without auth'
}
})
return results
}
13. Integration with contract() and stateful()
13.1 Modified Plugin (src/plugin/index.ts)
const buildContract = (fastify: FastifyInstance, scope: ScopeRegistry) => async (opts: TestConfig = {}): Promise<TestSuite> => {
const config = {
depth: opts.depth ?? 'standard',
scope: opts.scope,
seed: opts.seed,
}
const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance
// Run HTTP contract tests
const httpSuite = await runPetitTests(injectInstance, config, scope)
// Run WebSocket contract tests
const wsSuite = await runWebSocketTests(injectInstance, config, scope)
// Merge results
const mergedTests = [...httpSuite.tests, ...wsSuite.tests]
const mergedRoutes = [...httpSuite.routes, ...wsSuite.routes]
const mergedSummary = {
passed: httpSuite.summary.passed + wsSuite.summary.passed,
failed: httpSuite.summary.failed + wsSuite.summary.failed,
skipped: httpSuite.summary.skipped + wsSuite.summary.skipped,
timeMs: httpSuite.summary.timeMs + wsSuite.summary.timeMs,
cacheHits: httpSuite.summary.cacheHits + wsSuite.summary.cacheHits,
cacheMisses: httpSuite.summary.cacheMisses + wsSuite.summary.cacheMisses,
}
// Loud failure on empty discovery
if (mergedTests.length === 0) {
const routes = discoverRoutes(fastify as unknown as { routes?: Array<{ method: string; url: string; schema?: Record<string, unknown> }> })
if (routes.length === 0) {
throw new Error(
'No routes discovered. Did you register APOPHIS before defining routes? ' +
'APOPHIS must be registered via `await fastify.register(apophis)` before any routes are defined.'
)
}
}
return {
tests: mergedTests,
summary: mergedSummary,
routes: mergedRoutes,
}
}
const buildStateful = (fastify: FastifyInstance, scope: ScopeRegistry, cleanupManager: CleanupManager) => async (opts: TestConfig = {}): Promise<TestSuite> => {
const config = {
depth: opts.depth ?? 'standard',
scope: opts.scope,
seed: opts.seed,
}
const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance
// Run HTTP stateful tests
const httpSuite = await runStatefulTests(injectInstance, config, cleanupManager, scope)
// Run WebSocket stateful tests (reconnection, error handling)
const wsSuite = await runWebSocketStatefulTests(injectInstance, config, scope)
// Merge results (same pattern as contract())
// ...
return {
tests: [...httpSuite.tests, ...wsSuite.tests],
summary: { /* merged */ },
routes: [...httpSuite.routes, ...wsSuite.routes],
}
}
13.2 New runWebSocketStatefulTests
export const runWebSocketStatefulTests = async (
fastify: FastifyInjectInstance,
config: TestConfig,
scopeRegistry?: ScopeRegistry
): Promise<TestSuite> => {
const startTime = Date.now()
const depth = resolveDepth(config.depth ?? 'standard')
const allRoutes = discoverRoutes(fastify)
const wsRoutes = allRoutes.filter(r => r.ws !== undefined)
if (wsRoutes.length === 0) {
return {
tests: [],
summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 },
routes: [],
}
}
const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {}
const results: TestResult[] = []
let testId = 0
for (const route of wsRoutes) {
// Test 1: Connection lifecycle
testId++
const lifecycleResults = await testConnectionLifecycle(route, scopeHeaders)
results.push(...lifecycleResults.map((r, i) => ({ ...r, id: testId + i })))
testId += lifecycleResults.length
// Test 2: Reconnection
testId++
const reconnectResults = await testReconnection(route, scopeHeaders)
results.push(...reconnectResults.map((r, i) => ({ ...r, id: testId + i })))
testId += reconnectResults.length
// Test 3: Error handling
testId++
const errorResults = await testErrorHandling(route, route.ws!, scopeHeaders)
results.push(...errorResults.map((r, i) => ({ ...r, id: testId + i })))
testId += errorResults.length
// Test 4: Message sequence fuzzing (fast-check)
const numRuns = depth.statefulRuns
const prop = fc.asyncProperty(
fc.array(fc.constantFrom(...generateValidMessages(route.ws!)), { minLength: 1, maxLength: depth.maxCommands }),
async (messages) => {
const conn = await establishConnection(fastify, route, scopeHeaders)
try {
for (const msg of messages) {
await sendMessage(conn, msg)
const response = await receiveMessage(conn, { type: '*' }, 1000)
// Validate response
}
return true
} finally {
await closeConnection(conn)
}
}
)
try {
await fc.assert(prop, { numRuns, seed: config.seed })
} catch (err) {
// Format counterexample...
}
}
const passed = results.filter(r => r.ok && r.directive === undefined).length
const failed = results.filter(r => !r.ok).length
const skipped = results.filter(r => r.directive !== undefined).length
return {
tests: results,
summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 },
routes: allRoutes.map(r => ({
path: r.path,
method: r.method,
status: r.ws !== undefined ? 'tested' : 'no-contract',
})),
}
}
14. Category Inference for WebSockets (src/domain/category.ts)
export const inferCategory = (
path: string,
method: string,
override: string | undefined,
isWebSocket: boolean = false // <-- NEW parameter
): OperationCategory => {
if (override !== undefined && override !== '') {
return override as OperationCategory
}
// WebSocket upgrades default to observer (read-only stream)
if (isWebSocket) {
return 'observer'
}
// ... existing logic ...
}
15. Runtime Validation Hooks for WebSockets
15.1 Modified Hook Validator (src/infrastructure/hook-validator.ts)
WebSocket runtime validation is more complex because messages arrive asynchronously. We validate:
- On upgrade (
onRoutewithwebsocket: true): Validatex-requireson the HTTP upgrade request. - On each message: Validate message schema against
x-ws-messages. - On state change: Validate
x-ws-transitions. - On close: Validate
x-ensureson the final connection state.
// Add to registerValidationHooks:
if (opts.validateRuntime) {
fastify.addHook('preHandler', createPreHandler(opts))
fastify.addHook('preSerialization', (_request, reply, payload, done) => {
reply[kApophisPayload] = payload
done()
})
fastify.addHook('onSend', createOnSend(opts))
// WebSocket-specific hooks
fastify.addHook('onRoute', (routeOptions) => {
const contract = (routeOptions.config as Record<string, unknown>)?.apophisContract as RouteContract | undefined
if (contract?.ws && contract.validateRuntime) {
// Register WS validation middleware
registerWebSocketValidation(fastify, routeOptions, contract.ws, opts)
}
})
}
15.2 WebSocket Validation Middleware
const registerWebSocketValidation = (
fastify: FastifyInstance,
routeOptions: any,
wsContract: WebSocketContract,
opts: HookOptions
): void => {
// This integrates with @fastify/websocket's connection handler
const originalHandler = routeOptions.handler
routeOptions.handler = async (connection: any, req: any) => {
const wsConnection: WebSocketConnection = {
id: `ws-${Date.now()}`,
url: req.url,
headers: req.headers as Record<string, string>,
state: 'open',
messages: [],
}
// Validate upgrade preconditions
const upgradeCtx = buildUpgradeContext(wsConnection, { method: 'GET', path: req.url })
try {
validateFormulas(wsContract.requires, upgradeCtx)
} catch (err) {
if (opts.runtimeLevel === 'error') {
connection.socket.close(1008, 'Policy violation: precondition failed')
return
}
console.warn(`WS precondition warning: ${err instanceof Error ? err.message : String(err)}`)
}
// Wrap message handler
const originalOnMessage = connection.socket.on
connection.socket.on = function(event: string, handler: Function) {
if (event === 'message') {
return originalOnMessage.call(this, event, (data: any) => {
const msg = parseWebSocketMessage(data)
wsConnection.messages = [...wsConnection.messages, msg]
// Validate message schema
const schemaValidation = validateMessageSchema(msg, wsContract.messages)
if (!schemaValidation.valid) {
if (opts.runtimeLevel === 'error') {
connection.socket.close(1008, `Invalid message: ${schemaValidation.error}`)
return
}
console.warn(`WS schema warning: ${schemaValidation.error}`)
}
// Validate state transition
const transition = wsContract.transitions.find(t =>
t.from === wsConnection.state && t.trigger === msg.type
)
if (transition) {
wsConnection.state = transition.to
} else {
const validTriggers = wsContract.transitions
.filter(t => t.from === wsConnection.state)
.map(t => t.trigger)
if (opts.runtimeLevel === 'error') {
connection.socket.close(1008, `Invalid transition from ${wsConnection.state}. Valid: ${validTriggers.join(', ')}`)
return
}
console.warn(`WS transition warning: invalid transition from ${wsConnection.state} via ${msg.type}`)
}
return handler(data)
})
}
return originalOnMessage.call(this, event, handler)
}
// Validate postconditions on close
connection.socket.on('close', () => {
wsConnection.state = 'closed'
const finalCtx = buildFinalContext(wsConnection, { method: 'GET', path: req.url })
try {
validateFormulas(wsContract.ensures, finalCtx)
} catch (err) {
if (opts.runtimeLevel === 'error') {
console.error(`WS postcondition error: ${err instanceof Error ? err.message : String(err)}`)
} else {
console.warn(`WS postcondition warning: ${err instanceof Error ? err.message : String(err)}`)
}
}
})
return originalHandler(connection, req)
}
}
16. Example: Complete WebSocket Route with Contracts
import fastify from 'fastify'
import websocket from '@fastify/websocket'
import apophis from 'apophis-fastify'
const app = fastify()
await app.register(websocket)
await app.register(apophis, {
runtime: 'error', // Enforce contracts at runtime
})
app.get('/ws/notifications', {
websocket: true,
schema: {
querystring: {
type: 'object',
properties: {
userId: { type: 'string', pattern: '^[a-zA-Z0-9_-]+$' }
},
required: ['userId']
},
'x-ws-messages': [
{
type: 'auth',
direction: 'outgoing',
schema: {
type: 'object',
properties: {
token: { type: 'string', minLength: 32 }
},
required: ['token']
},
required: true
},
{
type: 'auth_success',
direction: 'incoming',
schema: {
type: 'object',
properties: {
status: { type: 'string', const: 'authenticated' },
userId: { type: 'string' }
},
required: ['status', 'userId']
}
},
{
type: 'subscribe',
direction: 'outgoing',
schema: {
type: 'object',
properties: {
channels: {
type: 'array',
items: { type: 'string', enum: ['alerts', 'messages', 'system'] },
minItems: 1
}
},
required: ['channels']
}
},
{
type: 'notification',
direction: 'incoming',
schema: {
type: 'object',
properties: {
channel: { type: 'string' },
data: { type: 'object' },
timestamp: { type: 'string', format: 'date-time' }
},
required: ['channel', 'data', 'timestamp']
}
},
{
type: 'ping',
direction: 'outgoing',
schema: {
type: 'object',
properties: {
timestamp: { type: 'number' }
}
}
},
{
type: 'pong',
direction: 'incoming',
schema: {
type: 'object',
properties: {
timestamp: { type: 'number' }
}
}
}
],
'x-ws-transitions': [
{ from: 'open', to: 'authenticating', trigger: 'auth' },
{ from: 'authenticating', to: 'ready', trigger: 'auth_success' },
{ from: 'ready', to: 'subscribed', trigger: 'subscribe' },
{ from: 'subscribed', to: 'subscribed', trigger: 'notification' },
{ from: 'subscribed', to: 'subscribed', trigger: 'ping' },
{ from: 'subscribed', to: 'subscribed', trigger: 'pong' }
],
'x-requires': [
'request_headers(this).authorization != null',
'query_params(this).userId matches "^[a-zA-Z0-9_-]+$"'
],
'x-ensures': [
'ws_state(this) == "ready" => previous(ws_message(this).type) == "auth"',
'ws_message(this).type == "notification" => ws_state(this) == "subscribed"'
],
'x-invariants': [
'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"',
'ws_state(this) != "error"'
],
'x-validate-runtime': true
}
}, (connection, req) => {
const userId = (req.query as any).userId
connection.socket.on('message', (raw) => {
const msg = JSON.parse(raw.toString())
switch (msg.type) {
case 'auth':
validateToken(msg.token)
connection.socket.send(JSON.stringify({
type: 'auth_success',
status: 'authenticated',
userId
}))
break
case 'subscribe':
subscribeToChannels(userId, msg.channels)
break
case 'ping':
connection.socket.send(JSON.stringify({
type: 'pong',
timestamp: msg.timestamp
}))
break
}
})
// Simulate notifications
const interval = setInterval(() => {
connection.socket.send(JSON.stringify({
type: 'notification',
channel: 'alerts',
data: { message: 'New alert!' },
timestamp: new Date().toISOString()
}))
}, 5000)
connection.socket.on('close', () => {
clearInterval(interval)
})
})
// Run tests
const suite = await app.apophis.contract()
console.log(`Tests: ${suite.summary.passed} passed, ${suite.summary.failed} failed`)
17. Testing the WebSocket Extension
17.1 Unit Tests
// src/test/ws-runner.test.ts
import { test } from 'node:test'
import assert from 'node:assert'
import { runWebSocketTests } from '../test/ws-runner.js'
import { extractContract } from '../domain/contract.js'
test('ws-runner: validates message schema', async () => {
const route = extractContract('/ws/test', 'GET', {
'x-ws-messages': [
{
type: 'hello',
direction: 'incoming',
schema: { type: 'object', properties: { name: { type: 'string' } } }
}
]
}, true)
assert.ok(route.ws)
assert.equal(route.ws!.messages.length, 1)
})
test('ws-runner: detects invalid state transition', async () => {
const transitions = [
{ from: 'open', to: 'ready', trigger: 'auth' }
]
const result = validateStateTransition('open', 'subscribe', transitions)
assert.equal(result.valid, false)
assert.ok(result.error!.includes('Invalid transition'))
})
test('ws-runner: validates APOSTL ws_message operation', async () => {
const formula = 'ws_message(this).type == "auth"'
const ast = parse(formula)
assert.equal(ast.ast.type, 'operation')
assert.equal((ast.ast as any).header, 'ws_message')
})
17.2 Integration Tests
// src/test/integration.test.ts (additions)
test('integration: WebSocket contract testing', async () => {
const app = fastify()
await app.register(websocket)
await app.register(apophis)
app.get('/ws/echo', {
websocket: true,
schema: {
'x-ws-messages': [
{
type: 'echo',
direction: 'outgoing',
schema: { type: 'object', properties: { text: { type: 'string' } } }
},
{
type: 'echo_response',
direction: 'incoming',
schema: { type: 'object', properties: { text: { type: 'string' } } }
}
],
'x-ws-transitions': [
{ from: 'open', to: 'ready', trigger: 'echo' }
],
'x-ensures': [
'ws_message(this).type == "echo_response" => ws_message(this).payload.text == previous(ws_message(this).payload.text)'
]
}
}, (connection) => {
connection.socket.on('message', (raw) => {
const msg = JSON.parse(raw.toString())
connection.socket.send(JSON.stringify({
type: 'echo_response',
text: msg.text
}))
})
})
await app.ready()
const suite = await app.apophis.contract()
assert.ok(suite.summary.passed > 0)
assert.equal(suite.summary.failed, 0)
})
18. Migration Guide
18.1 For Existing APOPHIS Users
No breaking changes. Existing HTTP-only routes continue to work unchanged. WebSocket support is additive.
18.2 Enabling WebSocket Testing
-
Install
@fastify/websocket:npm install @fastify/websocket -
Register the WebSocket plugin before APOPHIS:
await app.register(websocket) await app.register(apophis) -
Add
websocket: trueto route options:app.get('/ws/stream', { websocket: true, schema: { ... } }, handler) -
Add
x-ws-messagesandx-ws-transitionsto route schema. -
Run tests normally:
const suite = await app.apophis.contract() // Tests both HTTP and WS
19. Performance Considerations
| Concern | Mitigation |
|---|---|
| WS connections are expensive | Reuse connections across test runs; pool WS clients |
| Message timeouts | Configurable timeout per message type (default: 5s) |
| Sequence explosion | Limit sequence length to depth.maxCommands |
| Memory leaks | Always close connections; use try/finally |
| Fastify inject doesn't support WS | Require fastify.listen() for WS tests; skip if not listening |
20. Security Considerations
| Concern | Mitigation |
|---|---|
| WS messages may contain secrets | Redact authorization, token, password fields in diagnostics |
| ReDoS in message schema patterns | Reuse existing regex-guard.ts for pattern validation |
| Connection flooding | Limit concurrent WS connections in test runner |
| Malformed binary messages | Only support JSON messages in v1; reject binary with clear error |
21. Open Questions
- Binary WebSocket frames: Should v1 support binary (ArrayBuffer/Buffer) messages, or only JSON?
- WebSocket subprotocols: How to handle
Sec-WebSocket-Protocolnegotiation in contracts? - Multiple WebSocket routes: Should we support route prefixes (e.g.,
/ws/v1/*,/ws/v2/*) with different contracts? - Server-Sent Events (SSE): Should this extension also cover SSE, or is that a separate spec?
- WebSocket compression: Should contracts specify
permessage-deflateexpectations?
22. Appendix: Complete Type Reference
// All WebSocket types are exported from src/types.ts
export type {
WebSocketMessage,
WebSocketConnection,
WebSocketState,
WebSocketMessageSchema,
WebSocketStateTransition,
WebSocketContract,
WebSocketEvalContext,
WebSocketTestResult,
WebSocketTestDiagnostics,
} from './types.js'
23. File Change Summary
| File | Lines Added | Lines Modified | Description |
|---|---|---|---|
src/types.ts |
~120 | ~10 | Add all WS types to public API |
src/domain/contract.ts |
~40 | ~15 | Extract WebSocketContract from schema |
src/domain/discovery.ts |
~5 | ~10 | Detect websocket: true flag |
src/formula/parser.ts |
~25 | ~5 | Parse ws_message and ws_state |
src/formula/evaluator.ts |
~15 | ~5 | Evaluate WS operations |
src/test/ws-runner.ts |
~350 | — | Main WS test runner |
src/test/ws-client.ts |
~120 | — | WS client wrapper |
src/domain/ws-contract-validation.ts |
~80 | — | WS-specific validation |
src/plugin/index.ts |
~60 | ~20 | Wire WS into contract()/stateful() |
src/domain/category.ts |
~5 | ~5 | Default WS to 'observer' |
src/infrastructure/hook-validator.ts |
~80 | ~10 | Runtime WS validation |
| Total | ~900 | ~80 |
Specification version: 1.0.0 Target APOPHIS version: 1.0.0 Last updated: 2025-01-09