PHI-Aware Governed AI Pipeline
Detect PHI fields, apply redaction or tokenization, run local AI enrichment, and write a governed record to Kafka — all in the same JVM process, before the event touches any downstream consumer.
Healthcare data platforms face a real tension: AI enrichment — risk scoring, care-gap detection, population analytics — requires events to move. But many of those same events contain PHI that can't reach external models or unmanaged analytics sinks without prior inspection.
This pipeline resolves that in-process. By the time a record is written to Kafka, it already has a policy decision, an enumerated list of detected PHI fields, a transformed delivered_payload (redacted, tokenized, or blocked), local AI enrichment labels, and a full provenance chain.
How it compares to the Defense Telemetry profile
Same hardware, same JVM settings, same pipeline skeleton — different tokenizer context. The shorter token max length (16 vs 24) is the primary throughput driver.
This is a PHI-aware governed AI pipeline pattern using safe synthetic events. It demonstrates the structural controls — detection, labeling, redaction, tokenization, routing, provenance — that a HIPAA-compliant deployment would require. It does not constitute HIPAA compliance out of the box. All patient, member, provider, diagnosis, and claim data is fully synthetic.
Pipeline & Config
[Pipeline diagram: HC SOURCE → _WIREEVENT → GUARDRAIL → GOVERNED]
Pipeline profile — exact config from this run
# ── Pipeline identity ────────────────────────────────────────
pipeline.id = sk-healthcare-phi-safe-ai
pipeline.parallelism = 4
pipeline.batch.size = 16
pipeline.drain.timeout.ms = 30000

# ── Transform chain ──────────────────────────────────────────
transform.chain = STRING_TO_WIREEVENT,DJL_EMBEDDING,PHI_GUARDRAIL
transform.version = healthcare-phi-safe-ai-v1

# ── PHI guardrail policy ─────────────────────────────────────
transform.phi_guardrail.policy.version = healthcare-phi-governance-policy-v1
transform.phi_guardrail.model.version = deterministic-healthcare-ai-v1
transform.phi_guardrail.high_claim.amount = 5000.0  # claim_amount ≥ this → elevated risk
transform.string_to_wireevent.use.payload.id.as.key = true

# ── ONNX embedding ───────────────────────────────────────────
ai.embedding.enabled = true
ai.embedding.engine = OnnxRuntime
ai.embedding.device = cpu
ai.embedding.pool.size = 4
ai.embedding.batching.enabled = true
ai.embedding.batching.max.size = 16
ai.embedding.batching.flush.ms = 10
ai.embedding.onnx.intra.op.threads = 2
ai.embedding.onnx.inter.op.threads = 1
ai.embedding.onnx.warmup.count = 64
ai.embedding.tokenizer.max.length = 16  # ← shorter than defense (24) → 3.4× faster
ai.embedding.input.max.chars = 768
ai.embedding.normalize = true
ai.embedding.model.uri = file:///models/minilm-l6-v2-onnx/model.onnx
ai.embedding.tokenizer.uri = file:///models/minilm-l6-v2-onnx/tokenizer.json
ai.feature.version = healthcare-phi-features-v1

# ── Source ───────────────────────────────────────────────────
source.type = SYNTHETIC
source.synthetic.text.profile = HEALTHCARE_EVENTS
source.synthetic.payload.size = 768  # larger than defense (640)
source.synthetic.max.records.per.second = 0

# ── Kafka sink ───────────────────────────────────────────────
sink.type = KAFKA
sink.kafka.topic = streamkernel-healthcare-phi-governed
sink.kafka.bootstrap.servers = localhost:9092
sink.kafka.compression.type = lz4
sink.kafka.acks = 1
sink.kafka.batch.size = 262144
sink.kafka.linger.ms = 10  # slightly longer than defense (5ms)
sink.kafka.enable.idempotence = false

# ── Security ─────────────────────────────────────────────────
security.type = PERMIT_ALL  # replace with OIDC for prod

# ── Observability ────────────────────────────────────────────
metrics.provider = PROMETHEUS
metrics.prometheus.port = 8080
metrics.tag.run_id = run-healthcare-phi-01
metrics.tag.env = healthcare-demo
PHI Decisions
The PHI_GUARDRAIL transformer evaluates each event against a deterministic healthcare policy. It detects PHI fields, assesses the requested destination, scores privacy risk, and emits one of four decisions. All four were observed and verified in this run.
Detected PHI fields across all event types:
***. Structured fields preserved.
Decision coverage confirmed by post-run script: BLOCK ✓ REDACT ✓ ESCALATE ✓ TOKENIZE ✓
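As a rough illustration of how a deterministic policy of this shape could map an event to one of the four decisions, here is a minimal Python sketch. It is not the logic of healthcare-phi-governance-policy-v1, whose rules are not listed on this page. The field names, destination labels, scoring weights, and the escalation threshold are all hypothetical. Only the four decision names and the 5000.0 high-claim threshold come from this run's config.

```python
from dataclasses import dataclass, field

# Hypothetical field and destination names, for illustration only.
PHI_ID_FIELDS = {"patient_id", "member_id", "provider_id"}
PHI_FREE_TEXT_FIELDS = {"clinical_note"}
ALL_PHI_FIELDS = PHI_ID_FIELDS | PHI_FREE_TEXT_FIELDS
EXTERNAL_DESTINATIONS = {"external_model", "unmanaged_analytics"}

HIGH_CLAIM_AMOUNT = 5000.0  # transform.phi_guardrail.high_claim.amount
ESCALATE_AT = 70            # assumed threshold on a 0-100 risk scale


@dataclass
class Decision:
    action: str  # BLOCK, REDACT, TOKENIZE or ESCALATE
    detected_phi: list = field(default_factory=list)
    risk: int = 0


def decide(event: dict, destination: str) -> Decision:
    """Same event and destination always yield the same decision."""
    detected = sorted(k for k in event if k in ALL_PHI_FIELDS)

    # Assumed scoring: 20 points per PHI field, +30 for a high-value claim.
    risk = 20 * len(detected)
    if event.get("claim_amount", 0.0) >= HIGH_CLAIM_AMOUNT:
        risk += 30
    risk = min(risk, 100)

    if detected and destination in EXTERNAL_DESTINATIONS:
        action = "BLOCK"      # PHI must not reach an external destination
    elif risk >= ESCALATE_AT:
        action = "ESCALATE"   # send to human review
    elif any(k in PHI_FREE_TEXT_FIELDS for k in detected):
        action = "REDACT"     # free text cannot be tokenized field by field
    else:
        action = "TOKENIZE"
    return Decision(action, detected, risk)
```

The rule order matters in this sketch: the destination check runs first so that a low risk score can never override a BLOCK.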
Output Contract
What the transformer actually does to the payload
The key thing distinguishing this from a routing-only pattern: the transformer constructs a delivered_payload that reflects the decision. A BLOCK produces a stub. A TOKENIZE produces real data with identifiers swapped for stable tokens. The original payload is never forwarded as-is when PHI is present.
// BLOCK decision
{
  "blocked": true,
  "original_payload_retained": false,
  "review_reason": "PHI blocked before external model or unmanaged destination"
}
// TOKENIZE decision
{
  "patient_token": "tok_8c29c1e64ecb",
  "member_token": "tok_3ed8e0b853c8",
  "provider_token": "tok_fab89005e89c",
  "diagnosis_group": "J45",
  "claim_amount_bucket": "1000_to_4999",
  "clinical_note": "Provider submitted supporting note for review."
}
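One common way to obtain stable tokens like those in the TOKENIZE sample is a keyed hash, so the same identifier always maps to the same token without being reversible by anyone who lacks the key. The Python sketch below assumes HMAC-SHA256 truncated to 12 hex characters. The page does not say how the real transformer derives its tokens, so this code will not reproduce the sample values. The input field names, the key, the bucket edges other than 1000_to_4999, and the rule that diagnosis_group is the part of the code before the dot are all assumptions.

```python
import hashlib
import hmac

SECRET_KEY = b"demo-only-key"  # hypothetical; load from a secret store in practice


def stable_token(identifier: str, key: bytes = SECRET_KEY) -> str:
    """Keyed hash: the same identifier and key always give the same token."""
    digest = hmac.new(key, identifier.encode("utf-8"), hashlib.sha256).hexdigest()
    return "tok_" + digest[:12]  # 12 hex chars, the shape seen in the sample


def claim_amount_bucket(amount: float) -> str:
    # Only "1000_to_4999" appears in the sample; the other labels are assumed.
    if amount < 1000:
        return "under_1000"
    if amount < 5000:
        return "1000_to_4999"
    return "5000_plus"


def tokenize_payload(event: dict) -> dict:
    """Build a delivered_payload with identifiers replaced and amounts bucketed."""
    return {
        "patient_token": stable_token(event["patient_id"]),
        "member_token": stable_token(event["member_id"]),
        "provider_token": stable_token(event["provider_id"]),
        # Assumption: group is the diagnosis code category, e.g. J45.909 -> J45.
        "diagnosis_group": event["diagnosis_code"].split(".")[0],
        "claim_amount_bucket": claim_amount_bucket(event["claim_amount"]),
    }
```

Because the token depends only on the identifier and the key, downstream joins on patient_token still work across events, while the raw identifier never leaves the process.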
Local AI enrichment — conditionally applied
AI enrichment runs inside the same transformer using the embedding produced by DJL_EMBEDDING. No second process, no external API call. It's skipped entirely for BLOCK decisions:
{
"ai_enrichment": {
"enabled": true,
"event_category": "patient_engagement",
"urgency": "low",
"claim_risk_score": 0.1227
}
}
// BLOCK decision — enrichment skipped
{
"ai_enrichment": {
"enabled": false,
"event_category": "not_enriched_blocked",
"urgency": "n/a",
"claim_risk_score": 0.0
}
}
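The conditional behavior shown in the two records above can be sketched as a single guard in front of the classifier. This is an illustrative Python version, not the transformer's actual code. The `classify` callable is a hypothetical stand-in for whatever maps an embedding to a category, urgency, and score. The blocked stub mirrors the BLOCK record above.

```python
from typing import Callable, Sequence, Tuple

# Stub emitted for BLOCK decisions, copied from the sample record.
BLOCKED_ENRICHMENT = {
    "enabled": False,
    "event_category": "not_enriched_blocked",
    "urgency": "n/a",
    "claim_risk_score": 0.0,
}


def enrich(
    decision: str,
    embedding: Sequence[float],
    classify: Callable[[Sequence[float]], Tuple[str, str, float]],
) -> dict:
    """Return the ai_enrichment block; the classifier never runs for BLOCK."""
    if decision == "BLOCK":
        return {"ai_enrichment": dict(BLOCKED_ENRICHMENT)}

    category, urgency, score = classify(embedding)
    return {
        "ai_enrichment": {
            "enabled": True,
            "event_category": category,
            "urgency": urgency,
            "claim_risk_score": round(score, 4),
        }
    }
```

Returning early means blocked events cost no classifier time and carry no model-derived labels.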
Kafka message headers
Reproduce It
localhost:9092. Use the included Docker Compose with tmpfs for Kafka data to avoid the overlay2 bottleneck.
.\scripts\demo_before_healthcare_phi.ps1
.\test-java-runner.ps1 `
  -MatrixFile .\benchmark-runs\tests_healthcare.csv `
  -SingleTest streamkernel_healthcare_phi_safe_ai_5m
.\scripts\demo_after_healthcare_phi.ps1 -SampleCount 3
Benchmark Results
Hardware: Intel i9-8950HK (6 cores, 12 threads), CPU-only. Same hardware and JVM settings as the defense telemetry profile; the throughput difference is attributed primarily to token context length.
Throughput log trace
# window_s=5  PROC_EPS  EFF%  PROC_TOTAL  MEM      DROPPED
20:36:45      265.4     66.3  1,328       313MB    0    ← fast ramp (shorter token context)
20:37:00      220.4     55.1  4,544       239MB    0
20:37:30      291.2     72.8  11,344      218MB    0
20:40:15      281.7     70.4  51,760      416MB    0
20:40:55      370.9     92.7  61,968      1,153MB  0    ← peak (92.7% of 4-thread ceiling)
20:41:20      316.7     79.2  68,064      503MB    0
20:41:40      147.4     36.8  72,768      845MB    0    ← wind-down
# DROPPED=0 across all 60 windows | EMPTY=0 throughout
# MEM at peak (1,153MB) = G1 before collection; sawtooth is normal
# EFF = % of 4-thread × 100 r/s theoretical ceiling
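The EFF% column can be checked by hand from the definition in the log footer: 4 threads at 100 records/s each gives a ceiling of 400 records/s. The short Python sketch below recomputes it for the logged windows. The function and constant names are illustrative, and the logged values agree with the recomputed ones to within one decimal place of rounding.

```python
THREADS = 4                # pipeline.parallelism
PER_THREAD_CEILING = 100.0 # records/s per thread, from the log footer


def efficiency_pct(proc_eps: float) -> float:
    """Throughput as a percentage of the 4-thread x 100 r/s ceiling."""
    return 100.0 * proc_eps / (THREADS * PER_THREAD_CEILING)


# (PROC_EPS, logged EFF%) pairs copied from the trace.
WINDOWS = [
    (265.4, 66.3), (220.4, 55.1), (291.2, 72.8), (281.7, 70.4),
    (370.9, 92.7), (316.7, 79.2), (147.4, 36.8),
]

for eps, logged in WINDOWS:
    print(f"{eps:6.1f} r/s -> {efficiency_pct(eps):5.2f}% (logged {logged}%)")
```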
Tuning Knobs
| Property | This run | What it does |
|---|---|---|
| ai.embedding.tokenizer.max.length | 16 | Dominant factor. This is why this profile is 3.4× faster than the defense profile (24 tokens). Set to the shortest value your event text supports; each token removed is a meaningful speedup on CPU. |
| transform.phi_guardrail.high_claim.amount | 5000.0 | Claim amounts at or above this value contribute to elevated privacy risk scoring. Tune to your organization's definition of high-value claims. |
| sink.kafka.linger.ms | 10 | Slightly higher than the defense profile (5 ms) to allow larger Kafka batches given the higher record throughput. Increase further if producer throughput is the bottleneck. |
| ai.embedding.pool.size | 4 | Number of ONNX predictor instances. Keep pool × intra.op.threads ≤ physical cores to avoid thread contention. |
| ai.embedding.batching.max.size | 16 | Batch fill was 100% throughout — the source kept up. You can increase this if latency budget allows. Diminishing returns above 32 on CPU at this token length. |
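The pool-size guidance in the table is easy to check mechanically before a run. The following Python sketch parses `key = value` lines in the style of the profile above and compares pool × intra.op.threads against a core count you supply. The parser is a simplification written for this example (it treats any `#` as the start of a comment, which would break values containing `#`), and the function names are hypothetical. For this run's settings the product is 4 × 2 = 8 threads.

```python
def parse_properties(text: str) -> dict:
    """Parse simple `key = value` lines; anything after `#` is dropped."""
    props = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def onnx_compute_threads(props: dict) -> int:
    """Threads the embedding pool can keep busy: pool size x intra-op threads."""
    return int(props["ai.embedding.pool.size"]) * int(
        props["ai.embedding.onnx.intra.op.threads"]
    )


def fits_core_budget(props: dict, physical_cores: int) -> bool:
    """True when the pool's thread demand stays within the given core count."""
    return onnx_compute_threads(props) <= physical_cores


SAMPLE = """
# ── ONNX embedding ──
ai.embedding.pool.size = 4
ai.embedding.onnx.intra.op.threads = 2
ai.embedding.tokenizer.max.length = 16  # shorter than defense (24)
"""

props = parse_properties(SAMPLE)
print(onnx_compute_threads(props))
```

Pass the physical core count explicitly rather than relying on `os.cpu_count()`, which reports logical CPUs and so counts hyper-threads.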
Extending It
Plug in your own PHI detector
The PHI_GUARDRAIL transformer implements the WireEventTransformer SPI. Replace the deterministic field scanner with an NLP-based detector, a custom rule engine, or a fine-tuned clinical classifier; the output contract (headers + delivered_payload) stays the same. Downstream consumers don't change.
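The idea of swapping the detector while keeping the output contract fixed can be illustrated with a small interface. The sketch below is in Python for brevity and is purely conceptual: it is not the WireEventTransformer SPI, which lives in the JVM and whose signature is not shown on this page. The `PhiDetector` protocol, both detector classes, and the output keys are hypothetical names.

```python
import re
from typing import Protocol


class PhiDetector(Protocol):
    def detect(self, event: dict) -> list: ...


class FieldNameDetector:
    """Flags fields purely by name, like a deterministic field scanner."""

    def __init__(self, phi_fields):
        self.phi_fields = set(phi_fields)

    def detect(self, event: dict) -> list:
        return sorted(k for k in event if k in self.phi_fields)


class RegexDetector:
    """Flags string fields whose value matches any of the given patterns."""

    def __init__(self, patterns):
        self.patterns = [re.compile(p) for p in patterns]

    def detect(self, event: dict) -> list:
        return sorted(
            k for k, v in event.items()
            if isinstance(v, str) and any(p.search(v) for p in self.patterns)
        )


def guardrail_summary(event: dict, detector: PhiDetector) -> dict:
    """Same output keys regardless of which detector is plugged in."""
    fields = detector.detect(event)
    return {"detected_phi_fields": fields, "detected_phi_count": len(fields)}
```

Either detector can be passed to `guardrail_summary`, and consumers reading its output see the same keys in both cases, which is the property the paragraph above describes.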
Replace the embedding model
Swap all-MiniLM-L6-v2 for a clinical domain model fine-tuned on ICD codes, clinical notes, or claims text. Change ai.embedding.model.uri and ai.embedding.tokenizer.uri to point at the new ONNX artifact; no code changes are required.
Swap the sink
Change sink.type to MONGODB to write governed records directly to a vector store for downstream RAG. The healthcare.policy.decision header travels unchanged to any sink type.
Multi-sink fan-out (next increment)
Planned: physical routing to separate topics (TOKENIZE events to local-ai-vector, ESCALATE to audit-review, BLOCK to DLQ) in the same pipeline pass. The header contract is already designed for it.
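Until fan-out lands in the pipeline itself, a downstream consumer could make the same routing choice from the healthcare.policy.decision header. The Python sketch below assumes headers arrive as a list of (name, bytes) pairs and that the header value is the decision name as UTF-8 text; neither detail is confirmed on this page. The DLQ topic name is a placeholder, and sending REDACT (which the text above does not assign) to the existing governed topic is an assumption.

```python
DECISION_HEADER = "healthcare.policy.decision"

TOPIC_BY_DECISION = {
    "TOKENIZE": "local-ai-vector",
    "ESCALATE": "audit-review",
    "BLOCK": "healthcare-phi-dlq",  # placeholder name for the DLQ topic
}
# Assumed fallback for decisions with no dedicated topic, e.g. REDACT.
DEFAULT_TOPIC = "streamkernel-healthcare-phi-governed"


def header_value(headers, name: str):
    """Return the first header with this name as text, or None if absent."""
    for key, value in headers or []:
        if key == name:
            return value.decode("utf-8") if isinstance(value, bytes) else value
    return None


def route(headers) -> str:
    """Pick a destination topic from the policy decision header."""
    decision = header_value(headers, DECISION_HEADER)
    return TOPIC_BY_DECISION.get(decision, DEFAULT_TOPIC)
```

Routing on the header alone means the consumer never has to parse the payload to decide where a record goes.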
Enable GPU inference
ai.embedding.device = gpu
ai.embedding.onnx.execution.provider = CUDA
ai.embedding.onnx.enable.cpu.fallback = true