APOPHIS v1.0 HTTP Extensions Technical Specification

Overview

This document specifies four HTTP feature extensions for APOPHIS:

  1. Multipart File Uploads (multipart/form-data)
  2. Custom Serializers (Protobuf, MessagePack, XML, etc.)
  3. Streaming Responses (chunked transfer, NDJSON)
  4. Server-Sent Events (SSE)

Each feature is specified with symbol-level anchors, public types, affected modules, function signatures, and pseudocode for implementation.


1. Multipart File Uploads (multipart/form-data)

1.1 JSON Schema Annotations

New x-* properties for multipart support:

| Property | Type | Description |
| --- | --- | --- |
| x-content-type | string | Override content type for request body. Value: "multipart/form-data" |
| x-multipart-fields | object | Map of field names to their schema definitions |
| x-multipart-files | object | Map of file field names to constraints (maxSize, mimeTypes, etc.) |

Example schema annotation:

// Route schema example
schema: {
  body: {
    type: 'object',
    'x-content-type': 'multipart/form-data',
    'x-multipart-fields': {
      description: { type: 'string', maxLength: 500 }
    },
    'x-multipart-files': {
      avatar: {
        maxSize: 5 * 1024 * 1024,  // 5MB
        mimeTypes: ['image/jpeg', 'image/png', 'image/webp'],
        maxCount: 1
      },
      attachments: {
        maxSize: 10 * 1024 * 1024,  // 10MB per file
        mimeTypes: ['application/pdf', 'text/plain'],
        maxCount: 5
      }
    }
  }
}
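
The constraints declared under x-multipart-files lend themselves to a simple conformance check. The following is a minimal sketch of how such a check might look, not part of the specified API: `FileConstraint`, `UploadedFile`, and `checkFiles` are hypothetical names introduced here for illustration.

```typescript
// Hypothetical types: FileConstraint mirrors one x-multipart-files entry,
// UploadedFile is the minimum metadata needed to check it.
interface FileConstraint {
  maxSize: number      // bytes
  mimeTypes: string[]  // allowed MIME types
  maxCount: number     // max files per field
}

interface UploadedFile {
  fieldname: string
  mimetype: string
  size: number
}

// Returns human-readable violations; an empty list means the upload conforms.
function checkFiles(
  files: UploadedFile[],
  constraints: Record<string, FileConstraint>
): string[] {
  const violations: string[] = []
  const counts: Record<string, number> = {}

  for (const file of files) {
    const rule = constraints[file.fieldname]
    if (!rule) {
      violations.push(`${file.fieldname}: undeclared file field`)
      continue
    }
    counts[file.fieldname] = (counts[file.fieldname] ?? 0) + 1
    if (file.size > rule.maxSize) violations.push(`${file.fieldname}: exceeds maxSize`)
    if (!rule.mimeTypes.includes(file.mimetype)) violations.push(`${file.fieldname}: MIME type not allowed`)
  }

  // maxCount applies per field, so it is checked after all files are counted
  for (const [field, count] of Object.entries(counts)) {
    if (count > constraints[field].maxCount) violations.push(`${field}: exceeds maxCount`)
  }
  return violations
}

const constraints: Record<string, FileConstraint> = {
  avatar: { maxSize: 5 * 1024 * 1024, mimeTypes: ['image/jpeg', 'image/png', 'image/webp'], maxCount: 1 },
}

const ok = checkFiles([{ fieldname: 'avatar', mimetype: 'image/png', size: 2048 }], constraints)
const bad = checkFiles([{ fieldname: 'avatar', mimetype: 'text/plain', size: 6 * 1024 * 1024 }], constraints)
```

Here `ok` is empty, while `bad` reports both a size and a MIME type violation for the same file.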

1.2 Changes to src/types.ts

Line 12-22: Extend RouteContract interface:

export interface RouteContract {
  path: string
  method: string
  category: OperationCategory
  requires: string[]
  ensures: string[]
  invariants: string[]
  regexPatterns: Record<string, string>
  validateRuntime: boolean
  schema?: Record<string, unknown>
  // NEW: Multipart configuration extracted from schema
  multipart?: {
    fields: Record<string, Record<string, unknown>>
    files: Record<string, MultipartFileConstraint>
  }
}

// NEW: Multipart file constraint type
export interface MultipartFileConstraint {
  readonly maxSize: number        // bytes
  readonly mimeTypes: string[]    // allowed MIME types
  readonly maxCount: number       // max files per field
}

Line 8-16: Extend RequestStructure interface:

export interface RequestStructure {
  method: string
  url: string
  headers: Record<string, string>
  query?: Record<string, string>
  body?: unknown
  contentType?: string
  // NEW: Multipart payload
  multipart?: {
    fields: Record<string, string>
    files: MultipartFile[]
  }
}

// NEW: Multipart file representation for test data generation
export interface MultipartFile {
  readonly fieldname: string
  readonly originalname: string
  readonly mimetype: string
  readonly size: number
  readonly buffer: Buffer  // Injected by test generator
}

1.3 Changes to src/domain/schema-to-arbitrary.ts

Line 9-12: Extend SchemaToArbOptions:

export interface SchemaToArbOptions {
  readonly context: 'request' | 'response'
  // NEW: Generate multipart payloads when true
  readonly generateMultipart?: boolean
}

Line 47-74: Add buildMultipartArb function after buildStringArb:

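const buildMultipartArb = (
  schema: Record<string, unknown>
): Arbitrary<{ fields: Record<string, unknown>; files: MultipartFile[] }> => {
  const fieldsSchema = schema['x-multipart-fields'] as Record<string, Record<string, unknown>> | undefined
  const filesSchema = schema['x-multipart-files'] as Record<string, Partial<MultipartFileConstraint>> | undefined

  // Build field arbitraries
  const fieldArbs: Record<string, Arbitrary<unknown>> = {}
  if (fieldsSchema) {
    for (const [key, fieldSchema] of Object.entries(fieldsSchema)) {
      if (isObject(fieldSchema)) {
        fieldArbs[key] = convertSchemaInternal(fieldSchema, { context: 'request' })
      }
    }
  }

  // Build one file-list arbitrary per declared file field, so generated files
  // carry the declared field name and respect mimeTypes, maxSize, and maxCount.
  const TEST_MAX_BYTES = 1024 // cap generated buffers to keep tests fast
  const perFieldArbs = Object.entries(filesSchema ?? {}).map(([fieldname, constraint]) => {
    // Fallback MIME type when the schema declares none (an assumption of this spec sketch)
    const mimeTypes = constraint.mimeTypes?.length ? constraint.mimeTypes : ['application/octet-stream']
    const maxBytes = Math.max(1, Math.min(constraint.maxSize ?? TEST_MAX_BYTES, TEST_MAX_BYTES))
    const fileArb: Arbitrary<MultipartFile> = fc
      .record({
        originalname: fc.string({ minLength: 1, maxLength: 100 }),
        mimetype: fc.constantFrom(...mimeTypes),
        buffer: fc.uint8Array({ minLength: 1, maxLength: maxBytes }).map(arr => Buffer.from(arr)),
      })
      // size is derived from the buffer so the two can never disagree
      .map(file => ({ ...file, fieldname, size: file.buffer.length }))
    return fc.array(fileArb, { minLength: 1, maxLength: Math.max(1, constraint.maxCount ?? 1) })
  })

  const fileArbs: Arbitrary<MultipartFile[]> =
    perFieldArbs.length > 0 ? fc.tuple(...perFieldArbs).map(lists => lists.flat()) : fc.constant([])

  return fc.tuple(
    Object.keys(fieldArbs).length > 0 ? fc.record(fieldArbs) : fc.constant({}),
    fileArbs
  ).map(([fields, files]) => ({ fields, files }))
}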

Line 134-167: Modify convertSchemaInternal to detect multipart:

const convertSchemaInternal = (
  schema: Record<string, unknown>,
  options: SchemaToArbOptions
): Arbitrary<unknown> => {
  const type = getString(schema, 'type')
  const enumValues = getArray(schema, 'enum')
  const nullable = getBoolean(schema, 'nullable')
  const contentType = getString(schema, 'x-content-type')

  // NEW: Handle multipart schemas
  if (contentType === 'multipart/form-data' && options.generateMultipart) {
    return buildMultipartArb(schema) as Arbitrary<unknown>
  }

  let arb: Arbitrary<unknown>

  if (enumValues !== undefined && enumValues.length > 0) {
    arb = fc.constantFrom(...enumValues)
  } else if (type === 'string') {
    arb = buildStringArb(schema)
  } else if (type === 'integer') {
    arb = buildIntegerArb(schema)
  } else if (type === 'number') {
    arb = fc.float()
  } else if (type === 'boolean') {
    arb = fc.boolean()
  } else if (type === 'array') {
    arb = buildArrayArb(schema, options)
  } else if (type === 'object') {
    arb = buildObjectArb(schema, options)
  } else {
    arb = fc.anything()
  }

  if (nullable === true) {
    return fc.option(arb, { nil: null })
  }

  return arb
}

1.4 Changes to src/infrastructure/http-executor.ts

Line 64-129: Modify executeHttp to handle multipart payloads:

export const executeHttp = async (
  fastify: FastifyInjectInstance,
  route: RouteContract,
  request: RequestStructure,
  previous?: EvalContext
): Promise<EvalContext> => {
  const queryString = buildQueryString(request.query)
  const fullUrl = queryString ? `${request.url}?${queryString}` : request.url

  if (process.env.APOPHIS_DEBUG === '1') {
    log.debug(`→ ${request.method} ${fullUrl}`, {
      headers: request.headers,
      body: request.body,
      multipart: request.multipart,
    })
  }

  // NEW: Handle multipart uploads
  let payload: unknown = request.body
  let headers = { ...request.headers }

  if (request.multipart) {
    // Build FormData for multipart
    const formData = new FormData()
    
    // Add fields
    for (const [key, value] of Object.entries(request.multipart.fields)) {
      formData.append(key, String(value))
    }
    
    // Add files
    for (const file of request.multipart.files) {
      const blob = new Blob([file.buffer], { type: file.mimetype })
      formData.append(file.fieldname, blob, file.originalname)
    }
    
    payload = formData
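    // FormData sets its own content-type with boundary, so drop any preset value.
    // Header names are case-insensitive, so match every casing.
    for (const name of Object.keys(headers)) {
      if (name.toLowerCase() === 'content-type') delete headers[name]
    }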
  }

  const response = await fastify.inject({
    method: request.method,
    url: fullUrl,
    payload,
    headers,
  })

  const pathParams = extractPathParams(route.path, request.url)

  let responseBody: unknown
  try {
    responseBody = response.json()
  } catch {
    responseBody = undefined
  }

  if (process.env.APOPHIS_DEBUG === '1') {
    log.debug(`← ${response.statusCode} ${request.method} ${fullUrl}`, {
      headers: response.headers,
      body: responseBody,
    })
  }

  const ctx: EvalContext = {
    request: {
      body: request.body,
      headers: request.headers,
      query: request.query || {},
      params: pathParams,
      // NEW: Include multipart info in context for formula evaluation
      multipart: request.multipart,
    },
    response: {
      body: responseBody,
      headers: stringifyHeaders(response.headers),
      statusCode: response.statusCode,
    },
    previous,
  }

  return ctx
}
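
The FormData construction step can be exercised in isolation, without a Fastify instance. The sketch below assumes Node.js 18 or later, where `FormData` and `Blob` are globals; `MultipartPayload` and `toFormData` are hypothetical names that mirror the shape of RequestStructure.multipart.

```typescript
// Illustrative shape; mirrors RequestStructure.multipart from section 1.2.
interface MultipartPayload {
  fields: Record<string, string>
  files: Array<{ fieldname: string; originalname: string; mimetype: string; buffer: Buffer }>
}

// Hypothetical helper: converts a generated multipart payload into FormData.
function toFormData(multipart: MultipartPayload): FormData {
  const form = new FormData()
  for (const [key, value] of Object.entries(multipart.fields)) {
    form.append(key, value)
  }
  for (const file of multipart.files) {
    // Copy into a plain Uint8Array so the Blob holds its own bytes
    const blob = new Blob([new Uint8Array(file.buffer)], { type: file.mimetype })
    form.append(file.fieldname, blob, file.originalname)
  }
  return form
}

const form = toFormData({
  fields: { description: 'profile picture' },
  files: [
    { fieldname: 'avatar', originalname: 'me.png', mimetype: 'image/png', buffer: Buffer.from([1, 2, 3]) },
  ],
})

// File entries come back as Blob-compatible objects carrying size and type
const avatar = form.get('avatar') as Blob
```

Reading the entries back with `form.get` is a cheap way to confirm that field values and file metadata survive the conversion before the payload reaches `inject`.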

Line 71-90: Update FastifyInjectInstance interface to accept FormData:

export interface FastifyInjectInstance {
  routes?: Array<{ method: string; url: string; schema?: Record<string, unknown> }>
  inject(opts: {
    method: string
    url: string
    payload?: unknown | FormData
    headers?: Record<string, string>
  }): Promise<{
    json(): unknown
    statusCode: number
    headers: Record<string, unknown>
  }>
}

1.5 Changes to src/infrastructure/hook-validator.ts

Line 53-66: Update buildPreContext to include multipart:

const buildPreContext = (request: FastifyRequest): EvalContext => ({
  request: {
    body: request.body,
    headers: request.headers as Record<string, string>,
    query: request.query as Record<string, unknown>,
    params: request.params as Record<string, string>,
    cookies: getCookies(request),
    // NEW: Extract multipart data if present
    multipart: (request as any).multipart,
  },
  response: {
    body: null,
    headers: {},
    statusCode: 0,
  },
})

1.6 Changes to src/domain/request-builder.ts

Line 135-163: Modify buildRequest to detect and build multipart:

export const buildRequest = (
  route: RouteContract,
  generatedData: Record<string, unknown>,
  scopeHeaders: Record<string, string>,
  state: ModelState,
  rng?: SeededRng
): RequestStructure => {
  const url = substitutePathParams(route.path, generatedData, state, rng)

  // Check if route expects multipart
  const bodySchema = route.schema?.body as Record<string, unknown> | undefined
  const contentType = bodySchema?.['x-content-type'] as string | undefined

  if (contentType === 'multipart/form-data') {
    // Extract multipart data from the generated payload. Either key may be absent
    // if the generator ran without generateMultipart: true.
    const multipartData = generatedData as { fields?: Record<string, unknown>; files?: MultipartFile[] }
    
    return {
      method: route.method,
      url,
      headers: { ...scopeHeaders },
      multipart: {
        fields: Object.fromEntries(
          Object.entries(multipartData.fields ?? {}).map(([k, v]) => [k, String(v)])
        ),
        files: multipartData.files ?? [],
      },
      contentType: 'multipart/form-data',
    }
  }

  // Existing body/query extraction logic
  const body = bodySchema
    ? extractBodyParams(generatedData, bodySchema)
    : undefined

  const querySchema = route.schema?.querystring as Record<string, unknown> | undefined
  const query = querySchema
    ? extractQueryParams(generatedData, querySchema)
    : extractRemainingParams(generatedData, parseRouteParams(route.path), body)

  const headers = buildHeaders(route, scopeHeaders, generatedData, state)
  const contentTypeHeader = body ? 'application/json' : undefined

  return { method: route.method, url, headers, query, body, contentType: contentTypeHeader }
}

1.7 New APOSTL Operations/Formulas

New operation headers for multipart access:

// In src/types.ts, extend OperationHeader:
export type OperationHeader = 
  | 'request_body' | 'response_body' | 'response_code' 
  | 'request_headers' | 'response_headers' | 'query_params' 
  | 'cookies' | 'response_time'
  // NEW:
  | 'request_files' | 'request_fields'

Example formulas:

// Check file count
request_files(this).avatar.count == 1

// Check file size
request_files(this).avatar.size <= 5242880

// Check MIME type
request_files(this).avatar.mimetype matches "image/(jpeg|png|webp)"

// Check field presence
request_fields(this).description != null

// Check field value
request_fields(this).description.length > 10

1.8 Changes to src/formula/parser.ts

Line 222-225: Add new valid headers:

const VALID_HEADERS: OperationHeader[] = [
  'request_body', 'response_body', 'response_code',
  'request_headers', 'response_headers', 'query_params', 'cookies', 'response_time',
  // NEW:
  'request_files', 'request_fields'
]

Line 227-379: Extend parseOperation to handle new headers (add manual charCode checks for request_files and request_fields).
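
The body of parseOperation is not reproduced in this document, so the following is only a sketch of what a manual charCode check for the two new headers could look like. `matchesAt` and `matchNewHeader` are hypothetical helpers, not existing parser functions.

```typescript
// Hypothetical helper: compares the input against a literal one char code at a
// time, starting at pos, without allocating substrings.
function matchesAt(input: string, pos: number, literal: string): boolean {
  if (pos + literal.length > input.length) return false
  for (let i = 0; i < literal.length; i++) {
    if (input.charCodeAt(pos + i) !== literal.charCodeAt(i)) return false
  }
  return true
}

const NEW_HEADERS = ['request_files', 'request_fields'] as const
const OPEN_PAREN = 40 // char code of '('

// Returns the new header that starts at pos and is immediately followed by '(',
// or undefined when neither matches.
function matchNewHeader(input: string, pos: number): (typeof NEW_HEADERS)[number] | undefined {
  for (const header of NEW_HEADERS) {
    if (matchesAt(input, pos, header) && input.charCodeAt(pos + header.length) === OPEN_PAREN) {
      return header
    }
  }
  return undefined
}
```

Requiring the opening parenthesis right after the header keeps a longer identifier such as `request_filesize` from being mistaken for `request_files`.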

1.9 Changes to src/formula/evaluator.ts

Line 9-65: Extend resolveOperation:

function resolveOperation(node: Extract<FormulaNode, { type: 'operation' }>, ctx: EvalContext): unknown {
  const { header, parameter, accessor } = node

  let target: unknown

  switch (header) {
    // ... existing cases ...
    
    // NEW: Multipart access
    case 'request_files':
      target = ctx.request.multipart?.files ?? []
      break
    case 'request_fields':
      target = ctx.request.multipart?.fields ?? {}
      break
    
    default:
      throw new Error(`Unknown operation header: ${header}`)
  }

  // ... existing accessor logic ...
}
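
Note that the request_files case above yields a flat array of files, while the example formulas in section 1.7 address files by field name (for example `request_files(this).avatar.size`). One way to reconcile the two is to index the array by field name before applying the accessor. The sketch below is an assumption about how that could be done, not specified behavior: `indexFilesByField` and `FileFieldView` are hypothetical, and exposing the first file's metadata for multi-file fields is an illustrative choice.

```typescript
// Illustrative subset of MultipartFile from section 1.2
interface FileInfo {
  fieldname: string
  originalname: string
  mimetype: string
  size: number
}

// Hypothetical per-field view: number of files plus the first file's metadata
interface FileFieldView {
  count: number
  size: number
  mimetype: string
  originalname: string
}

function indexFilesByField(files: FileInfo[]): Record<string, FileFieldView> {
  const byField: Record<string, FileFieldView> = {}
  for (const file of files) {
    const existing = byField[file.fieldname]
    if (existing) {
      // Later files in the same field only increase the count
      existing.count += 1
    } else {
      byField[file.fieldname] = {
        count: 1,
        size: file.size,
        mimetype: file.mimetype,
        originalname: file.originalname,
      }
    }
  }
  return byField
}

const view = indexFilesByField([
  { fieldname: 'avatar', originalname: 'me.png', mimetype: 'image/png', size: 2048 },
  { fieldname: 'attachments', originalname: 'a.pdf', mimetype: 'application/pdf', size: 100 },
  { fieldname: 'attachments', originalname: 'b.txt', mimetype: 'text/plain', size: 50 },
])
```

With this view, `view.avatar.count`, `view.avatar.size`, and `view.avatar.mimetype` correspond to the accessors used in the example formulas.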

1.10 Changes to src/domain/contract.ts

Line 63-73: Extract multipart config:

const contract: RouteContract = {
  path,
  method: method.toUpperCase(),
  category,
  requires,
  ensures,
  invariants: EMPTY_INVARIANTS,
  regexPatterns: {},
  validateRuntime,
  schema: s,
  // NEW: Extract multipart configuration (bodySchema is the route's body schema, s.body)
  multipart: bodySchema?.['x-content-type'] === 'multipart/form-data'
    ? {
        fields: (bodySchema['x-multipart-fields'] as Record<string, Record<string, unknown>>) ?? {},
        // Cast to the constraint type declared on RouteContract.multipart.files
        files: (bodySchema['x-multipart-files'] as Record<string, MultipartFileConstraint>) ?? {},
      }
    : undefined,
}

1.11 Example Fastify Route Definition

fastify.post('/upload', {
  schema: {
    description: 'Upload avatar with metadata',
    body: {
      type: 'object',
      'x-content-type': 'multipart/form-data',
      'x-multipart-fields': {
        description: { type: 'string', maxLength: 500 }
      },
      'x-multipart-files': {
        avatar: {
          maxSize: 5 * 1024 * 1024,
          mimeTypes: ['image/jpeg', 'image/png', 'image/webp'],
          maxCount: 1
        }
      }
    },
    response: {
      201: {
        type: 'object',
        properties: {
          id: { type: 'string', format: 'uuid' },
          url: { type: 'string', format: 'uri' },
          size: { type: 'integer' }
        },
        'x-ensures': [
          'response_body(this).url matches "^https?://"',
          'response_body(this).size > 0'
        ]
      }
    }
  }
}, async (request, reply) => {
  // Handler receives multipart data via @fastify/multipart
  const data = await request.file()
  // ... process upload ...
  return { id: '...', url: '...', size: 12345 }
})

1.12 Backward Compatibility

  • Routes without x-content-type: multipart/form-data behave exactly as before
  • RequestStructure.multipart is optional
  • EvalContext.request.multipart is optional
  • Test generators only produce multipart data when generateMultipart: true is passed

2. Custom Serializers (Protobuf, MessagePack, XML, etc.)

2.1 JSON Schema Annotations

| Property | Type | Description |
| --- | --- | --- |
| x-serializer | string | Serializer identifier: "protobuf", "msgpack", "xml", "custom" |
| x-serializer-schema | string | Path to serializer schema file (e.g., .proto, .xsd) |
| x-serializer-version | string | Schema version for compatibility checks |

Example:

schema: {
  body: {
    type: 'object',
    'x-serializer': 'protobuf',
    'x-serializer-schema': './schemas/user.proto',
    'x-serializer-version': 'v1.2.3'
  }
}

2.2 Changes to src/types.ts

Line 12-22: Extend RouteContract:

export interface RouteContract {
  // ... existing fields ...
  // NEW: Serializer configuration
  serializer?: {
    name: string
    schemaPath?: string
    version?: string
  }
}

Line 8-16: Extend RequestStructure:

export interface RequestStructure {
  // ... existing fields ...
  // NEW: Serialization hint
  serializer?: string
}

Line 71-86: Extend EvalContext:

export interface EvalContext {
  request: {
    // ... existing fields ...
    // NEW: Raw serialized payload for inspection
    rawBody?: Buffer
  }
  response: {
    // ... existing fields ...
    // NEW: Raw serialized response for inspection
    rawBody?: Buffer
    // NEW: Serializer used for response
    serializer?: string
  }
}

2.3 Changes to src/domain/schema-to-arbitrary.ts

No changes required. Custom serializers do not affect test data generation — APOPHIS generates native JS objects, and serialization happens at the HTTP layer.

2.4 Changes to src/infrastructure/http-executor.ts

Line 1-18: Add serializer interface:

// NEW: Serializer registry interface
export interface Serializer {
  readonly name: string
  encode(data: unknown): Buffer
  decode(buffer: Buffer): unknown
}

// NEW: Serializer registry (injected)
export interface SerializerRegistry {
  get(name: string): Serializer | undefined
  register(name: string, serializer: Serializer): void
}
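
To make the two interfaces concrete, here is a minimal sketch of a registry and one serializer. `MapSerializerRegistry` is a hypothetical Map-backed implementation, and the JSON serializer is only a stand-in so the example runs without a Protobuf or MessagePack dependency.

```typescript
// Interfaces repeated from above so the example is self-contained
interface Serializer {
  readonly name: string
  encode(data: unknown): Buffer
  decode(buffer: Buffer): unknown
}

interface SerializerRegistry {
  get(name: string): Serializer | undefined
  register(name: string, serializer: Serializer): void
}

// Hypothetical Map-backed registry
class MapSerializerRegistry implements SerializerRegistry {
  private readonly serializers = new Map<string, Serializer>()

  get(name: string): Serializer | undefined {
    return this.serializers.get(name)
  }

  register(name: string, serializer: Serializer): void {
    this.serializers.set(name, serializer)
  }
}

// Stand-in serializer: JSON text encoded as UTF-8 bytes
const jsonSerializer: Serializer = {
  name: 'json',
  encode: data => Buffer.from(JSON.stringify(data), 'utf8'),
  decode: buffer => JSON.parse(buffer.toString('utf8')),
}

const registry = new MapSerializerRegistry()
registry.register(jsonSerializer.name, jsonSerializer)

// Round trip: encode to bytes, then decode back to a JS value
const encoded = registry.get('json')!.encode({ name: 'Ada' })
const decoded = registry.get('json')!.decode(encoded)
```

A lookup for an unregistered name returns `undefined`, which is the case executeHttp treats as "fall back to JSON".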

Line 64-129: Modify executeHttp to handle serialization:

export const executeHttp = async (
  fastify: FastifyInjectInstance,
  route: RouteContract,
  request: RequestStructure,
  previous?: EvalContext,
  // NEW: Injected serializer registry
  serializers?: SerializerRegistry
): Promise<EvalContext> => {
  const queryString = buildQueryString(request.query)
  const fullUrl = queryString ? `${request.url}?${queryString}` : request.url

  // NEW: Serialize request body if serializer specified
  let payload: unknown = request.body
  let requestRawBody: Buffer | undefined

  if (route.serializer && serializers) {
    const serializer = serializers.get(route.serializer.name)
    if (serializer) {
      requestRawBody = serializer.encode(request.body)
      payload = requestRawBody
    }
  }

  const response = await fastify.inject({
    method: request.method,
    url: fullUrl,
    payload,
    headers: request.headers,
  })

  const pathParams = extractPathParams(route.path, request.url)

  // NEW: Deserialize response body if serializer specified
  let responseBody: unknown
  let responseRawBody: Buffer | undefined

  try {
    const serializer = route.serializer ? serializers?.get(route.serializer.name) : undefined
    if (serializer) {
      // Decode the raw response bytes rather than re-encoding parsed JSON.
      // light-my-request exposes them as response.rawPayload (a Buffer);
      // FastifyInjectInstance must declare that property on the inject() result.
      responseRawBody = response.rawPayload
      responseBody = serializer.decode(responseRawBody)
    } else {
      responseBody = response.json()
    }
  } catch {
    responseBody = undefined
  }

  const ctx: EvalContext = {
    request: {
      body: request.body,
      headers: request.headers,
      query: request.query || {},
      params: pathParams,
      rawBody: requestRawBody,
    },
    response: {
      body: responseBody,
      headers: stringifyHeaders(response.headers),
      statusCode: response.statusCode,
      rawBody: responseRawBody,
      serializer: route.serializer?.name,
    },
    previous,
  }

  return ctx
}

2.5 Changes to src/infrastructure/hook-validator.ts

Line 68-81: Update buildPostContext to capture serialized payload:

const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
  request: {
    body: request.body,
    headers: request.headers as Record<string, string>,
    query: request.query as Record<string, unknown>,
    params: request.params as Record<string, string>,
    cookies: getCookies(request),
    // NEW: Capture raw body if available
    rawBody: (request as any).rawBody,
  },
  response: {
    body: reply[kApophisPayload] ?? null,
    headers: reply.getHeaders() as Record<string, string>,
    statusCode: reply.statusCode,
    // NEW: Serializer info from route config
    serializer: (request.routeOptions?.config as any)?.apophisContract?.serializer?.name,
  },
})

2.6 New APOSTL Operations/Formulas

Serializer validation uses the existing operations; no new operation headers are introduced:

// Check serializer used
response_headers(this)['content-type'] == "application/x-protobuf"

// Check schema version (via custom header)
response_headers(this)['x-schema-version'] == "v1.2.3"

// Check that the decoded body is present
response_body(this) != null

2.7 Changes to src/domain/contract.ts

Line 63-73: Extract serializer config:

const contract: RouteContract = {
  path,
  method: method.toUpperCase(),
  category,
  requires,
  ensures,
  invariants: EMPTY_INVARIANTS,
  regexPatterns: {},
  validateRuntime,
  schema: s,
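  // NEW: Extract serializer configuration from the body schema (bodySchema is
  // s.body, as in section 1.10), which is where the x-serializer annotations live
  serializer: bodySchema?.['x-serializer']
    ? {
        name: String(bodySchema['x-serializer']),
        schemaPath: bodySchema['x-serializer-schema'] as string | undefined,
        version: bodySchema['x-serializer-version'] as string | undefined,
      }
    : undefined,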
}

2.8 Example Fastify Route Definition

// Protobuf route
fastify.post('/users', {
  schema: {
    body: {
      type: 'object',
      'x-serializer': 'protobuf',
      'x-serializer-schema': './schemas/user.proto',
      properties: {
        name: { type: 'string' },
        email: { type: 'string', format: 'email' }
      }
    },
    response: {
      201: {
        type: 'object',
        'x-serializer': 'protobuf',
        properties: {
          id: { type: 'string' },
          createdAt: { type: 'string', format: 'date-time' }
        }
      }
    }
  }
}, async (request, reply) => {
  // Handler receives deserialized protobuf message
  // Fastify plugin handles serialization/deserialization
})

2.9 Backward Compatibility

  • Routes without x-serializer behave exactly as before (JSON default)
  • SerializerRegistry is optional dependency — injected at plugin initialization
  • Raw body capture only occurs when serializer is configured

3. Streaming Responses (Chunked Transfer, NDJSON)

3.1 JSON Schema Annotations

| Property | Type | Description |
| --- | --- | --- |
| x-streaming | boolean | Enable streaming response handling |
| x-stream-format | string | Stream format: "ndjson", "sse", "chunked" |
| x-stream-max-chunks | number | Max chunks to collect for validation |
| x-stream-timeout | number | Milliseconds to wait for stream completion |

Example:

schema: {
  response: {
    200: {
      type: 'object',
      'x-streaming': true,
      'x-stream-format': 'ndjson',
      'x-stream-max-chunks': 100,
      'x-stream-timeout': 5000,
      properties: {
        items: {
          type: 'array',
          items: { type: 'object' }
        }
      }
    }
  }
}

3.2 Changes to src/types.ts

Line 12-22: Extend RouteContract:

export interface RouteContract {
  // ... existing fields ...
  // NEW: Streaming configuration
  streaming?: {
    enabled: boolean
    format: 'ndjson' | 'sse' | 'chunked'
    maxChunks: number
    timeoutMs: number
  }
}

Line 71-86: Extend EvalContext:

export interface EvalContext {
  request: {
    // ... existing fields ...
  }
  response: {
    // ... existing fields ...
    // NEW: Streaming response data
    chunks?: unknown[]
    streamFormat?: string
    streamDurationMs?: number
  }
}

3.3 Changes to src/domain/schema-to-arbitrary.ts

No changes required for streaming. Test data generation for requests is unchanged.

3.4 Changes to src/infrastructure/http-executor.ts

Line 64-129: Add streaming response handling:

export const executeHttp = async (
  fastify: FastifyInjectInstance,
  route: RouteContract,
  request: RequestStructure,
  previous?: EvalContext
): Promise<EvalContext> => {
  const queryString = buildQueryString(request.query)
  const fullUrl = queryString ? `${request.url}?${queryString}` : request.url

  const response = await fastify.inject({
    method: request.method,
    url: fullUrl,
    payload: request.body,
    headers: request.headers,
  })

  const pathParams = extractPathParams(route.path, request.url)

  // NEW: Handle streaming responses
  let responseBody: unknown
  let chunks: unknown[] | undefined
  let streamDurationMs: number | undefined

  if (route.streaming?.enabled) {
    const startTime = Date.now()
    chunks = await collectStreamChunks(response, route.streaming)
    streamDurationMs = Date.now() - startTime
    
    // Aggregate chunks based on format
    if (route.streaming.format === 'ndjson') {
      responseBody = chunks
    } else if (route.streaming.format === 'sse') {
      responseBody = parseSSEEvents(chunks)
    } else {
      responseBody = chunks.join('')
    }
  } else {
    try {
      responseBody = response.json()
    } catch {
      responseBody = undefined
    }
  }

  const ctx: EvalContext = {
    request: {
      body: request.body,
      headers: request.headers,
      query: request.query || {},
      params: pathParams,
    },
    response: {
      body: responseBody,
      headers: stringifyHeaders(response.headers),
      statusCode: response.statusCode,
      chunks,
      streamFormat: route.streaming?.format,
      streamDurationMs,
    },
    previous,
  }

  return ctx
}

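// NEW: Stream chunk collection
async function collectStreamChunks(
  response: any,
  config: { maxChunks: number; timeoutMs: number; format: string }
): Promise<unknown[]> {
  const chunks: unknown[] = []
  const startTime = Date.now()

  // Parse one NDJSON line; keep the raw text when it is not valid JSON
  const pushLine = (line: string): void => {
    if (line.trim() === '') return
    try {
      chunks.push(JSON.parse(line))
    } catch {
      chunks.push(line)
    }
  }

  // Fastify injection does not provide a portable stream-consumption API here.
  // Verify implementation against light-my-request behavior before declaring streaming support.

  if (response.raw && response.raw.readable) {
    let pending = '' // partial NDJSON line carried over between chunks
    for await (const chunk of response.raw) {
      if (Date.now() - startTime > config.timeoutMs) {
        break
      }

      if (config.format === 'ndjson') {
        // A JSON line may span two chunks, so only complete lines are parsed
        const lines = (pending + chunk.toString()).split('\n')
        pending = lines.pop() ?? ''
        lines.forEach(pushLine)
      } else {
        chunks.push(chunk.toString())
      }

      if (chunks.length >= config.maxChunks) {
        break
      }
    }
    if (config.format === 'ndjson') pushLine(pending)
  } else if (config.format === 'ndjson') {
    // Buffered fallback: the full payload is already available as a string
    String(response.payload ?? '').split('\n').forEach(pushLine)
  } else {
    // Buffered fallback for "sse" and "chunked": keep the payload as text so
    // parseSSEEvents and chunks.join('') receive strings
    chunks.push(String(response.payload ?? ''))
  }

  return chunks.slice(0, config.maxChunks)
}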
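
The NDJSON handling is easiest to verify as a pure function over string chunks, independent of any stream. The sketch below is a minimal illustration under that assumption; `parseNdjsonChunks` is a hypothetical helper and not part of the executor.

```typescript
// Hypothetical helper: parses NDJSON delivered in arbitrary string chunks.
// A JSON line may be split across chunk boundaries, so the trailing partial
// line of each chunk is carried over to the next one.
function parseNdjsonChunks(chunks: string[], maxItems = Infinity): unknown[] {
  const items: unknown[] = []
  let pending = ''

  const pushLine = (line: string): void => {
    if (line.trim() === '' || items.length >= maxItems) return
    try {
      items.push(JSON.parse(line))
    } catch {
      // Keep unparseable lines as raw text rather than dropping them
      items.push(line)
    }
  }

  for (const chunk of chunks) {
    const lines = (pending + chunk).split('\n')
    pending = lines.pop() ?? ''
    lines.forEach(pushLine)
  }
  // The last line may lack a trailing newline
  pushLine(pending)
  return items
}

// The second record is split across the first two chunks
const items = parseNdjsonChunks(['{"id":"1"}\n{"id"', ':"2"}\n', '{"id":"3"}'])
```

All three records are recovered even though the second one arrives in two pieces and the third has no trailing newline.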

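// NEW: Parse SSE events
function parseSSEEvents(chunks: unknown[]): Array<{ event?: string; data: unknown; id?: string }> {
  const events: Array<{ event?: string; data: unknown; id?: string }> = []
  let currentEvent: { event?: string; data?: string; id?: string } = {}

  // Join chunks first so a line split across two chunks is parsed as one line,
  // then split on any SSE line terminator (CRLF, CR, or LF)
  const lines = chunks.map(String).join('').split(/\r\n|\r|\n/)

  // Drop the field name and at most one leading space, per the SSE format
  const fieldValue = (line: string, prefixLength: number): string => {
    const value = line.slice(prefixLength)
    return value.startsWith(' ') ? value.slice(1) : value
  }

  for (const line of lines) {
    if (line.startsWith('event:')) {
      currentEvent.event = fieldValue(line, 6)
    } else if (line.startsWith('id:')) {
      currentEvent.id = fieldValue(line, 3)
    } else if (line.startsWith('data:')) {
      // Multiple data lines within one event are joined with a newline
      const value = fieldValue(line, 5)
      currentEvent.data = currentEvent.data === undefined ? value : `${currentEvent.data}\n${value}`
    } else if (line === '') {
      // A blank line terminates the event; events without data are dropped
      if (currentEvent.data) {
        try {
          events.push({ ...currentEvent, data: JSON.parse(currentEvent.data) })
        } catch {
          events.push({ ...currentEvent, data: currentEvent.data })
        }
      }
      currentEvent = {}
    }
  }

  return events
}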

3.5 Changes to src/infrastructure/hook-validator.ts

Line 68-81: Update buildPostContext:

const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
  request: {
    body: request.body,
    headers: request.headers as Record<string, string>,
    query: request.query as Record<string, unknown>,
    params: request.params as Record<string, string>,
    cookies: getCookies(request),
  },
  response: {
    body: reply[kApophisPayload] ?? null,
    headers: reply.getHeaders() as Record<string, string>,
    statusCode: reply.statusCode,
    // NEW: Streaming info from route config
    chunks: (reply as any).apophisChunks,
    streamFormat: (request.routeOptions?.config as any)?.apophisContract?.streaming?.format,
  },
})

3.6 New APOSTL Operations/Formulas

Formulas for streaming validation:

// Check chunk count (for NDJSON, response_body is the array of parsed chunks)
response_body(this).length <= 100

// Check stream duration
response_time(this) < 5000

// Check NDJSON structure (each chunk has required field)
for item in response_body(this): item.id != null

// Check SSE event type (for x-stream-format "sse", response_body is the array of parsed events)
response_body(this).0.event == "update"

// Check that chunked transfer encoding was used
response_headers(this)['transfer-encoding'] == "chunked"

3.7 Changes to src/domain/contract.ts

Line 63-73: Extract streaming config:

const contract: RouteContract = {
  path,
  method: method.toUpperCase(),
  category,
  requires,
  ensures,
  invariants: EMPTY_INVARIANTS,
  regexPatterns: {},
  validateRuntime,
  schema: s,
  // NEW: Extract streaming configuration from response schema
  streaming: (() => {
    const responseSchema = (s.response ?? {}) as Record<string, Record<string, unknown>>
    const firstStatus = Object.values(responseSchema)[0]
    if (firstStatus?.['x-streaming'] === true) {
      return {
        enabled: true,
        format: (firstStatus['x-stream-format'] as 'ndjson' | 'sse' | 'chunked') ?? 'chunked',
        maxChunks: (firstStatus['x-stream-max-chunks'] as number) ?? 100,
        timeoutMs: (firstStatus['x-stream-timeout'] as number) ?? 5000,
      }
    }
    return undefined
  })(),
}
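
The inline extraction above relies on type casts, so a malformed annotation (for example a string where a number is expected) passes through unchecked. The following sketch shows one possible standalone variant that validates each value at runtime. `extractStreamingConfig` is a hypothetical function, and unlike the original it scans every declared status code rather than only the first.

```typescript
type StreamFormat = 'ndjson' | 'sse' | 'chunked'

interface StreamingConfig {
  enabled: boolean
  format: StreamFormat
  maxChunks: number
  timeoutMs: number
}

// Hypothetical standalone extraction with runtime checks and the same defaults
// as the inline version: format "chunked", 100 chunks, 5000 ms.
function extractStreamingConfig(schema: Record<string, unknown>): StreamingConfig | undefined {
  const responses = (schema.response ?? {}) as Record<string, Record<string, unknown> | undefined>

  for (const statusSchema of Object.values(responses)) {
    if (statusSchema?.['x-streaming'] !== true) continue

    const format = statusSchema['x-stream-format']
    const maxChunks = statusSchema['x-stream-max-chunks']
    const timeoutMs = statusSchema['x-stream-timeout']

    return {
      enabled: true,
      format: format === 'ndjson' || format === 'sse' || format === 'chunked' ? format : 'chunked',
      maxChunks: typeof maxChunks === 'number' ? maxChunks : 100,
      timeoutMs: typeof timeoutMs === 'number' ? timeoutMs : 5000,
    }
  }
  return undefined
}

const ndjsonConfig = extractStreamingConfig({
  response: { 200: { 'x-streaming': true, 'x-stream-format': 'ndjson', 'x-stream-max-chunks': 50 } },
})
const noConfig = extractStreamingConfig({ response: { 200: { type: 'object' } } })
```

In this example `ndjsonConfig` takes its format and chunk limit from the schema and its timeout from the default, while `noConfig` is `undefined` because no response is marked as streaming.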

3.8 Example Fastify Route Definition

import { Readable } from 'node:stream'

// NDJSON streaming route
fastify.get('/events', {
  schema: {
    response: {
      200: {
        type: 'object',
        'x-streaming': true,
        'x-stream-format': 'ndjson',
        'x-stream-max-chunks': 50,
        'x-stream-timeout': 3000,
        properties: {
          events: {
            type: 'array',
            items: {
              type: 'object',
              properties: {
                id: { type: 'string' },
                timestamp: { type: 'string', format: 'date-time' },
                data: { type: 'object' }
              }
            }
          }
        },
        'x-ensures': [
          'for item in response_body(this): item.id != null',
          'response_body(this).length <= 50'
        ]
      }
    }
  }
}, async (request, reply) => {
  reply.header('content-type', 'application/x-ndjson')
  
  const stream = new Readable({
    read() {
      // Stream NDJSON data
      this.push(JSON.stringify({ id: '1', timestamp: new Date().toISOString(), data: {} }) + '\n')
      this.push(null)
    }
  })
  
  return reply.send(stream)
})

3.9 Backward Compatibility

  • Routes without x-streaming behave exactly as before
  • EvalContext.response.chunks is optional
  • Non-streaming responses fall back to existing JSON parsing

4. Server-Sent Events (SSE)

4.1 JSON Schema Annotations

| Property | Type | Description |
| --- | --- | --- |
| x-sse | boolean | Mark response as SSE stream |
| x-sse-events | string[] | Allowed event types |
| x-sse-max-events | number | Max events to collect for validation |
| x-sse-timeout | number | Milliseconds to wait for events |
| x-sse-retry | number | Expected retry interval (ms) |

Example:

schema: {
  response: {
    200: {
      type: 'object',
      'x-sse': true,
      'x-sse-events': ['update', 'delete', 'heartbeat'],
      'x-sse-max-events': 10,
      'x-sse-timeout': 30000,
      'x-sse-retry': 3000,
      properties: {
        events: {
          type: 'array',
          items: {
            type: 'object',
            properties: {
              event: { type: 'string' },
              data: { type: 'object' },
              id: { type: 'string' }
            }
          }
        }
      }
    }
  }
}
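
As an illustration of how the x-sse-events and x-sse-max-events annotations might be applied to collected events, consider the sketch below. `SseEvent` and `checkSseEvents` are hypothetical names, and the check itself is an assumption about intended semantics rather than specified behavior. Treating an event without an explicit type as "message" follows the SSE format's default event type.

```typescript
// Mirrors the event shape used for collected SSE events
interface SseEvent {
  event?: string
  data: unknown
  id?: string
  retry?: number
}

// Hypothetical checker: returns violations of the allowed-types list and the
// event limit; an empty list means the collected events conform.
function checkSseEvents(events: SseEvent[], allowedEvents: string[], maxEvents: number): string[] {
  const violations: string[] = []

  if (events.length > maxEvents) {
    violations.push(`received ${events.length} events, limit is ${maxEvents}`)
  }

  events.forEach((e, index) => {
    // An event without an explicit type is a "message" event in SSE
    const type = e.event ?? 'message'
    if (!allowedEvents.includes(type)) {
      violations.push(`event ${index}: type "${type}" is not allowed`)
    }
  })

  return violations
}

const allowed = ['update', 'delete', 'heartbeat']
const conforming = checkSseEvents([{ event: 'update', data: { id: 1 } }], allowed, 10)
const offending = checkSseEvents([{ event: 'create', data: {} }, { data: 'untyped' }], allowed, 10)
```

Here `conforming` is empty, while `offending` flags both the undeclared "create" event and the untyped event, which defaults to "message".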

4.2 Changes to src/types.ts

Line 12-22: Extend RouteContract:

export interface RouteContract {
  // ... existing fields ...
  // NEW: SSE configuration
  sse?: {
    enabled: boolean
    allowedEvents: string[]
    maxEvents: number
    timeoutMs: number
    retryMs?: number
  }
}

Line 71-86: Extend EvalContext:

export interface EvalContext {
  request: {
    // ... existing fields ...
  }
  response: {
    // ... existing fields ...
    // NEW: SSE-specific data
    sseEvents?: Array<{
      event?: string
      data: unknown
      id?: string
      retry?: number
    }>
    sseDurationMs?: number
  }
}

4.3 Changes to src/domain/schema-to-arbitrary.ts

No changes required. SSE is response-only; request generation is unchanged.

4.4 Changes to src/infrastructure/http-executor.ts

Line 64-129: Add SSE handling:

export const executeHttp = async (
  fastify: FastifyInjectInstance,
  route: RouteContract,
  request: RequestStructure,
  previous?: EvalContext
): Promise<EvalContext> => {
  const queryString = buildQueryString(request.query)
  const fullUrl = queryString ? `${request.url}?${queryString}` : request.url

  const response = await fastify.inject({
    method: request.method,
    url: fullUrl,
    payload: request.body,
    headers: {
      ...request.headers,
      // NEW: SSE requires Accept: text/event-stream
      accept: route.sse?.enabled ? 'text/event-stream' : request.headers.accept,
    },
  })

  const pathParams = extractPathParams(route.path, request.url)

  // NEW: Handle SSE responses
  let responseBody: unknown
  let sseEvents: Array<{ event?: string; data: unknown; id?: string; retry?: number }> | undefined
  let sseDurationMs: number | undefined

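  if (route.sse?.enabled) {
    // startTime is taken after inject() has resolved, so sseDurationMs covers
    // event collection and parsing only, not the time spent waiting on the route
    const startTime = Date.now()
    sseEvents = await collectSSEEvents(response, route.sse)
    sseDurationMs = Date.now() - startTime
    responseBody = { events: sseEvents }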
  } else {
    try {
      responseBody = response.json()
    } catch {
      responseBody = undefined
    }
  }

  const ctx: EvalContext = {
    request: {
      body: request.body,
      headers: request.headers,
      query: request.query || {},
      params: pathParams,
    },
    response: {
      body: responseBody,
      headers: stringifyHeaders(response.headers),
      statusCode: response.statusCode,
      sseEvents,
      sseDurationMs,
    },
    previous,
  }

  return ctx
}

// NEW: SSE event collection
async function collectSSEEvents(
  response: any,
  config: { maxEvents: number; timeoutMs: number; retryMs?: number }
): Promise<Array<{ event?: string; data: unknown; id?: string; retry?: number }>> {
  const events: Array<{ event?: string; data: unknown; id?: string; retry?: number }> = []
  const startTime = Date.now()

  if (response.raw && response.raw.readable) {
    let buffer = ''
    // Declared outside the chunk loop so an event split across chunks keeps its fields
    let currentEvent: Partial<{ event?: string; data: string; id?: string; retry?: number }> = {}

    for await (const chunk of response.raw) {
      if (Date.now() - startTime > config.timeoutMs) {
        break
      }

      buffer += chunk.toString()
      // SSE allows both LF and CRLF line endings
      const lines = buffer.split(/\r?\n/)
      buffer = lines.pop() || '' // Keep incomplete line in buffer

      for (const line of lines) {
        if (line.startsWith('event:')) {
          currentEvent.event = line.slice(6).trim()
        } else if (line.startsWith('id:')) {
          currentEvent.id = line.slice(3).trim()
        } else if (line.startsWith('data:')) {
          currentEvent.data = (currentEvent.data || '') + line.slice(5).trim() + '\n'
        } else if (line.startsWith('retry:')) {
          currentEvent.retry = parseInt(line.slice(6).trim(), 10)
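        } else if (line === '') {
          // Blank line ends the event; events without data are discarded
          if (currentEvent.data) {
            const data = currentEvent.data.trim()
            let parsed: unknown = data
            try {
              parsed = JSON.parse(data)
            } catch {
              // Not JSON: keep the raw string
            }
            events.push({ ...currentEvent, data: parsed })

            if (events.length >= config.maxEvents) {
              return events
            }
          }
          // Reset even when no data was seen, so stray fields do not leak into the next event
          currentEvent = {}
        }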
      }
    }
  }

  return events
}
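
The field handling above can also be sketched as a pure function over a complete payload string, which is easier to unit-test and fits the case where the injected response is already fully buffered (for example, read from response.payload). The function name parseSsePayload and the SseEvent alias are hypothetical. Like the collector above, this sketch attaches id and retry to each event, which is a simplification of the browser EventSource model. It strips only a single leading space from field values and skips comment lines that start with a colon.

```typescript
interface SseEvent {
  event?: string
  data: unknown
  id?: string
  retry?: number
}

// Hypothetical helper: parses a complete SSE payload into at most maxEvents events.
function parseSsePayload(payload: string, maxEvents: number): SseEvent[] {
  const events: SseEvent[] = []
  let current: { event?: string; id?: string; retry?: number } = {}
  let dataLines: string[] = []

  for (const line of payload.split(/\r\n|\r|\n/)) {
    if (events.length >= maxEvents) break

    if (line === '') {
      // A blank line dispatches the event; events without data are dropped
      if (dataLines.length > 0) {
        const raw = dataLines.join('\n')
        let data: unknown = raw
        try {
          data = JSON.parse(raw)
        } catch {
          // Not JSON: keep the raw string
        }
        events.push({ ...current, data })
      }
      current = {}
      dataLines = []
      continue
    }

    if (line.startsWith(':')) continue // Comment line, often used as a keep-alive

    const colon = line.indexOf(':')
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? '' : line.slice(colon + 1)
    if (value.startsWith(' ')) value = value.slice(1)

    if (field === 'event') current.event = value
    else if (field === 'id') current.id = value
    else if (field === 'data') dataLines.push(value)
    else if (field === 'retry' && /^\d+$/.test(value)) current.retry = Number(value)
  }

  return events
}
```

An event that is not terminated by a blank line before the payload ends is discarded, so a truncated stream yields only the events that were complete.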

4.5 Changes to src/infrastructure/hook-validator.ts

Line 68-81: Update buildPostContext:

const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
  request: {
    body: request.body,
    headers: request.headers as Record<string, string>,
    query: request.query as Record<string, unknown>,
    params: request.params as Record<string, string>,
    cookies: getCookies(request),
  },
  response: {
    body: reply[kApophisPayload] ?? null,
    headers: reply.getHeaders() as Record<string, string>,
    statusCode: reply.statusCode,
    // NEW: SSE events from reply
    sseEvents: (reply as any).apophisSseEvents,
  },
})

4.6 New APOSTL Operations/Formulas

No new operations are required. SSE validation uses the existing response_body and response_headers accessors. Example formulas:

// Check event type is allowed
response_body(this).events.0.event == "update"

// Check event count
response_body(this).events.length <= 10

// Check SSE retry interval
response_body(this).events.0.retry == 3000

// Check event has data
response_body(this).events.0.data != null

// Check event ID is present
response_body(this).events.0.id != null

// Check content-type header
response_headers(this)['content-type'] == "text/event-stream"

// Check cache-control header for SSE
response_headers(this)['cache-control'] == "no-cache"

4.7 Changes to src/formula/evaluator.ts

Line 143-215: evaluateNode already delegates accessor handling to resolveOperation (shown for reference):

case 'operation': {
  return resolveOperation(node, ctx)
}

The existing accessor resolution in resolveOperation (lines 43-62) already supports numeric array indices via string accessors. For SSE, response_body(this).events.0.event will work as-is because:

  • events resolves to the array
  • 0 is used as a property key (works for arrays)
  • event resolves to the event property

No changes needed to the evaluator for basic SSE access.
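
The accessor walk described above can be illustrated with a small stand-alone function. This is a sketch of the idea only: resolveAccessors is a hypothetical name, and the real logic lives in resolveOperation, whose exact implementation is not reproduced here.

```typescript
// Hypothetical stand-in for the accessor walk inside resolveOperation.
// Each accessor is used as a plain property key, so "0" indexes into an array.
function resolveAccessors(root: unknown, accessors: readonly string[]): unknown {
  let current: unknown = root
  for (const key of accessors) {
    // Stop early instead of throwing when a segment is missing
    if (current === null || current === undefined) return undefined
    current = (current as Record<string, unknown>)[key]
  }
  return current
}

const body = { events: [{ event: 'update', data: { id: 7 } }] }

resolveAccessors(body, ['events', '0', 'event']) // 'update'
resolveAccessors(body, ['events', 'length'])     // 1
resolveAccessors(body, ['events', '3', 'event']) // undefined (index out of range)
```

Because an out-of-range index yields undefined rather than an error, a formula such as response_body(this).events.0.data != null also fails cleanly when no events were collected.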

4.8 Changes to src/domain/contract.ts

Line 63-73: Extract SSE config:

const contract: RouteContract = {
  path,
  method: method.toUpperCase(),
  category,
  requires,
  ensures,
  invariants: EMPTY_INVARIANTS,
  regexPatterns: {},
  validateRuntime,
  schema: s,
  // NEW: Extract SSE configuration from response schema
  sse: (() => {
    const responseSchema = (s.response ?? {}) as Record<string, Record<string, unknown>>
    const firstStatus = Object.values(responseSchema)[0]
    if (firstStatus?.['x-sse'] === true) {
      return {
        enabled: true,
        allowedEvents: (firstStatus['x-sse-events'] as string[]) ?? [],
        maxEvents: (firstStatus['x-sse-max-events'] as number) ?? 10,
        timeoutMs: (firstStatus['x-sse-timeout'] as number) ?? 30000,
        retryMs: firstStatus['x-sse-retry'] as number | undefined,
      }
    }
    return undefined
  })(),
}
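
The extraction above inspects only the first entry of the response schema. JavaScript enumerates integer-like keys before other string keys, so a schema that puts x-sse under a key such as '2xx' next to a numeric status would not be detected. The following sketch scans every status entry and checks value types instead of casting. The function name extractSseConfig and the SseConfig alias are hypothetical; the defaults (10 events, 30000 ms) are taken from the code above.

```typescript
interface SseConfig {
  enabled: boolean
  allowedEvents: string[]
  maxEvents: number
  timeoutMs: number
  retryMs?: number
}

// Hypothetical helper: returns the SSE config of the first response entry marked x-sse.
function extractSseConfig(schema: Record<string, unknown>): SseConfig | undefined {
  const response = schema.response
  if (typeof response !== 'object' || response === null) return undefined

  for (const entry of Object.values(response as Record<string, unknown>)) {
    if (typeof entry !== 'object' || entry === null) continue
    const s = entry as Record<string, unknown>
    if (s['x-sse'] !== true) continue

    const events = s['x-sse-events']
    const maxEvents = s['x-sse-max-events']
    const timeout = s['x-sse-timeout']
    const retry = s['x-sse-retry']

    return {
      enabled: true,
      // Ignore non-string entries rather than trusting a cast
      allowedEvents: Array.isArray(events)
        ? events.filter((e): e is string => typeof e === 'string')
        : [],
      maxEvents: typeof maxEvents === 'number' ? maxEvents : 10,
      timeoutMs: typeof timeout === 'number' ? timeout : 30000,
      retryMs: typeof retry === 'number' ? retry : undefined,
    }
  }
  return undefined
}
```

Pulling the logic into a named function also removes the inline IIFE from the contract literal, which may make the extraction easier to test in isolation.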

4.9 Example Fastify Route Definition

import { Readable } from 'node:stream'

// SSE route
fastify.get('/notifications', {
  schema: {
    response: {
      200: {
        type: 'object',
        'x-sse': true,
        'x-sse-events': ['notification', 'heartbeat'],
        'x-sse-max-events': 5,
        'x-sse-timeout': 10000,
        'x-sse-retry': 5000,
        properties: {
          events: {
            type: 'array',
            items: {
              type: 'object',
              properties: {
                event: { type: 'string' },
                data: { type: 'object' },
                id: { type: 'string' }
              }
            }
          }
        },
        'x-ensures': [
          'response_headers(this)["content-type"] == "text/event-stream"',
          'for event in response_body(this).events: event.data != null',
          'response_body(this).events.length <= 5'
        ]
      }
    }
  }
}, async (request, reply) => {
  reply.header('content-type', 'text/event-stream')
  reply.header('cache-control', 'no-cache')
  reply.header('connection', 'keep-alive')

  const stream = new Readable({
    read() {
      this.push(`event: notification\n`)
      this.push(`id: ${Date.now()}\n`)
      this.push(`data: ${JSON.stringify({ message: 'Hello' })}\n\n`)
      this.push(null)
    }
  })

  return reply.send(stream)
})

4.10 Backward Compatibility

  • Routes without x-sse behave exactly as before
  • EvalContext.response.sseEvents is optional
  • The Accept: text/event-stream request header is only sent when x-sse is enabled

Cross-Cutting Concerns

Plugin Options Extension

File: src/types.ts, lines 257-262

Extend ApophisOptions:

export interface ApophisOptions {
  readonly swagger?: Record<string, unknown>
  readonly runtime?: 'off' | 'warn' | 'error'
  readonly cleanup?: boolean
  readonly scopes?: Record<string, ScopeConfig>
  // NEW: Extension options
  readonly serializers?: SerializerRegistry
  readonly multipart?: {
    readonly maxFileSize: number
    readonly maxFiles: number
  }
}

Plugin Registration

File: src/plugin/index.ts, lines 110-159

Update plugin to inject serializer registry:

export const apophisPlugin = async (fastify: FastifyInstance, opts: ApophisOptions): Promise<void> => {
  await registerSwagger(fastify, opts)

  // NEW: Initialize serializer registry if provided
  const serializerRegistry = opts.serializers ?? {
    get: () => undefined,
    register: () => {},
  }

  // ... existing route capture ...

  const decorations: ApophisDecorations = {
    scope,
    contract: buildContract(fastify, scope),
    stateful: buildStateful(fastify, scope, cleanupManager),
    check: buildCheck(fastify, scope),
    cleanup: buildCleanup(cleanupManager),
    spec: buildSpec(fastify),
    // NEW: Expose serializer registry
    serializers: serializerRegistry,
  }

  fastify.decorate('apophis', decorations)

  // ... existing runtime validation ...
}
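
The fallback registry above is a no-op: register discards its argument and get always returns undefined. For callers that want a working default, a Map-backed registry could look like the sketch below. The SketchSerializer and SketchRegistry shapes are assumptions made for illustration (string-based encode/decode keyed by content type); the Serializer and SerializerRegistry interfaces defined in the Custom Serializers section remain authoritative.

```typescript
// Assumed shapes for illustration only; not the specification's interfaces.
interface SketchSerializer {
  contentType: string
  encode(value: unknown): string
  decode(text: string): unknown
}

interface SketchRegistry {
  get(contentType: string): SketchSerializer | undefined
  register(serializer: SketchSerializer): void
}

function createRegistry(): SketchRegistry {
  const byType = new Map<string, SketchSerializer>()

  // Reduce "Application/JSON; charset=utf-8" to "application/json"
  const normalise = (contentType: string): string => {
    const semi = contentType.indexOf(';')
    const base = semi === -1 ? contentType : contentType.slice(0, semi)
    return base.trim().toLowerCase()
  }

  return {
    get: (contentType) => byType.get(normalise(contentType)),
    register: (serializer) => {
      byType.set(normalise(serializer.contentType), serializer)
    },
  }
}

const jsonSerializer: SketchSerializer = {
  contentType: 'application/json',
  encode: (value) => JSON.stringify(value),
  decode: (text) => JSON.parse(text),
}

const registry = createRegistry()
registry.register(jsonSerializer)
```

Normalising the key means a lookup with a full Content-Type header value, parameters included, still finds the registered serializer.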

Type Exports

File: package entry point (the module that re-exports from src/types.ts)

Add to public API exports:

export type { MultipartFile, MultipartFileConstraint } from './types.js'
export type { Serializer, SerializerRegistry } from './types.js'

Error Suggestions Extension

File: src/domain/error-suggestions.ts

Add suggestions for new features:

// After line 145 (cookie checks)

// Multipart checks
if (formula.includes('request_files')) {
  return `File upload check failed. Ensure the file field name matches the schema, the file size is within limits, and the MIME type is allowed.`
}

if (formula.includes('request_fields')) {
  return `Multipart field check failed. Ensure the field is present in the form data and matches the expected type.`
}

// Streaming checks
if (formula.includes('chunks')) {
  return `Streaming response check failed. Verify the stream format matches the schema and chunk limits are respected.`
}

// SSE checks
// 'events' already contains 'event', so a single substring check is sufficient
if (formula.includes('events')) {
  return `SSE event check failed. Ensure the event type is allowed and the event data matches the schema.`
}
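
Plain substring checks such as includes('events') or includes('chunks') also match unrelated identifiers that happen to contain those letters. A table-driven variant with anchored patterns is one way to reduce false positives. The sketch below is hypothetical (suggestForExtension and EXTENSION_SUGGESTIONS are not existing APOPHIS names) and reuses the messages listed above.

```typescript
// Hypothetical table of patterns and messages, checked in order.
const EXTENSION_SUGGESTIONS: ReadonlyArray<{ pattern: RegExp; message: string }> = [
  {
    pattern: /\brequest_files\b/,
    message:
      'File upload check failed. Ensure the file field name matches the schema, the file size is within limits, and the MIME type is allowed.',
  },
  {
    pattern: /\brequest_fields\b/,
    message:
      'Multipart field check failed. Ensure the field is present in the form data and matches the expected type.',
  },
  {
    // Matches ".chunks" as an accessor segment only
    pattern: /\.chunks\b/,
    message:
      'Streaming response check failed. Verify the stream format matches the schema and chunk limits are respected.',
  },
  {
    // Matches ".events" as an accessor segment only
    pattern: /\.events\b/,
    message:
      'SSE event check failed. Ensure the event type is allowed and the event data matches the schema.',
  },
]

// Returns the first matching suggestion, or undefined when no extension pattern applies.
function suggestForExtension(formula: string): string | undefined {
  for (const { pattern, message } of EXTENSION_SUGGESTIONS) {
    if (pattern.test(formula)) return message
  }
  return undefined
}
```

With this shape, a formula like request_body(this).prevents == 1 no longer triggers the SSE suggestion, while response_body(this).events.0.event == "update" still does.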

Test Runner Integration

File: src/test/petit-runner.ts, lines 188-367

Update runPetitTests to pass serializer registry:

export const runPetitTests = async (
  fastify: FastifyInjectInstance,
  config: TestConfig,
  scopeRegistry?: ScopeRegistry,
  // NEW: Optional serializer registry
  serializerRegistry?: SerializerRegistry
): Promise<TestSuite> => {
  // ... existing code ...

  // Pass to executeHttp
  const ctx = await executeHttp(fastify, command.route, request, previousCtx, serializerRegistry)

  // ... rest of existing code ...
}

File: src/test/stateful-runner.ts, lines 222-464

Similar update for runStatefulTests.


Implementation Order

  1. Phase 1: Foundation

    • Extend src/types.ts with all new interfaces
    • Update src/domain/contract.ts to extract new annotations
    • Add error suggestions in src/domain/error-suggestions.ts
  2. Phase 2: Multipart

    • Implement buildMultipartArb in src/domain/schema-to-arbitrary.ts
    • Update src/domain/request-builder.ts for multipart
    • Update src/infrastructure/http-executor.ts for FormData
    • Add request_files/request_fields to parser and evaluator
  3. Phase 3: Custom Serializers

    • Define SerializerRegistry interface
    • Update src/infrastructure/http-executor.ts for serialization
    • Inject registry through plugin options
  4. Phase 4: Streaming

    • Implement stream collection in src/infrastructure/http-executor.ts
    • Add NDJSON/chunked parsing
    • Update contract extraction for streaming config
  5. Phase 5: SSE

    • Implement SSE event collection
    • Add SSE-specific header handling
    • Update contract extraction for SSE config
  6. Phase 6: Integration

    • Update test runners to pass new dependencies
    • Add comprehensive tests for each feature
    • Update plugin registration

Testing Strategy

For each feature, add tests in src/test/:

  1. Multipart: multipart.test.ts

    • Test data generation produces valid multipart payloads
    • HTTP executor correctly builds FormData
    • Formula evaluation accesses files and fields
  2. Serializers: serializers.test.ts

    • Serializer registry registration
    • Encode/decode roundtrip
    • HTTP executor uses correct serializer
  3. Streaming: streaming.test.ts

    • NDJSON chunk parsing
    • Stream timeout handling
    • Chunk count validation
  4. SSE: sse.test.ts

    • SSE event parsing
    • Event type validation
    • Retry interval checking

Backward Compatibility Summary

| Feature | Breaking Change? | Migration |
| --- | --- | --- |
| Multipart | No | Opt-in via x-content-type |
| Custom Serializers | No | Opt-in via x-serializer |
| Streaming | No | Opt-in via x-streaming |
| SSE | No | Opt-in via x-sse |

Routes without these annotations must preserve existing behavior. Regression tests must cover non-multipart, non-streaming JSON routes.