Files

53 KiB

APOPHIS WebSocket Extension — Technical Specification

1. Overview

This specification extends APOPHIS v1.0 with first-class WebSocket contract testing. WebSockets differ fundamentally from HTTP:

Dimension HTTP WebSocket
Connection Ephemeral, per-request Persistent, bidirectional
Protocol Request/response pairs Message-oriented streams
Metadata Status codes, headers, body No status codes; headers only at handshake
Lifecycle Stateless (per request) Stateful (connection-scoped)
Errors HTTP status codes Connection close codes + application-level errors

APOPHIS must treat WebSocket routes as first-class citizens in the contract/stateful testing pipeline without breaking existing HTTP-only workflows.


2. Goals

  1. Annotate WebSocket routes in Fastify using @fastify/websocket (or equivalent) with APOPHIS contract annotations.
  2. Define message-schema contracts (what messages may be sent/received) and state-machine contracts (valid transitions between connection states).
  3. Provide a dedicated ws-runner.ts for WebSocket contract validation.
  4. Validate message sequences (e.g., AUTHREADYSUBSCRIBEDATA).
  5. Support stateful testing for connection lifecycle, reconnection, and error handling.
  6. Extend APOSTL with ws_message and ws_state operations.
  7. Integrate seamlessly with existing contract() and stateful() methods.

3. Non-Goals

  • WebSocket performance/load testing (out of scope; use autocannon or k6).
  • Browser-based WebSocket testing (APOPHIS is server-side only).
  • Raw TCP/WebSocket frame inspection (we operate at the message level).

4. Architecture

4.1 High-Level Flow

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Fastify Route  │────▶│  WS Contract     │────▶│  ws-runner.ts   │
│  (with ws opts) │     │  Extraction      │     │  (validation)   │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                              │                           │
                              ▼                           ▼
                    ┌──────────────────┐        ┌─────────────────┐
                    │  Message Schema  │        │  APOSTL Eval     │
                    │  (x-ws-messages) │        │  (ws_message,    │
                    └──────────────────┘        │   ws_state)      │
                                                └─────────────────┘

4.2 File Additions & Modifications

File Action Purpose
src/types.ts Modify Add WS-specific types to public API
src/domain/contract.ts Modify Extract WS contracts from route schema
src/domain/discovery.ts Modify Discover WebSocket routes alongside HTTP
src/formula/parser.ts Modify Add ws_message, ws_state to APOSTL grammar
src/formula/evaluator.ts Modify Evaluate ws_message and ws_state nodes
src/test/ws-runner.ts Create Dedicated WebSocket test runner
src/test/ws-client.ts Create Thin WebSocket client wrapper for testing
src/domain/ws-contract-validation.ts Create WS-specific validation logic
src/plugin/index.ts Modify Wire WS runner into contract() / stateful()
src/domain/category.ts Modify Categorize WS routes (default: observer for GET upgrades)

5. Type Changes (src/types.ts)

5.1 New Types

// ============================================================================
// WebSocket: Message Types
// ============================================================================

export interface WebSocketMessage {
  readonly direction: 'incoming' | 'outgoing'
  readonly type: string           // e.g., 'auth', 'ready', 'data', 'error'
  readonly payload: unknown
  readonly timestamp: number
}

export interface WebSocketConnection {
  readonly id: string
  readonly url: string
  readonly headers: Record<string, string>
  readonly state: WebSocketState
  readonly messages: ReadonlyArray<WebSocketMessage>
  readonly closeCode?: number
  readonly closeReason?: string
}

export type WebSocketState =
  | 'connecting'
  | 'open'
  | 'authenticating'
  | 'ready'
  | 'subscribed'
  | 'closing'
  | 'closed'
  | 'error'

// ============================================================================
// WebSocket: Contract Annotations
// ============================================================================

export interface WebSocketMessageSchema {
  readonly type: string
  readonly direction: 'incoming' | 'outgoing'
  readonly schema: Record<string, unknown>   // JSON Schema for payload
  readonly required?: boolean
}

export interface WebSocketStateTransition {
  readonly from: WebSocketState
  readonly to: WebSocketState
  readonly trigger: string      // message type that triggers transition
  readonly guard?: string       // APOSTL guard formula
}

export interface WebSocketContract {
  readonly path: string
  readonly method: 'GET'        // WS always upgrades from GET
  readonly category: OperationCategory
  readonly messages: WebSocketMessageSchema[]
  readonly transitions: WebSocketStateTransition[]
  readonly requires: string[]   // APOSTL preconditions (e.g., auth token in headers)
  readonly ensures: string[]    // APOSTL postconditions on final state
  readonly invariants: string[] // APOSTL invariants over message sequence
  readonly validateRuntime: boolean
}

// ============================================================================
// WebSocket: EvalContext Extension
// ============================================================================

export interface WebSocketEvalContext {
  readonly connection: WebSocketConnection
  readonly message: WebSocketMessage
  readonly state: WebSocketState
  readonly previousMessage?: WebSocketMessage
}

// ============================================================================
// WebSocket: Test Results
// ============================================================================

export interface WebSocketTestResult {
  readonly ok: boolean
  readonly name: string
  readonly id: number
  readonly connectionId: string
  readonly directive?: string
  readonly diagnostics?: WebSocketTestDiagnostics
}

export interface WebSocketTestDiagnostics {
  readonly error?: string
  readonly violation?: ContractViolation
  readonly expectedSequence?: string[]
  readonly actualSequence?: string[]
  readonly stateTransition?: {
    readonly from: WebSocketState
    readonly to: WebSocketState
    readonly expectedTrigger: string
    readonly actualTrigger: string
  }
}

5.2 Modified Types

// In RouteContract, add optional ws field
export interface RouteContract {
  path: string
  method: string
  category: OperationCategory
  requires: string[]
  ensures: string[]
  invariants: string[]
  regexPatterns: Record<string, string>
  validateRuntime: boolean
  schema?: Record<string, unknown>
  ws?: WebSocketContract   // <-- NEW: present if route is a WebSocket upgrade
}

// In EvalContext, add optional ws field
export interface EvalContext {
  readonly request: { /* ... */ }
  readonly response: { /* ... */ }
  readonly previous?: EvalContext
  readonly ws?: WebSocketEvalContext   // <-- NEW: populated during WS testing
}

