Air-Gapped Defense
Telemetry Governance
Evaluate and route operational telemetry in-process — no cloud, no sidecar, no second-stage job. Every record exits the pipeline carrying a guardrail decision, route label, sensitivity score, reason codes, and a cryptographic provenance chain.
Operational telemetry in defense environments isn't homogeneous. The same data plane carries UNCLASSIFIED device heartbeats, CUI sensor-health events, and CONTROLLED logistics records simultaneously. Classification is embedded in the payload — it isn't enforced at the transport layer.
This pipeline pattern evaluates every event inside the JVM before it reaches any sink. By the time a record is written to Kafka it already knows its decision, its route, and exactly why.
This demo uses synthetic operational telemetry only. It avoids tactical targeting, weapons-release, lethality, or mission-planning data. All classification labels and sensitivity scores are generated for demonstration purposes.
Pipeline & Config
Three transforms. One process. No network hop between stages.
SOURCE → STRING_TO_WIREEVENT → DJL_EMBEDDING → DEFENSE_GUARDRAIL → GUARDED
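Because the three transforms share one JVM, the chain behaves like plain function composition: in this sketch, each stage's output is handed directly to the next with no queue or serialization in between. The sketch uses `java.util.function.Function` to show the idea. `WireEvent`, `Embedded`, `Guarded`, and the toy stage bodies are hypothetical stand-ins, not StreamKernel's actual types or SPI.

```java
import java.util.function.Function;

// Hypothetical stand-in types; StreamKernel's real WireEvent and transformer SPI are not shown here.
record WireEvent(String id, String text) {}

record Embedded(WireEvent event, float[] vector) {}

record Guarded(WireEvent event, String decision) {}

final class InProcessChainSketch {
    // Stage 1 stand-in for STRING_TO_WIREEVENT: parses a toy "id|text" line.
    static final Function<String, WireEvent> TO_WIRE_EVENT = raw -> {
        int sep = raw.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("expected id|text: " + raw);
        }
        return new WireEvent(raw.substring(0, sep), raw.substring(sep + 1));
    };

    // Stage 2 stand-in for DJL_EMBEDDING: a one-element toy vector instead of an ONNX call.
    static final Function<WireEvent, Embedded> EMBED =
        e -> new Embedded(e, new float[] { e.text().length() });

    // Stage 3 stand-in for DEFENSE_GUARDRAIL: a single keyword rule.
    static final Function<Embedded, Guarded> GUARD =
        em -> new Guarded(em.event(), em.event().text().contains("CUI") ? "REVIEW" : "SAFE");

    // The whole chain is one composed function call inside the JVM.
    static final Function<String, Guarded> CHAIN = TO_WIRE_EVENT.andThen(EMBED).andThen(GUARD);

    public static void main(String[] args) {
        System.out.println(CHAIN.apply("evt-1|CUI sensor-health event").decision()); // REVIEW
    }
}
```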
Pipeline profile — exact config from this run
This is the real properties file used to produce the benchmark results below. Drop it in config/pipelines/ and point the runner at it.
```properties
# ── Pipeline identity ────────────────────────────────────────
pipeline.id = sk-defense-telemetry-guardrail
pipeline.parallelism = 4
pipeline.batch.size = 16
pipeline.drain.timeout.ms = 30000

# ── Transform chain ──────────────────────────────────────────
transform.chain = STRING_TO_WIREEVENT,DJL_EMBEDDING,DEFENSE_GUARDRAIL
transform.version = defense-telemetry-guardrail-v1

# ── Guardrail policy ─────────────────────────────────────────
transform.defense_guardrail.policy.version = defense-telemetry-policy-v1
transform.defense_guardrail.model.version = deterministic-telemetry-guardrail-v1
# sensitivity_score ≥ review.threshold → REVIEW
transform.defense_guardrail.review.threshold = 0.48
# sensitivity_score ≥ deny.threshold → DENY
transform.defense_guardrail.deny.threshold = 0.82

# ── ONNX embedding ───────────────────────────────────────────
ai.embedding.enabled = true
ai.embedding.engine = OnnxRuntime
# swap to 'gpu' for CUDA
ai.embedding.device = cpu
ai.embedding.pool.size = 4
ai.embedding.batching.enabled = true
ai.embedding.batching.max.size = 16
ai.embedding.batching.flush.ms = 10
# pool × intra ≤ physical cores
ai.embedding.onnx.intra.op.threads = 2
ai.embedding.onnx.inter.op.threads = 1
ai.embedding.onnx.warmup.count = 64
# dominant perf factor (see Tuning Knobs)
ai.embedding.tokenizer.max.length = 24
ai.embedding.normalize = true
ai.embedding.model.uri = file:///models/minilm-l6-v2-onnx/model.onnx
ai.embedding.tokenizer.uri = file:///models/minilm-l6-v2-onnx/tokenizer.json
ai.feature.version = defense-telemetry-features-v1

# ── Source ───────────────────────────────────────────────────
source.type = SYNTHETIC
source.synthetic.text.profile = DEFENSE_TELEMETRY
source.synthetic.payload.size = 640
# 0 = unlimited
source.synthetic.max.records.per.second = 0

# ── Kafka sink ───────────────────────────────────────────────
sink.type = KAFKA
sink.kafka.topic = streamkernel-defense-telemetry-guarded
sink.kafka.bootstrap.servers = localhost:9092
sink.kafka.compression.type = lz4
sink.kafka.acks = 1
sink.kafka.batch.size = 262144
sink.kafka.linger.ms = 5
sink.kafka.enable.idempotence = false

# ── Security ─────────────────────────────────────────────────
# replace with OIDC/mTLS for prod
security.type = PERMIT_ALL

# ── Observability ────────────────────────────────────────────
metrics.provider = PROMETHEUS
metrics.prometheus.port = 8080
metrics.tag.run_id = run-defense-telemetry-01
metrics.tag.env = defense-demo
```
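`java.util.Properties` treats only lines that begin with `#` or `!` as comments, so a trailing `# note` on a value line becomes part of the value. If the runner loads this file with `java.util.Properties` (an assumption; the loader isn't shown), a fail-fast check on the two guardrail thresholds might look like the sketch below. `GuardrailConfigCheck` and its validation rule (0 ≤ review < deny ≤ 1) are illustrative, not part of the shipped runner.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical validation helper; assumes the profile is read with java.util.Properties.
final class GuardrailConfigCheck {
    record Thresholds(double review, double deny) {}

    static final String REVIEW_KEY = "transform.defense_guardrail.review.threshold";
    static final String DENY_KEY = "transform.defense_guardrail.deny.threshold";

    static Thresholds load(String propertiesText) {
        Properties props = new Properties();
        try {
            // Only lines starting with '#' or '!' are comments; "0.48 # note" stays one value.
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        double review = parse(props, REVIEW_KEY);
        double deny = parse(props, DENY_KEY);
        // The DENY band must sit strictly above the REVIEW band, inside [0, 1].
        if (!(0.0 <= review && review < deny && deny <= 1.0)) {
            throw new IllegalArgumentException(
                "expected 0 <= review < deny <= 1, got review=" + review + ", deny=" + deny);
        }
        return new Thresholds(review, deny);
    }

    private static double parse(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        // Throws NumberFormatException if an inline comment leaked into the value.
        return Double.parseDouble(raw.trim());
    }

    public static void main(String[] args) {
        String text = REVIEW_KEY + " = 0.48\n" + DENY_KEY + " = 0.82\n";
        System.out.println(load(text));
    }
}
```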
JVM flags used in this run
```bash
# launch command (from test-java-runner)
java \
  -Xms4g -Xmx4g \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=50 \
  -XX:ParallelGCThreads=3 \
  -XX:G1HeapRegionSize=16m -XX:+AlwaysPreTouch \
  -Dstreamkernel.executor.mode=FIXED \
  -Dstreamkernel.sink.inflight.max=768 \
  -Dstreamkernel.outbatch.capacity=64 \
  -Dstreamkernel.cache.force.disabled=true \
  -Dsk.run.id=run-defense-telemetry-01 \
  -Dsk.config.path=config/pipelines/streamkernel_defense_telemetry_guardrail.properties \
  -jar streamkernel-app-0.2.0-all.jar
```
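The `-D` flags are ordinary JVM system properties, so one way to confirm what actually reached a process is to inspect the running JVM. The utility below is a hypothetical standalone diagnostic, not part of StreamKernel. It uses the standard `Runtime` and `java.lang.management` APIs and filters system properties by the `streamkernel.` and `sk.` prefixes that appear in the launch command.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical diagnostic, not part of StreamKernel: prints what the running JVM actually received.
final class LaunchFlagReport {
    /** Keeps only the pipeline's own -D properties (prefixes taken from the launch command). */
    static Map<String, String> pipelineProps(Properties sys) {
        Map<String, String> out = new TreeMap<>();
        for (String name : sys.stringPropertyNames()) {
            if (name.startsWith("streamkernel.") || name.startsWith("sk.")) {
                out.put(name, sys.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // maxMemory() reflects -Xmx; the collector bean names reveal which GC is active.
        System.out.println("max heap MB: " + Runtime.getRuntime().maxMemory() / (1024 * 1024));
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println("collector: " + gc.getName());
        }
        System.out.println("JVM args: " + ManagementFactory.getRuntimeMXBean().getInputArguments());
        pipelineProps(System.getProperties()).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Launching this class with the same flag list prints the effective heap ceiling, the active collectors (G1 bean names start with "G1"), and the pipeline's own properties.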
Guardrail Decisions
The DEFENSE_GUARDRAIL transformer maps every event to one of three decisions (SAFE, REVIEW, or DENY) based on its classification label, sensitivity score, signal score, location precision, and requested sink. The thresholds are configurable per deployment.
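Considering only the two configured thresholds, the sensitivity-score bands reduce to a pair of ordered comparisons. The sketch below assumes the DENY check takes precedence and both bounds are inclusive, consistent with the "at or above" wording in the tuning table. The real transformer also weighs the other signals listed above, which this hypothetical `ThresholdBands` class ignores.

```java
// Hypothetical sketch: models only the sensitivity-score bands, not the full
// DEFENSE_GUARDRAIL policy (classification, signal score, location precision, sink).
enum Decision { SAFE, REVIEW, DENY }

final class ThresholdBands {
    static Decision decide(double sensitivityScore, double reviewThreshold, double denyThreshold) {
        if (sensitivityScore >= denyThreshold) {
            return Decision.DENY;   // hard deny to the DLQ, checked first
        }
        if (sensitivityScore >= reviewThreshold) {
            return Decision.REVIEW; // routed to audit_queue
        }
        return Decision.SAFE;       // routed to approved_telemetry
    }

    public static void main(String[] args) {
        // Thresholds from this run's profile: review = 0.48, deny = 0.82.
        System.out.println(decide(0.77, 0.48, 0.82)); // prints REVIEW (evt-10944's score)
    }
}
```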
Output Contract
Every record written to the guarded Kafka topic carries two complementary governance envelopes: a JSON payload and a set of Kafka message headers. Neither requires the other to be parseable — a consumer can route on headers alone without deserializing the payload.
JSON payload (REVIEW example — evt-10944)
```json
{
  "event_id": "evt-10944",
  "classification": "CUI",
  "sensitivity_score": 0.77,
  "decision": "REVIEW",
  "route": "audit_queue",
  "reason_codes": ["CUI_CLASSIFICATION", "RESTRICTED_LOCATION_PRECISION",
                   "CUI_TO_UNCLASSIFIED_REVIEW", "TELEMETRY_TEXT_SIGNAL"],
  "sensitive_fields": ["classification", "payload.locationPrecision", "requestedSink"],
  "allowed_sinks": ["audit_queue"],
  "denied_sinks": ["unclassified-analytics"],
  "policy_version": "defense-telemetry-policy-v1",
  "model_version": "deterministic-telemetry-guardrail-v1",
  "safe_public_demo": true
}
```
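A downstream writer can enforce `allowed_sinks` and `denied_sinks` as a default-deny allow-list. The sketch below assumes the envelope has already been parsed into Java lists (no JSON library is shown); `SinkPolicy` is a hypothetical name.

```java
import java.util.List;

// Hypothetical consumer-side guard; field names mirror the JSON envelope's
// allowed_sinks and denied_sinks, and parsing the envelope is out of scope.
record SinkPolicy(List<String> allowedSinks, List<String> deniedSinks) {
    boolean permits(String sink) {
        // An explicit deny always wins; otherwise the sink must be allow-listed.
        return !deniedSinks.contains(sink) && allowedSinks.contains(sink);
    }
}

final class SinkPolicyDemo {
    public static void main(String[] args) {
        SinkPolicy evt10944 = new SinkPolicy(List.of("audit_queue"), List.of("unclassified-analytics"));
        System.out.println(evt10944.permits("audit_queue"));            // true
        System.out.println(evt10944.permits("unclassified-analytics")); // false
    }
}
```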
Kafka message headers
These headers are stamped on every record regardless of decision. A stream processor can filter or route purely on headers without touching the value bytes.
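Filtering on headers alone can be sketched without any Kafka dependency by modeling headers as a `Map<String, byte[]>`. In a real consumer, the map would be built from `ConsumerRecord.headers()`, whose `Header` entries expose `key()` and `value()`. The sketch assumes the decision is stored as a UTF-8 string under `defense.guardrail.decision` and fails closed when that header is missing.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical header-only filter; the UTF-8 string encoding of the value is an assumption.
final class HeaderFilter {
    static final String DECISION_HEADER = "defense.guardrail.decision";

    /** True only for records the guardrail marked SAFE; the record value bytes are never read. */
    static boolean isSafe(Map<String, byte[]> headers) {
        byte[] raw = headers.get(DECISION_HEADER);
        // Fail closed: a missing header is treated like any non-SAFE decision.
        return raw != null && "SAFE".equals(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers =
            Map.of(DECISION_HEADER, "REVIEW".getBytes(StandardCharsets.UTF_8));
        System.out.println(isSafe(headers)); // false
    }
}
```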
The config.sha256 and model.ref.sha256 headers bind every governance decision to the exact pipeline properties file and ONNX model artifact on disk. Any downstream audit can verify both were unchanged at inference time without a separate audit log.
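Verifying a record against its provenance headers comes down to recomputing SHA-256 over the artifact and comparing digests. The following minimal sketch uses the JDK's `MessageDigest` and `HexFormat` (Java 17+). It assumes the headers carry lowercase hex digests of the raw file bytes; the exact header encoding isn't specified here. `ProvenanceCheck` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Locale;

// Hypothetical audit helper; assumes config.sha256 / model.ref.sha256 hold hex SHA-256 digests.
final class ProvenanceCheck {
    private static MessageDigest sha256() {
        try {
            return MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to provide SHA-256.
            throw new IllegalStateException(e);
        }
    }

    /** Lowercase hex SHA-256 of an in-memory byte array. */
    static String sha256Hex(byte[] data) {
        return HexFormat.of().formatHex(sha256().digest(data));
    }

    /** Lowercase hex SHA-256 of a file, streamed so a large ONNX model is not loaded whole. */
    static String sha256Hex(Path file) throws IOException {
        MessageDigest md = sha256();
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[1 << 16];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        return HexFormat.of().formatHex(md.digest());
    }

    /** True if the artifact on disk still matches the digest carried in the record header. */
    static boolean matches(Path artifact, String headerHex) throws IOException {
        byte[] expected = headerHex.trim().toLowerCase(Locale.ROOT).getBytes(StandardCharsets.US_ASCII);
        byte[] actual = sha256Hex(artifact).getBytes(StandardCharsets.US_ASCII);
        return MessageDigest.isEqual(expected, actual);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: ProvenanceCheck <file> <sha256-hex-from-header>");
            return;
        }
        System.out.println(matches(Path.of(args[0]), args[1]) ? "unchanged" : "MISMATCH");
    }
}
```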
Reproduce It
The benchmark matrix row, before/after scripts, and pipeline profile are all in the repo. You need Docker (for Kafka), Java 21, and the ONNX model on local disk.
1. Kafka must be running on localhost:9092. The included Docker Compose mounts Kafka data on tmpfs to eliminate overlay2 I/O overhead.
2. Run the "before" demo script:

   ```powershell
   .\scripts\demo_before_defense_telemetry.ps1
   ```

3. Run the benchmark row:

   ```powershell
   .\test-java-runner.ps1 `
     -MatrixFile .\benchmark-runs\tests_national_defense.csv `
     -SingleTest streamkernel_defense_telemetry_guardrail_5m
   ```

   The run writes _meta.json, _metrics.prom, and _gc.log into benchmark-runs/.

4. Run the "after" demo script:

   ```powershell
   .\scripts\demo_after_defense_telemetry.ps1 -SampleCount 3
   ```
The ONNX warmup runs 64 batches × 16 records on startup — expect ~8 seconds before the pipeline starts processing. This is intentional: it pre-JITs the inference path and stabilizes throughput from the first real batch.
Benchmark Results
Hardware: Intel i9-8950HK (6 cores / 12 threads), CPU-only, no GPU. This is the minimum viable deployment floor; on server hardware, the CUDA execution provider is available.
Throughput log trace
```text
# window_s=5
# time    PROC_EPS   EFF%  PROC_TOTAL    MEM  DROPPED
09:40:41      38.5    9.6       4,736  313MB        0  ← ramp-up / JIT settling
09:41:07      44.6   11.1       7,776  235MB        0
09:41:22     141.0   35.2       9,440  172MB        0  ← first peak
09:42:22     141.2   35.3      15,408  226MB        0  ← peak repeated
09:43:17     124.4   31.1      21,184  267MB        0  ← steady mid-run
09:44:22      60.7   15.2      25,280  193MB        0  ← wind-down
# EFF = pipeline efficiency as % of 4-thread theoretical ceiling (400 r/s)
# EMPTY=0 across all windows: the source always had work ready
# MEM swings are normal G1 GC sawtooth, not a leak
```
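The trace's derived figures can be recomputed from its raw columns. EFF% is PROC_EPS divided by the 400 r/s ceiling, and the cumulative PROC_TOTAL column gives an average rate between any two windows. `TraceMath` is an illustrative helper, not part of the runner.

```java
import java.time.Duration;
import java.time.LocalTime;

// Illustrative helper for reading the throughput trace; not part of the runner.
final class TraceMath {
    /** EFF% as defined in the trace: observed records/s over the theoretical ceiling. */
    static double efficiencyPercent(double procEps, double ceilingEps) {
        return 100.0 * procEps / ceilingEps;
    }

    /** Average records/s between two windows, from the cumulative PROC_TOTAL column. */
    static double averageRate(long totalStart, long totalEnd, String start, String end) {
        long seconds = Duration.between(LocalTime.parse(start), LocalTime.parse(end)).getSeconds();
        return (double) (totalEnd - totalStart) / seconds;
    }

    public static void main(String[] args) {
        System.out.printf("EFF at 141.0 r/s: %.2f%%%n", efficiencyPercent(141.0, 400.0));
        System.out.printf("average over logged span: %.1f r/s%n",
            averageRate(4_736, 25_280, "09:40:41", "09:44:22"));
    }
}
```

Between the first and last logged windows, PROC_TOTAL grows by 20,544 records over 221 s, an average of about 93 r/s across ramp-up, peak, and wind-down.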
Tuning Knobs
These are the parameters that actually move the needle, listed in order of impact.
| Property | This run | What it does |
|---|---|---|
| ai.embedding.tokenizer.max.length | 24 | Dominant factor. Shorter sequences mean faster ONNX inference. The healthcare profile uses 16 and gets 3.4× higher throughput on identical hardware. Set it to the shortest value your text profile supports. |
| ai.embedding.pool.size | 4 | Number of ONNX predictor instances. For short sequences such as token length 24, pool=1 is optimal; at longer sequences the extra parallelism helps. Keep pool × intra.op.threads ≤ physical cores. |
| ai.embedding.onnx.intra.op.threads | 2 | OnnxRuntime threads per inference call. Hard ceiling: pool × intra ≤ physical cores. Overcommitting causes thread contention and reduces throughput. |
| ai.embedding.batching.max.size | 16 | Maximum records per inference batch. Batch fill was 100% in this run, so you can increase this if your source rate supports it. Returns diminish above 32 on CPU. |
| pipeline.parallelism | 4 | Worker thread count. Should match, or be close to, the embedding pool size. Going beyond the CPU core count adds context-switch overhead. |
| transform.defense_guardrail.review.threshold | 0.48 | Sensitivity score at or above which a record is routed to audit_queue instead of approved_telemetry. Tune to your operational risk tolerance. |
| transform.defense_guardrail.deny.threshold | 0.82 | Sensitivity score at or above which a record is hard-denied to DLQ regardless of other factors. |
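The pool × intra-op ceiling from the table can be checked before launch. One subtlety: `Runtime.availableProcessors()` reports logical processors (hardware threads), so on a hyper-threaded CPU such as the i9-8950HK (6 cores, 12 threads) it typically returns 12, not the physical core count. `ThreadBudget` is a hypothetical helper that takes the physical core count explicitly.

```java
// Hypothetical pre-launch check for the rule "pool × intra.op.threads ≤ physical cores".
final class ThreadBudget {
    static boolean fitsPhysicalCores(int poolSize, int intraOpThreads, int physicalCores) {
        // long multiplication so absurd inputs cannot overflow
        return (long) poolSize * intraOpThreads <= physicalCores;
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("usage: ThreadBudget <pool.size> <intra.op.threads> <physical-cores>");
            return;
        }
        int pool = Integer.parseInt(args[0]);
        int intra = Integer.parseInt(args[1]);
        int cores = Integer.parseInt(args[2]);
        // availableProcessors() counts logical CPUs; printed only for comparison.
        System.out.println("logical CPUs visible to the JVM: " + Runtime.getRuntime().availableProcessors());
        System.out.println(pool + " x " + intra + " within " + cores + " physical cores: "
            + fitsPhysicalCores(pool, intra, cores));
    }
}
```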
Extending It
Swap the policy engine
The DEFENSE_GUARDRAIL transformer implements the WireEventTransformer SPI. You can replace the deterministic policy with an OPA bundle evaluation, a domain-specific classifier, or any logic that reads the embedding and returns a decision; the output contract (headers + JSON envelope) stays the same.
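Swapping the engine behind a fixed output contract could look like the sketch below. `PolicyEngine`, `PolicyInput`, `PolicyResult`, and `GuardrailStage` are hypothetical shapes, since the actual WireEventTransformer signature isn't shown here. The point is that any engine, including a lambda, can sit behind the same decision header. The stricter rule and its reason code in `StrictCuiPolicy` are invented for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical shapes: the real WireEventTransformer SPI is not reproduced here.
record PolicyInput(String classification, double sensitivityScore, String requestedSink, float[] embedding) {}

record PolicyResult(String decision, List<String> reasonCodes) {}

/** Any engine (deterministic rules, an OPA client, a classifier) implements this one method. */
interface PolicyEngine {
    PolicyResult evaluate(PolicyInput input);
}

/** Illustrative replacement engine: hard-denies CUI bound for an unclassified sink. */
final class StrictCuiPolicy implements PolicyEngine {
    @Override
    public PolicyResult evaluate(PolicyInput in) {
        if ("CUI".equals(in.classification()) && in.requestedSink().startsWith("unclassified")) {
            // Hypothetical reason code, not part of defense-telemetry-policy-v1.
            return new PolicyResult("DENY", List.of("CUI_TO_UNCLASSIFIED_DENY"));
        }
        if (in.sensitivityScore() >= 0.82) return new PolicyResult("DENY", List.of());
        if (in.sensitivityScore() >= 0.48) return new PolicyResult("REVIEW", List.of());
        return new PolicyResult("SAFE", List.of());
    }
}

/** The stage stamps the same header whichever engine is plugged in. */
final class GuardrailStage {
    private final PolicyEngine engine;

    GuardrailStage(PolicyEngine engine) {
        this.engine = engine;
    }

    Map<String, String> headersFor(PolicyInput in) {
        return Map.of("defense.guardrail.decision", engine.evaluate(in).decision());
    }

    public static void main(String[] args) {
        PolicyInput cui = new PolicyInput("CUI", 0.30, "unclassified-analytics", new float[0]);
        System.out.println(new GuardrailStage(new StrictCuiPolicy()).headersFor(cui));
        // A lambda works too, because PolicyEngine has a single abstract method.
        System.out.println(new GuardrailStage(in -> new PolicyResult("REVIEW", List.of())).headersFor(cui));
    }
}
```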
Swap the sink
Change sink.type to MONGODB, PULSAR, DELTA, or a custom SPI sink. The guardrail transform chain is sink-agnostic: the defense.guardrail.decision header and JSON payload travel unchanged to any destination.
Enable GPU inference
```properties
# Switch from CPU to CUDA execution provider
ai.embedding.device = gpu
ai.embedding.onnx.execution.provider = CUDA
# falls back to CPU if CUDA is unavailable
ai.embedding.onnx.enable.cpu.fallback = true
```
Multi-sink fan-out (next increment)
The current implementation writes all decisions to a single guarded topic. True conditional fan-out in the same pipeline pass (SAFE to approved_telemetry, REVIEW to audit_queue, DENY to DLQ) is the next increment. The header contract is already designed for it.
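Because every record already carries its decision header, conditional fan-out is essentially a lookup from decision to destination. The following hypothetical sketch is not the upcoming implementation. In-memory lists stand in for approved_telemetry, audit_queue, and the DLQ, and unknown or missing decisions fail closed to the DLQ path.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: destinations are in-memory lists standing in for
// approved_telemetry (SAFE), audit_queue (REVIEW), and the DLQ (DENY).
enum GuardDecision { SAFE, REVIEW, DENY }

final class DecisionFanOut {
    private final Map<GuardDecision, List<String>> destinations = new EnumMap<>(GuardDecision.class);

    DecisionFanOut() {
        for (GuardDecision d : GuardDecision.values()) {
            destinations.put(d, new ArrayList<>());
        }
    }

    /** Routes by the decision header value only; missing or unknown values go to the DENY path. */
    void accept(String decisionHeader, String recordId) {
        GuardDecision d;
        try {
            d = GuardDecision.valueOf(decisionHeader);
        } catch (IllegalArgumentException | NullPointerException e) {
            d = GuardDecision.DENY; // fail closed
        }
        destinations.get(d).add(recordId);
    }

    List<String> destination(GuardDecision d) {
        return List.copyOf(destinations.get(d));
    }

    public static void main(String[] args) {
        DecisionFanOut fanOut = new DecisionFanOut();
        fanOut.accept("SAFE", "evt-1");
        fanOut.accept("REVIEW", "evt-10944");
        fanOut.accept("UNKNOWN", "evt-2");
        for (GuardDecision d : GuardDecision.values()) {
            System.out.println(d + " -> " + fanOut.destination(d));
        }
    }
}
```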
Add MLflow model management
Swap the static ONNX URI for an MLflow model reference to get live promotion, rollback, and A/B routing without pipeline restarts. A complete promote-detect-rollback-recover cycle runs in under 30 seconds with zero record loss — see the MLflow benchmark for details.