// In OperationHeader, add ws operations
export type OperationHeader =
  | 'request_body' | 'response_body' | 'response_code'
  | 'request_headers' | 'response_headers' | 'query_params'
  | 'cookies' | 'response_time'
  | 'ws_message' | 'ws_state'           // <-- NEW

6. Fastify WebSocket Route Annotation

6.1 Route Definition (Example)

Using @fastify/websocket:

import fastify from 'fastify'
import websocket from '@fastify/websocket'
import apophis from 'apophis-fastify'

const app = fastify()

await app.register(websocket)
await app.register(apophis)

// WebSocket route with APOPHIS contract annotations
app.get('/ws/events', {
  websocket: true,
  schema: {
    // Standard HTTP schema for the upgrade request
    querystring: {
      type: 'object',
      properties: {
        stream: { type: 'string', enum: ['live', 'snapshot'] }
      }
    },
    // WebSocket-specific contract annotations
    'x-ws-messages': [
      {
        type: 'auth',
        direction: 'outgoing',
        schema: {
          type: 'object',
          properties: {
            token: { type: 'string' }
          },
          required: ['token']
        },
        required: true
      },
      {
        type: 'ready',
        direction: 'incoming',
        schema: {
          type: 'object',
          properties: {
            status: { type: 'string', const: 'ready' }
          }
        }
      },
      {
        type: 'subscribe',
        direction: 'outgoing',
        schema: {
          type: 'object',
          properties: {
            channels: { type: 'array', items: { type: 'string' } }
          }
        }
      },
      {
        type: 'data',
        direction: 'incoming',
        schema: {
          type: 'object',
          properties: {
            channel: { type: 'string' },
            payload: { type: 'object' }
          }
        }
      }
    ],
    'x-ws-transitions': [
      { from: 'open', to: 'authenticating', trigger: 'auth' },
      { from: 'authenticating', to: 'ready', trigger: 'ready' },
      { from: 'ready', to: 'subscribed', trigger: 'subscribe' },
      { from: 'subscribed', to: 'subscribed', trigger: 'data' }
    ],
    'x-requires': [
      'request_headers(this).authorization != null'
    ],
    'x-ensures': [
      'ws_state(this) == "ready"',
      'previous(ws_message(this).type) == "auth" && ws_message(this).type == "ready"'
    ],
    'x-invariants': [
      'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"'
    ]
  }
}, (connection, req) => {
  // Handler implementation
  connection.socket.on('message', (raw) => {
    const msg = JSON.parse(raw.toString())
    // ... handle messages
  })
})

6.2 Schema Annotation Reference

Annotation Type Description
x-ws-messages WebSocketMessageSchema[] Defines valid message types, directions, and payload schemas
x-ws-transitions WebSocketStateTransition[] Defines valid state machine transitions
x-requires string[] APOSTL preconditions evaluated on the HTTP upgrade request
x-ensures string[] APOSTL postconditions evaluated after connection close or per-message
x-invariants string[] APOSTL invariants checked after every message
x-category string Override category (default: observer for WS upgrades)
x-validate-runtime boolean Enable runtime validation of WS contracts

7. Contract Extraction (src/domain/contract.ts)

7.1 Modified extractContract

export const extractContract = (
  path: string,
  method: string,
  schema: Record<string, unknown> | undefined,
  isWebSocket: boolean = false          // <-- NEW parameter
): RouteContract => {
  const s = schema ?? {}
  // ... existing cache logic ...

  const override = typeof s['x-category'] === 'string' ? s['x-category'] : undefined
  // For WS routes, default to 'observer' unless overridden
  const category = isWebSocket
    ? (override as OperationCategory ?? 'observer')
    : inferCategory(path, method, override)

  // ... existing requires/ensures extraction ...

  const contract: RouteContract = {
    path,
    method: method.toUpperCase(),
    category,
    requires,
    ensures,
    invariants: EMPTY_INVARIANTS,
    regexPatterns: {},
    validateRuntime,
    schema: s,
  }

  // If WebSocket, extract WS-specific contract
  if (isWebSocket) {
    contract.ws = extractWebSocketContract(s)
  }

  // ... cache and return ...
  return contract
}

7.2 New extractWebSocketContract

const extractWebSocketContract = (
  schema: Record<string, unknown>
): WebSocketContract | undefined => {
  const messages = schema['x-ws-messages']
  const transitions = schema['x-ws-transitions']

  if (!Array.isArray(messages) || messages.length === 0) {
    return undefined
  }

  const requires = schema['x-requires']
  const ensures = schema['x-ensures']
  const invariants = schema['x-invariants']
  const validateRuntime = schema['x-validate-runtime'] !== false

  return {
    path: '',   // populated by caller
    method: 'GET',
    category: 'observer',
    messages: messages as WebSocketMessageSchema[],
    transitions: (transitions as WebSocketStateTransition[]) ?? [],
    requires: Array.isArray(requires) ? requires as string[] : [],
    ensures: Array.isArray(ensures) ? ensures as string[] : [],
    invariants: Array.isArray(invariants) ? invariants as string[] : [],
    validateRuntime,
  }
}

8. Route Discovery (src/domain/discovery.ts)

8.1 Modified captureRoute

export const captureRoute = (
  instance: object,
  route: CapturedRoute & { websocket?: boolean }   // <-- NEW: websocket flag
): void => {
  const existing = capturedRoutes.get(instance) ?? []
  existing.push(route)
  capturedRoutes.set(instance, existing)
}

8.2 Modified discoverRoutes

export const discoverRoutes = (
  instance: {
    routes?: Array<{
      method: string
      url: string
      schema?: Record<string, unknown>
      websocket?: boolean
    }>
  }
): RouteContract[] => {
  const captured = capturedRoutes.get(instance)
  if (captured && captured.length > 0) {
    return captured.map((route) =>
      extractContract(route.url, route.method, route.schema, route.websocket ?? false)
    )
  }

  if (Array.isArray(instance.routes) && instance.routes.length > 0) {
    return instance.routes.map((route) =>
      extractContract(route.url, route.method, route.schema, route.websocket ?? false)
    )
  }

  return []
}

8.3 Plugin Hook Modification (src/plugin/index.ts:114-138)

fastify.addHook('onRoute', (routeOptions) => {
  const method = Array.isArray(routeOptions.method)
    ? routeOptions.method.join(',')
    : routeOptions.method
  const schema = routeOptions.schema as Record<string, unknown> | undefined
  const prefix = (routeOptions as unknown as Record<string, unknown>).prefix as string | undefined
  const url = prefix && !routeOptions.url.startsWith(prefix)
    ? `${prefix}${routeOptions.url}`
    : routeOptions.url

  // Detect WebSocket routes
  const isWebSocket = (routeOptions as unknown as Record<string, unknown>).websocket === true

  captureRoute(fastify, {
    method,
    url,
    schema,
    prefix,
    websocket: isWebSocket,   // <-- NEW
  })

  const contract = extractContract(url, method, schema, isWebSocket)
  if (contract.validateRuntime && (contract.requires.length > 0 || contract.ensures.length > 0 || contract.ws !== undefined)) {
    const config = routeOptions.config as Record<string, unknown> || {}
    config.apophisContract = contract
    routeOptions.config = config as typeof routeOptions.config
  }
})

9. APOSTL Formula Extensions

9.1 New Operations

Operation Syntax Returns Description
ws_message(this) ws_message(this) WebSocketMessage Current message being evaluated
ws_message(this).type ws_message(this).type string Type of current message
ws_message(this).payload ws_message(this).payload unknown Payload of current message
ws_message(this).direction ws_message(this).direction 'incoming' | 'outgoing' Direction of current message
ws_state(this) ws_state(this) WebSocketState Current connection state
previous(ws_message(this)) previous(ws_message(this)) WebSocketMessage Previous message in sequence

9.2 Parser Changes (src/formula/parser.ts)

Add ws_message and ws_state to VALID_HEADERS:

const VALID_HEADERS: OperationHeader[] = [
  'request_body', 'response_body', 'response_code',
  'request_headers', 'response_headers', 'query_params', 'cookies', 'response_time',
  'ws_message', 'ws_state'   // <-- NEW
]

Add manual char-code parsing for ws_message (10 chars) and ws_state (8 chars) in parseOperation:

// In parseOperation, add after cookies check:
if (!header && p + 10 <= len) {
  const c0 = input.charCodeAt(p)
  const c1 = input.charCodeAt(p + 1)
  // ... check for 'ws_message' ...
  if (c0 === 119 && c1 === 115) {
    // ws_ prefix
    const c3 = input.charCodeAt(p + 3)
    if (c3 === 109) {
      // ws_message
      header = 'ws_message'
      headerLen = 10
    } else if (c3 === 115) {
      // ws_state
      header = 'ws_state'
      headerLen = 8
    }
  }
}

9.3 Evaluator Changes (src/formula/evaluator.ts)

Add cases to resolveOperation:

function resolveOperation(node: Extract<FormulaNode, { type: 'operation' }>, ctx: EvalContext): unknown {
  const { header, parameter, accessor } = node

  let target: unknown

  switch (header) {
    // ... existing cases ...
    case 'ws_message':
      target = ctx.ws?.message ?? null
      break
    case 'ws_state':
      target = ctx.ws?.state ?? null
      break
    default:
      throw new Error(`Unknown operation header: ${header}`)
  }

  // ... existing accessor logic ...
}

10. WebSocket Test Runner (src/test/ws-runner.ts)

10.1 Runner Interface

export interface WebSocketTestRunner {
  run(
    fastify: FastifyInjectInstance,
    config: TestConfig,
    scopeRegistry?: ScopeRegistry
  ): Promise<TestSuite>
}

10.2 Pseudocode

/**
 * WebSocket Contract Test Runner
 * Validates WS routes for:
 *   1. Message schema compliance (payload matches JSON Schema)
 *   2. State machine transitions (valid sequence of states)
 *   3. APOSTL preconditions (on upgrade request)
 *   4. APOSTL postconditions (on connection close or per-message)
 *   5. APOSTL invariants (over entire message sequence)
 */

export const runWebSocketTests = async (
  fastify: FastifyInjectInstance,
  config: TestConfig,
  scopeRegistry?: ScopeRegistry
): Promise<TestSuite> => {
  const startTime = Date.now()
  const depth = resolveDepth(config.depth ?? 'standard')

  // 1. Discover WS routes
  const allRoutes = discoverRoutes(fastify)
  const wsRoutes = allRoutes.filter(r => r.ws !== undefined)

  if (wsRoutes.length === 0) {
    return {
      tests: [],
      summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 },
      routes: [],
    }
  }

  const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {}
  const results: TestResult[] = []
  let testId = 0

  // 2. For each WS route, run test sequences
  for (const route of wsRoutes) {
    const wsContract = route.ws!
    const commandsPerRoute = Math.max(1, Math.floor(depth.contractRuns / Math.max(wsRoutes.length, 1)))

    for (let run = 0; run < commandsPerRoute; run++) {
      testId++
      const connectionId = `ws-${testId}`
      const name = `WS ${route.method} ${route.path} (#${testId})`

      // 3. Establish WebSocket connection
      let connection: WebSocketConnection
      try {
        connection = await establishConnection(fastify, route, scopeHeaders)
      } catch (err) {
        results.push({
          ok: false,
          name,
          id: testId,
          diagnostics: { error: `Connection failed: ${err instanceof Error ? err.message : String(err)}` }
        })
        continue
      }

      // 4. Validate upgrade request preconditions
      const upgradeCtx = buildUpgradeContext(connection, route)
      const preResult = validatePostconditions(wsContract.requires, upgradeCtx, {
        method: route.method,
        path: route.path,
      })
      if (!preResult.success) {
        results.push({
          ok: false,
          name,
          id: testId,
          diagnostics: {
            error: `Precondition failed: ${preResult.error}`,
            violation: preResult.violation,
          }
        })
        await closeConnection(connection)
        continue
      }

      // 5. Run message sequence
      const sequenceResult = await runMessageSequence(
        connection,
        wsContract,
        route,
        testId,
        scopeHeaders
      )
      results.push(...sequenceResult.results)

      // 6. Validate postconditions on final state
      const finalCtx = buildFinalContext(connection, route)
      const postResult = validatePostconditions(wsContract.ensures, finalCtx, {
        method: route.method,
        path: route.path,
      })
      if (!postResult.success) {
        testId++
        results.push({
          ok: false,
          name: `POST ${name}`,
          id: testId,
          diagnostics: {
            error: `Postcondition failed: ${postResult.error}`,
            violation: postResult.violation,
          }
        })
      }

      // 7. Check invariants over entire sequence
      const invariantResults = checkWebSocketInvariants(wsContract.invariants, connection, route)
      for (const inv of invariantResults) {
        if (!inv.success) {
          testId++
          results.push({
            ok: false,
            name: `INVARIANT: ${inv.name}`,
            id: testId,
            diagnostics: { error: inv.error }
          })
        }
      }

      await closeConnection(connection)
    }
  }

  // 8. Build TestSuite
  const passed = results.filter(r => r.ok && r.directive === undefined).length
  const failed = results.filter(r => !r.ok).length
  const skipped = results.filter(r => r.directive !== undefined).length

  return {
    tests: results,
    summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 },
    routes: allRoutes.map(r => ({
      path: r.path,
      method: r.method,
      status: r.ws !== undefined ? 'tested' : 'no-contract',
    })),
  }
}

10.3 Message Sequence Validation

interface SequenceResult {
  results: TestResult[]
  connection: WebSocketConnection
}

const runMessageSequence = async (
  connection: WebSocketConnection,
  wsContract: WebSocketContract,
  route: RouteContract,
  baseTestId: number,
  scopeHeaders: Record<string, string>
): Promise<SequenceResult> => {
  const results: TestResult[] = []
  let currentState: WebSocketState = 'open'
  let testId = baseTestId

  // Generate a valid message sequence based on state machine
  const sequence = generateValidSequence(wsContract.transitions)

  for (const expectedMsg of sequence) {
    testId++
    const msgName = `MSG ${connection.id} ${expectedMsg.type} (#${testId})`

    // Send or receive message
    let actualMsg: WebSocketMessage
    try {
      if (expectedMsg.direction === 'outgoing') {
        actualMsg = await sendMessage(connection, expectedMsg)
      } else {
        actualMsg = await receiveMessage(connection, expectedMsg, 5000)
      }
    } catch (err) {
      results.push({
        ok: false,
        name: msgName,
        id: testId,
        diagnostics: {
          error: `Message exchange failed: ${err instanceof Error ? err.message : String(err)}`,
          expectedSequence: sequence.map(m => m.type),
        }
      })
      break
    }

    // Validate message schema
    const schemaValidation = validateMessageSchema(actualMsg, wsContract.messages)
    if (!schemaValidation.valid) {
      results.push({
        ok: false,
        name: msgName,
        id: testId,
        diagnostics: {
          error: `Schema violation: ${schemaValidation.error}`,
          violation: schemaValidation.violation,
        }
      })
      continue
    }

    // Validate state transition
    const transition = wsContract.transitions.find(t =>
      t.from === currentState && t.trigger === actualMsg.type
    )
    if (!transition) {
      results.push({
        ok: false,
        name: msgName,
        id: testId,
        diagnostics: {
          error: `Invalid state transition from ${currentState} via ${actualMsg.type}`,
          stateTransition: {
            from: currentState,
            to: currentState,
            expectedTrigger: expectedMsg.type,
            actualTrigger: actualMsg.type,
          }
        }
      })
      continue
    }

    // Update state
    currentState = transition.to

    // Evaluate APOSTL formulas for this message
    const msgCtx = buildMessageContext(connection, actualMsg, currentState)
    const postResult = validatePostconditions(wsContract.ensures, msgCtx, {
      method: route.method,
      path: route.path,
    })
    if (!postResult.success) {
      results.push({
        ok: false,
        name: msgName,
        id: testId,
        diagnostics: {
          error: `Postcondition failed: ${postResult.error}`,
          violation: postResult.violation,
        }
      })
      continue
    }

    results.push({ ok: true, name: msgName, id: testId })
  }

  return { results, connection }
}

10.4 Sequence Generation

const generateValidSequence = (
  transitions: WebSocketStateTransition[]
): Array<{ type: string; direction: 'incoming' | 'outgoing' }> => {
  // Build adjacency list
  const adj = new Map<WebSocketState, WebSocketStateTransition[]>()
  for (const t of transitions) {
    const existing = adj.get(t.from) ?? []
    existing.push(t)
    adj.set(t.from, existing)
  }

  // BFS/DFS to find a path from 'open' to a terminal state
  const sequence: Array<{ type: string; direction: 'incoming' | 'outgoing' }> = []
  let current: WebSocketState = 'open'
  const visited = new Set<string>()

  while (true) {
    const options = adj.get(current) ?? []
    const unvisited = options.filter(t => !visited.has(`${t.from}-${t.to}-${t.trigger}`))

    if (unvisited.length === 0) break

    const choice = unvisited[0]!
    visited.add(`${choice.from}-${choice.to}-${choice.trigger}`)

    // Determine direction based on typical patterns
    // (outgoing = client→server, incoming = server→client)
    const direction: 'incoming' | 'outgoing' =
      ['auth', 'subscribe', 'ping'].includes(choice.trigger) ? 'outgoing' : 'incoming'

    sequence.push({ type: choice.trigger, direction })
    current = choice.to

    // Prevent infinite loops
    if (sequence.length > 50) break
  }

  return sequence
}

11. WebSocket Client (src/test/ws-client.ts)

Thin wrapper around ws or native WebSocket for testing:

import WebSocket from 'ws'

export const establishConnection = async (
  fastify: FastifyInjectInstance,
  route: RouteContract,
  headers: Record<string, string>
): Promise<WebSocketConnection> => {
  // Fastify inject doesn't support WS; we need the actual HTTP server
  const address = fastify.server?.address()
  if (!address || typeof address === 'string') {
    throw new Error('Fastify server must be listening for WebSocket tests')
  }

  const url = `ws://localhost:${address.port}${route.path}`
  const ws = new WebSocket(url, { headers })

  return new Promise((resolve, reject) => {
    const connection: WebSocketConnection = {
      id: `ws-${Date.now()}-${Math.random().toString(36).slice(2)}`,
      url,
      headers,
      state: 'connecting',
      messages: [],
    }

    ws.on('open', () => {
      connection.state = 'open'
      resolve(connection)
    })

    ws.on('error', (err) => {
      connection.state = 'error'
      reject(err)
    })

    ws.on('message', (data) => {
      const msg: WebSocketMessage = {
        direction: 'incoming',
        type: inferMessageType(data),
        payload: parsePayload(data),
        timestamp: Date.now(),
      }
      connection.messages = [...connection.messages, msg]
    })

    ws.on('close', (code, reason) => {
      connection.state = 'closed'
      connection.closeCode = code
      connection.closeReason = reason.toString()
    })

    // Attach ws instance to connection for send/close operations
    (connection as any)._ws = ws
  })
}

export const sendMessage = async (
  connection: WebSocketConnection,
  msg: { type: string; payload?: unknown }
): Promise<WebSocketMessage> => {
  const ws = (connection as any)._ws as WebSocket
  const payload = JSON.stringify({ type: msg.type, ...msg.payload })
  ws.send(payload)

  const sent: WebSocketMessage = {
    direction: 'outgoing',
    type: msg.type,
    payload: msg.payload,
    timestamp: Date.now(),
  }
  connection.messages = [...connection.messages, sent]
  return sent
}

export const receiveMessage = async (
  connection: WebSocketConnection,
  expected: { type: string },
  timeoutMs: number
): Promise<WebSocketMessage> => {
  return new Promise((resolve, reject) => {
    const startTime = Date.now()
    const check = () => {
      const lastMsg = connection.messages[connection.messages.length - 1]
      if (lastMsg && lastMsg.direction === 'incoming' && lastMsg.type === expected.type) {
        resolve(lastMsg)
        return
      }
      if (Date.now() - startTime > timeoutMs) {
        reject(new Error(`Timeout waiting for message type: ${expected.type}`))
        return
      }
      setTimeout(check, 10)
    }
    check()
  })
}

export const closeConnection = async (
  connection: WebSocketConnection
): Promise<void> => {
  const ws = (connection as any)._ws as WebSocket
  ws.close()
  return new Promise((resolve) => {
    ws.on('close', () => resolve())
  })
}

12. Stateful Testing for WebSockets

12.1 Connection Lifecycle States

connecting → open → authenticating → ready → subscribed → closing → closed
                ↓         ↓              ↓          ↓
              error     error          error      error

12.2 Reconnection Testing

const testReconnection = async (
  route: RouteContract,
  scopeHeaders: Record<string, string>
): Promise<TestResult[]> => {
  const results: TestResult[] = []

  // Test 1: Clean reconnect after normal close
  const conn1 = await establishConnection(fastify, route, scopeHeaders)
  await runAuthSequence(conn1)
  await closeConnection(conn1)

  const conn2 = await establishConnection(fastify, route, scopeHeaders)
  const reconnectOk = conn2.state === 'open'
  results.push({
    ok: reconnectOk,
    name: `Reconnection after clean close`,
    id: 1,
    diagnostics: reconnectOk ? undefined : { error: 'Reconnection failed after clean close' }
  })

  // Test 2: Reconnect after error
  const conn3 = await establishConnection(fastify, route, scopeHeaders)
  await sendInvalidMessage(conn3) // Force error
  await closeConnection(conn3)

  const conn4 = await establishConnection(fastify, route, scopeHeaders)
  const errorReconnectOk = conn4.state === 'open'
  results.push({
    ok: errorReconnectOk,
    name: `Reconnection after error`,
    id: 2,
    diagnostics: errorReconnectOk ? undefined : { error: 'Reconnection failed after error' }
  })

  return results
}

12.3 Error Handling Testing

const testErrorHandling = async (
  route: RouteContract,
  wsContract: WebSocketContract,
  scopeHeaders: Record<string, string>
): Promise<TestResult[]> => {
  const results: TestResult[] = []

  // Test: Invalid message type
  const conn = await establishConnection(fastify, route, scopeHeaders)
  await sendMessage(conn, { type: 'invalid_type_xyz', payload: {} })

  const errorMsg = await waitForMessage(conn, 'error', 1000)
  const handledCorrectly = errorMsg !== null && errorMsg.payload !== undefined

  results.push({
    ok: handledCorrectly,
    name: `Error handling for invalid message type`,
    id: 1,
    diagnostics: handledCorrectly ? undefined : {
      error: 'Server did not respond with error message for invalid type'
    }
  })

  // Test: Message without required auth
  const conn2 = await establishConnection(fastify, route, {})
  await sendMessage(conn2, { type: 'subscribe', payload: { channels: ['test'] } })

  const authError = await waitForMessage(conn2, 'error', 1000)
  const authHandled = authError !== null

  results.push({
    ok: authHandled,
    name: `Error handling for unauthenticated subscribe`,
    id: 2,
    diagnostics: authHandled ? undefined : {
      error: 'Server allowed subscribe without auth'
    }
  })

  return results
}

13. Integration with contract() and stateful()

13.1 Modified Plugin (src/plugin/index.ts)

const buildContract = (fastify: FastifyInstance, scope: ScopeRegistry) => async (opts: TestConfig = {}): Promise<TestSuite> => {
  const config = {
    depth: opts.depth ?? 'standard',
    scope: opts.scope,
    seed: opts.seed,
  }
  const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance

  // Run HTTP contract tests
  const httpSuite = await runPetitTests(injectInstance, config, scope)

  // Run WebSocket contract tests
  const wsSuite = await runWebSocketTests(injectInstance, config, scope)

  // Merge results
  const mergedTests = [...httpSuite.tests, ...wsSuite.tests]
  const mergedRoutes = [...httpSuite.routes, ...wsSuite.routes]
  const mergedSummary = {
    passed: httpSuite.summary.passed + wsSuite.summary.passed,
    failed: httpSuite.summary.failed + wsSuite.summary.failed,
    skipped: httpSuite.summary.skipped + wsSuite.summary.skipped,
    timeMs: httpSuite.summary.timeMs + wsSuite.summary.timeMs,
    cacheHits: httpSuite.summary.cacheHits + wsSuite.summary.cacheHits,
    cacheMisses: httpSuite.summary.cacheMisses + wsSuite.summary.cacheMisses,
  }

  // Loud failure on empty discovery
  if (mergedTests.length === 0) {
    const routes = discoverRoutes(fastify as unknown as { routes?: Array<{ method: string; url: string; schema?: Record<string, unknown> }> })
    if (routes.length === 0) {
      throw new Error(
        'No routes discovered. Did you register APOPHIS before defining routes? ' +
        'APOPHIS must be registered via `await fastify.register(apophis)` before any routes are defined.'
      )
    }
  }

  return {
    tests: mergedTests,
    summary: mergedSummary,
    routes: mergedRoutes,
  }
}

const buildStateful = (fastify: FastifyInstance, scope: ScopeRegistry, cleanupManager: CleanupManager) => async (opts: TestConfig = {}): Promise<TestSuite> => {
  const config = {
    depth: opts.depth ?? 'standard',
    scope: opts.scope,
    seed: opts.seed,
  }
  const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance

  // Run HTTP stateful tests
  const httpSuite = await runStatefulTests(injectInstance, config, cleanupManager, scope)

  // Run WebSocket stateful tests (reconnection, error handling)
  const wsSuite = await runWebSocketStatefulTests(injectInstance, config, scope)

  // Merge results (same pattern as contract())
  // ...

  return {
    tests: [...httpSuite.tests, ...wsSuite.tests],
    summary: { /* merged */ },
    routes: [...httpSuite.routes, ...wsSuite.routes],
  }
}

13.2 New runWebSocketStatefulTests

export const runWebSocketStatefulTests = async (
  fastify: FastifyInjectInstance,
  config: TestConfig,
  scopeRegistry?: ScopeRegistry
): Promise<TestSuite> => {
  const startTime = Date.now()
  const depth = resolveDepth(config.depth ?? 'standard')

  const allRoutes = discoverRoutes(fastify)
  const wsRoutes = allRoutes.filter(r => r.ws !== undefined)

  if (wsRoutes.length === 0) {
    return {
      tests: [],
      summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 },
      routes: [],
    }
  }

  const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {}
  const results: TestResult[] = []
  let testId = 0

  for (const route of wsRoutes) {
    // Test 1: Connection lifecycle
    testId++
    const lifecycleResults = await testConnectionLifecycle(route, scopeHeaders)
    results.push(...lifecycleResults.map((r, i) => ({ ...r, id: testId + i })))
    testId += lifecycleResults.length

    // Test 2: Reconnection
    testId++
    const reconnectResults = await testReconnection(route, scopeHeaders)
    results.push(...reconnectResults.map((r, i) => ({ ...r, id: testId + i })))
    testId += reconnectResults.length

    // Test 3: Error handling
    testId++
    const errorResults = await testErrorHandling(route, route.ws!, scopeHeaders)
    results.push(...errorResults.map((r, i) => ({ ...r, id: testId + i })))
    testId += errorResults.length

    // Test 4: Message sequence fuzzing (fast-check)
    const numRuns = depth.statefulRuns
    const prop = fc.asyncProperty(
      fc.array(fc.constantFrom(...generateValidMessages(route.ws!)), { minLength: 1, maxLength: depth.maxCommands }),
      async (messages) => {
        const conn = await establishConnection(fastify, route, scopeHeaders)
        try {
          for (const msg of messages) {
            await sendMessage(conn, msg)
            const response = await receiveMessage(conn, { type: '*' }, 1000)
            // Validate response
          }
          return true
        } finally {
          await closeConnection(conn)
        }
      }
    )

    try {
      await fc.assert(prop, { numRuns, seed: config.seed })
    } catch (err) {
      // Format counterexample...
    }
  }

  const passed = results.filter(r => r.ok && r.directive === undefined).length
  const failed = results.filter(r => !r.ok).length
  const skipped = results.filter(r => r.directive !== undefined).length

  return {
    tests: results,
    summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 },
    routes: allRoutes.map(r => ({
      path: r.path,
      method: r.method,
      status: r.ws !== undefined ? 'tested' : 'no-contract',
    })),
  }
}

14. Category Inference for WebSockets (src/domain/category.ts)

export const inferCategory = (
  path: string,
  method: string,
  override: string | undefined,
  isWebSocket: boolean = false    // <-- NEW parameter
): OperationCategory => {
  if (override !== undefined && override !== '') {
    return override as OperationCategory
  }

  // WebSocket upgrades default to observer (read-only stream)
  if (isWebSocket) {
    return 'observer'
  }

  // ... existing logic ...
}

15. Runtime Validation Hooks for WebSockets

15.1 Modified Hook Validator (src/infrastructure/hook-validator.ts)

WebSocket runtime validation is more complex because messages arrive asynchronously. We validate:

  1. On upgrade (onRoute with websocket: true): Validate x-requires on the HTTP upgrade request.
  2. On each message: Validate message schema against x-ws-messages.
  3. On state change: Validate x-ws-transitions.
  4. On close: Validate x-ensures on the final connection state.
// Add to registerValidationHooks:
if (opts.validateRuntime) {
  fastify.addHook('preHandler', createPreHandler(opts))
  fastify.addHook('preSerialization', (_request, reply, payload, done) => {
    reply[kApophisPayload] = payload
    done()
  })
  fastify.addHook('onSend', createOnSend(opts))

  // WebSocket-specific hooks
  fastify.addHook('onRoute', (routeOptions) => {
    const contract = (routeOptions.config as Record<string, unknown>)?.apophisContract as RouteContract | undefined
    if (contract?.ws && contract.validateRuntime) {
      // Register WS validation middleware
      registerWebSocketValidation(fastify, routeOptions, contract.ws, opts)
    }
  })
}

15.2 WebSocket Validation Middleware

const registerWebSocketValidation = (
  fastify: FastifyInstance,
  routeOptions: any,
  wsContract: WebSocketContract,
  opts: HookOptions
): void => {
  // This integrates with @fastify/websocket's connection handler
  const originalHandler = routeOptions.handler

  routeOptions.handler = async (connection: any, req: any) => {
    const wsConnection: WebSocketConnection = {
      id: `ws-${Date.now()}`,
      url: req.url,
      headers: req.headers as Record<string, string>,
      state: 'open',
      messages: [],
    }

    // Validate upgrade preconditions
    const upgradeCtx = buildUpgradeContext(wsConnection, { method: 'GET', path: req.url })
    try {
      validateFormulas(wsContract.requires, upgradeCtx)
    } catch (err) {
      if (opts.runtimeLevel === 'error') {
        connection.socket.close(1008, 'Policy violation: precondition failed')
        return
      }
      console.warn(`WS precondition warning: ${err instanceof Error ? err.message : String(err)}`)
    }

    // Wrap message handler
    const originalOnMessage = connection.socket.on
    connection.socket.on = function(event: string, handler: Function) {
      if (event === 'message') {
        return originalOnMessage.call(this, event, (data: any) => {
          const msg = parseWebSocketMessage(data)
          wsConnection.messages = [...wsConnection.messages, msg]

          // Validate message schema
          const schemaValidation = validateMessageSchema(msg, wsContract.messages)
          if (!schemaValidation.valid) {
            if (opts.runtimeLevel === 'error') {
              connection.socket.close(1008, `Invalid message: ${schemaValidation.error}`)
              return
            }
            console.warn(`WS schema warning: ${schemaValidation.error}`)
          }

          // Validate state transition
          const transition = wsContract.transitions.find(t =>
            t.from === wsConnection.state && t.trigger === msg.type
          )
          if (transition) {
            wsConnection.state = transition.to
          } else {
            const validTriggers = wsContract.transitions
              .filter(t => t.from === wsConnection.state)
              .map(t => t.trigger)
            if (opts.runtimeLevel === 'error') {
              connection.socket.close(1008, `Invalid transition from ${wsConnection.state}. Valid: ${validTriggers.join(', ')}`)
              return
            }
            console.warn(`WS transition warning: invalid transition from ${wsConnection.state} via ${msg.type}`)
          }

          return handler(data)
        })
      }
      return originalOnMessage.call(this, event, handler)
    }

    // Validate postconditions on close
    connection.socket.on('close', () => {
      wsConnection.state = 'closed'
      const finalCtx = buildFinalContext(wsConnection, { method: 'GET', path: req.url })
      try {
        validateFormulas(wsContract.ensures, finalCtx)
      } catch (err) {
        if (opts.runtimeLevel === 'error') {
          console.error(`WS postcondition error: ${err instanceof Error ? err.message : String(err)}`)
        } else {
          console.warn(`WS postcondition warning: ${err instanceof Error ? err.message : String(err)}`)
        }
      }
    })

    return originalHandler(connection, req)
  }
}

16. Example: Complete WebSocket Route with Contracts

import fastify from 'fastify'
import websocket from '@fastify/websocket'
import apophis from 'apophis-fastify'

const app = fastify()

await app.register(websocket)
await app.register(apophis, {
  runtime: 'error',  // Enforce contracts at runtime
})

app.get('/ws/notifications', {
  websocket: true,
  schema: {
    querystring: {
      type: 'object',
      properties: {
        userId: { type: 'string', pattern: '^[a-zA-Z0-9_-]+$' }
      },
      required: ['userId']
    },
    'x-ws-messages': [
      {
        type: 'auth',
        direction: 'outgoing',
        schema: {
          type: 'object',
          properties: {
            token: { type: 'string', minLength: 32 }
          },
          required: ['token']
        },
        required: true
      },
      {
        type: 'auth_success',
        direction: 'incoming',
        schema: {
          type: 'object',
          properties: {
            status: { type: 'string', const: 'authenticated' },
            userId: { type: 'string' }
          },
          required: ['status', 'userId']
        }
      },
      {
        type: 'subscribe',
        direction: 'outgoing',
        schema: {
          type: 'object',
          properties: {
            channels: {
              type: 'array',
              items: { type: 'string', enum: ['alerts', 'messages', 'system'] },
              minItems: 1
            }
          },
          required: ['channels']
        }
      },
      {
        type: 'notification',
        direction: 'incoming',
        schema: {
          type: 'object',
          properties: {
            channel: { type: 'string' },
            data: { type: 'object' },
            timestamp: { type: 'string', format: 'date-time' }
          },
          required: ['channel', 'data', 'timestamp']
        }
      },
      {
        type: 'ping',
        direction: 'outgoing',
        schema: {
          type: 'object',
          properties: {
            timestamp: { type: 'number' }
          }
        }
      },
      {
        type: 'pong',
        direction: 'incoming',
        schema: {
          type: 'object',
          properties: {
            timestamp: { type: 'number' }
          }
        }
      }
    ],
    'x-ws-transitions': [
      { from: 'open', to: 'authenticating', trigger: 'auth' },
      { from: 'authenticating', to: 'ready', trigger: 'auth_success' },
      { from: 'ready', to: 'subscribed', trigger: 'subscribe' },
      { from: 'subscribed', to: 'subscribed', trigger: 'notification' },
      { from: 'subscribed', to: 'subscribed', trigger: 'ping' },
      { from: 'subscribed', to: 'subscribed', trigger: 'pong' }
    ],
    'x-requires': [
      'request_headers(this).authorization != null',
      'query_params(this).userId matches "^[a-zA-Z0-9_-]+$"'
    ],
    'x-ensures': [
      'ws_state(this) == "ready" => previous(ws_message(this).type) == "auth"',
      'ws_message(this).type == "notification" => ws_state(this) == "subscribed"'
    ],
    'x-invariants': [
      'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"',
      'ws_state(this) != "error"'
    ],
    'x-validate-runtime': true
  }
}, (connection, req) => {
  const userId = (req.query as any).userId

  connection.socket.on('message', (raw) => {
    const msg = JSON.parse(raw.toString())

    switch (msg.type) {
      case 'auth':
        validateToken(msg.token)
        connection.socket.send(JSON.stringify({
          type: 'auth_success',
          status: 'authenticated',
          userId
        }))
        break

      case 'subscribe':
        subscribeToChannels(userId, msg.channels)
        break

      case 'ping':
        connection.socket.send(JSON.stringify({
          type: 'pong',
          timestamp: msg.timestamp
        }))
        break
    }
  })

  // Simulate notifications
  const interval = setInterval(() => {
    connection.socket.send(JSON.stringify({
      type: 'notification',
      channel: 'alerts',
      data: { message: 'New alert!' },
      timestamp: new Date().toISOString()
    }))
  }, 5000)

  connection.socket.on('close', () => {
    clearInterval(interval)
  })
})

// Run tests
const suite = await app.apophis.contract()
console.log(`Tests: ${suite.summary.passed} passed, ${suite.summary.failed} failed`)

17. Testing the WebSocket Extension

17.1 Unit Tests

// src/test/ws-runner.test.ts
import { test } from 'node:test'
import assert from 'node:assert'
import { runWebSocketTests } from '../test/ws-runner.js'
import { extractContract } from '../domain/contract.js'

test('ws-runner: validates message schema', async () => {
  const route = extractContract('/ws/test', 'GET', {
    'x-ws-messages': [
      {
        type: 'hello',
        direction: 'incoming',
        schema: { type: 'object', properties: { name: { type: 'string' } } }
      }
    ]
  }, true)

  assert.ok(route.ws)
  assert.equal(route.ws!.messages.length, 1)
})

test('ws-runner: detects invalid state transition', async () => {
  const transitions = [
    { from: 'open', to: 'ready', trigger: 'auth' }
  ]

  const result = validateStateTransition('open', 'subscribe', transitions)
  assert.equal(result.valid, false)
  assert.ok(result.error!.includes('Invalid transition'))
})

test('ws-runner: validates APOSTL ws_message operation', async () => {
  const formula = 'ws_message(this).type == "auth"'
  const ast = parse(formula)
  assert.equal(ast.ast.type, 'operation')
  assert.equal((ast.ast as any).header, 'ws_message')
})

17.2 Integration Tests

// src/test/integration.test.ts (additions)
test('integration: WebSocket contract testing', async () => {
  const app = fastify()
  await app.register(websocket)
  await app.register(apophis)

  app.get('/ws/echo', {
    websocket: true,
    schema: {
      'x-ws-messages': [
        {
          type: 'echo',
          direction: 'outgoing',
          schema: { type: 'object', properties: { text: { type: 'string' } } }
        },
        {
          type: 'echo_response',
          direction: 'incoming',
          schema: { type: 'object', properties: { text: { type: 'string' } } }
        }
      ],
      'x-ws-transitions': [
        { from: 'open', to: 'ready', trigger: 'echo' }
      ],
      'x-ensures': [
        'ws_message(this).type == "echo_response" => ws_message(this).payload.text == previous(ws_message(this).payload.text)'
      ]
    }
  }, (connection) => {
    connection.socket.on('message', (raw) => {
      const msg = JSON.parse(raw.toString())
      connection.socket.send(JSON.stringify({
        type: 'echo_response',
        text: msg.text
      }))
    })
  })

  await app.ready()
  const suite = await app.apophis.contract()

  assert.ok(suite.summary.passed > 0)
  assert.equal(suite.summary.failed, 0)
})

18. Migration Guide

18.1 For Existing APOPHIS Users

No breaking changes. Existing HTTP-only routes continue to work unchanged. WebSocket support is additive.

18.2 Enabling WebSocket Testing

  1. Install @fastify/websocket:

    npm install @fastify/websocket
    
  2. Register the WebSocket plugin before APOPHIS:

    await app.register(websocket)
    await app.register(apophis)
    
  3. Add websocket: true to route options:

    app.get('/ws/stream', { websocket: true, schema: { ... } }, handler)
    
  4. Add x-ws-messages and x-ws-transitions to route schema.

  5. Run tests normally:

    const suite = await app.apophis.contract()  // Tests both HTTP and WS
    

19. Performance Considerations

Concern Mitigation
WS connections are expensive Reuse connections across test runs; pool WS clients
Message timeouts Configurable timeout per message type (default: 5s)
Sequence explosion Limit sequence length to depth.maxCommands
Memory leaks Always close connections; use try/finally
Fastify inject doesn't support WS Require fastify.listen() for WS tests; skip if not listening

20. Security Considerations

Concern Mitigation
WS messages may contain secrets Redact authorization, token, password fields in diagnostics
ReDoS in message schema patterns Reuse existing regex-guard.ts for pattern validation
Connection flooding Limit concurrent WS connections in test runner
Malformed binary messages Only support JSON messages in v1; reject binary with clear error

21. Open Questions

  1. Binary WebSocket frames: Should v1 support binary (ArrayBuffer/Buffer) messages, or only JSON?
  2. WebSocket subprotocols: How to handle Sec-WebSocket-Protocol negotiation in contracts?
  3. Multiple WebSocket routes: Should we support route prefixes (e.g., /ws/v1/*, /ws/v2/*) with different contracts?
  4. Server-Sent Events (SSE): Should this extension also cover SSE, or is that a separate spec?
  5. WebSocket compression: Should contracts specify permessage-deflate expectations?

22. Appendix: Complete Type Reference

// All WebSocket types are exported from src/types.ts

export type {
  WebSocketMessage,
  WebSocketConnection,
  WebSocketState,
  WebSocketMessageSchema,
  WebSocketStateTransition,
  WebSocketContract,
  WebSocketEvalContext,
  WebSocketTestResult,
  WebSocketTestDiagnostics,
} from './types.js'

23. File Change Summary

File Lines Added Lines Modified Description
src/types.ts ~120 ~10 Add all WS types to public API
src/domain/contract.ts ~40 ~15 Extract WebSocketContract from schema
src/domain/discovery.ts ~5 ~10 Detect websocket: true flag
src/formula/parser.ts ~25 ~5 Parse ws_message and ws_state
src/formula/evaluator.ts ~15 ~5 Evaluate WS operations
src/test/ws-runner.ts ~350 Main WS test runner
src/test/ws-client.ts ~120 WS client wrapper
src/domain/ws-contract-validation.ts ~80 WS-specific validation
src/plugin/index.ts ~60 ~20 Wire WS into contract()/stateful()
src/domain/category.ts ~5 ~5 Default WS to 'observer'
src/infrastructure/hook-validator.ts ~80 ~10 Runtime WS validation
Total ~900 ~80

Specification version: 1.0.0 Target APOPHIS version: 1.0.0 Last updated: 2025-01-09