# Kubernetes Operator
Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a `VolleyApplication` custom resource, and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.
## Prerequisites
- Kubernetes cluster (v1.26+)
- prometheus-operator (optional, for automatic metrics scraping)
- KEDA (optional, for autoscaling)
## Installing the Operator
Build and deploy the operator:

```bash
# Build the operator binary
cargo build --release -p volley-k8s-operator

# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json

# Apply the CRD to your cluster
kubectl apply -f volley-crd.json

# Run the operator (in-cluster, or locally for development)
./target/release/volley-k8s-operator
```
## Deploying a Volley Application
Create a `VolleyApplication` resource:

```yaml
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  # Container image (required)
  image: registry.example.com/my-volley-app:v1.0

  # Resource requirements (required)
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  # RocksDB state storage
  state:
    storageClassName: fast-ssd
    size: 10Gi

  # Checkpoint storage
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi

  # Environment variables
  env:
    - name: RUST_LOG
      value: "info"
    - name: KAFKA_BOOTSTRAP_SERVERS
      value: "kafka.default.svc:9092"
```
Apply the manifest:

```bash
kubectl apply -f my-pipeline.yaml
```
The operator creates the following resources automatically:
| Resource | Purpose |
|---|---|
| StatefulSet | Runs the app with stable pod identity and persistent volumes |
| PVC (state) | Persistent storage for RocksDB state (/var/lib/volley/state) |
| PVC (checkpoints) | Persistent storage for checkpoints (/var/lib/volley/checkpoints) |
| Service (headless) | StatefulSet DNS for stable network identity |
| Service (metrics) | Exposes the Prometheus metrics port |
| ServiceMonitor | Prometheus-operator auto-discovery for metrics scraping |
| PodDisruptionBudget | Limits voluntary evictions (e.g. node drains) so checkpointing isn't interrupted |
## Health Checks
The operator automatically configures liveness and readiness probes backed by Volley's built-in `HealthReporter`:
| Probe | Path | Behaviour |
|---|---|---|
| Liveness | /health (port 8080) | Returns healthy when pipeline is Running or Starting |
| Readiness | /ready (port 8080) | Returns ready only when pipeline is Running |
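The probe semantics in the table above can be sketched as a pure function over the pipeline phase. This is an illustration only; the enum and function names are assumptions, not Volley's actual `HealthReporter` API:

```rust
// Illustrative sketch of the probe semantics above; the enum and function
// names are assumptions, not Volley's actual HealthReporter API.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Phase {
    Starting,
    Running,
    Degraded,
    Failed,
}

/// Liveness: healthy while the pipeline is Running or Starting.
fn is_live(phase: Phase) -> bool {
    matches!(phase, Phase::Running | Phase::Starting)
}

/// Readiness: ready only once the pipeline is Running.
fn is_ready(phase: Phase) -> bool {
    phase == Phase::Running
}

fn main() {
    // A starting pod stays alive (no restart) but receives no traffic yet.
    println!("starting: live={} ready={}", is_live(Phase::Starting), is_ready(Phase::Starting));
}
```

The useful property is the gap between the two: a pod that is `Starting` passes liveness (so Kubernetes doesn't restart it mid-recovery) but fails readiness (so it gets no traffic until the pipeline is actually `Running`).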
Customize the health check configuration:

```yaml
spec:
  health:
    port: 8080
    livenessPath: /health
    readinessPath: /ready
    initialDelaySeconds: 30
    periodSeconds: 10
```
## Observability
Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:
```yaml
spec:
  observability:
    metricsPort: 9090
    metricsPath: /metrics
    serviceMonitor:
      enabled: true
      interval: 15s
      labels:
        release: prometheus   # match your Prometheus selector
    otlpEndpoint: http://otel-collector.monitoring:4317   # optional tracing
```
Key Volley metrics available for dashboards and alerting:
| Metric | Type | Description |
|---|---|---|
| `volley.source.records_polled` | Counter | Records read from the source |
| `volley.records.processed` | Counter | Records processed per operator |
| `volley.stage.latency_ms` | Histogram | Per-operator processing latency |
| `volley.checkpoint.duration_ms` | Histogram | Checkpoint completion time |
| `volley.channel.utilization` | Gauge | Backpressure indicator (0.0-1.0) |
| `volley.kafka.consumer_lag` | Gauge | Kafka consumer group lag |
| `volley.pipeline.uptime_seconds` | Gauge | Pipeline uptime |
## Node Isolation
Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:
```yaml
spec:
  isolation:
    # Schedule only on nodes labelled for stream processing
    nodeSelector:
      volley.io/pool: stream-processing

    # Tolerate the dedicated taint
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule

    # One pod per node (enabled by default)
    podAntiAffinity: true
```
## Distributed Execution (Horizontal Scaling)
To run a pipeline across multiple pods, add a `cluster` section to your spec. First, create a ReadWriteMany PVC backed by your cloud provider's managed NFS (AWS EFS, GCP Filestore, Azure Files):
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs   # your RWX storage class
  resources:
    requests:
      storage: 50Gi
```
Then reference it in your `VolleyApplication`:
```yaml
spec:
  replicas: 3
  cluster:
    maxKeyGroups: 256   # virtual partitions (default; power of 2)
    flightPort: 50051   # Arrow Flight port (default)
    sharedCheckpoints:
      claimName: volley-shared-ckpt   # the PVC you created above
```
When `cluster` is present, the operator automatically:
| Action | Detail |
|---|---|
| Injects `VOLLEY_*` env vars | `VOLLEY_NUM_WORKERS`, `VOLLEY_HEADLESS_SERVICE`, `VOLLEY_MAX_KEY_GROUPS`, `VOLLEY_FLIGHT_PORT`, `VOLLEY_APP_NAME`, `VOLLEY_SHARED_CHECKPOINT_DIR` |
| Adds Flight port | Container port 50051 + headless service port for inter-pod communication |
| Mounts shared PVC | The existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints |
The Volley runtime reads these env vars via `ClusterConfig::from_env()` and automatically enters distributed mode.
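For illustration, parsing those injected variables might look like the following sketch. The struct, field, and function names here are hypothetical; the real `ClusterConfig::from_env()` may differ:

```rust
// Hypothetical sketch of env-driven cluster configuration; all names are
// illustrative, not Volley's actual ClusterConfig API.
use std::env;

#[derive(Debug)]
struct ClusterEnv {
    num_workers: u32,
    max_key_groups: u32,
    flight_port: u16,
    shared_checkpoint_dir: String,
}

impl ClusterEnv {
    /// Build from a lookup function, so parsing is testable without
    /// mutating the process environment.
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Option<Self> {
        Some(ClusterEnv {
            num_workers: get("VOLLEY_NUM_WORKERS")?.parse().ok()?,
            max_key_groups: get("VOLLEY_MAX_KEY_GROUPS")?.parse().ok()?,
            flight_port: get("VOLLEY_FLIGHT_PORT")?.parse().ok()?,
            shared_checkpoint_dir: get("VOLLEY_SHARED_CHECKPOINT_DIR")?,
        })
    }

    /// None when the operator did not inject the cluster env vars,
    /// i.e. the app should run in single-process mode.
    fn from_env() -> Option<Self> {
        Self::from_lookup(|k| env::var(k).ok())
    }
}

fn main() {
    // Outside the operator-managed pod, this reports single-process mode.
    println!("distributed: {}", ClusterEnv::from_env().is_some());
}
```

Returning `Option` (rather than defaults) makes the mode switch explicit: the same binary runs standalone when the variables are absent and distributed when the operator injects them.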
Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling are near-instant: no data transfer is needed.
Rescaling is live: change `spec.replicas` and the coordinator triggers a checkpoint, recomputes key-group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.
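The range-based assignment behind this is easy to sketch. The following is an illustration of Flink-style key-group arithmetic, not Volley's actual code:

```rust
// Illustrative Flink-style key-group math, not Volley's actual code.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A key always maps to the same virtual partition (key group),
/// independent of the current worker count.
fn key_group_for<K: Hash>(key: &K, max_key_groups: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % max_key_groups
}

/// Workers own contiguous key-group ranges, so changing the worker
/// count only moves whole ranges (and their state) between workers.
fn worker_for(key_group: u64, max_key_groups: u64, num_workers: u64) -> u64 {
    key_group * num_workers / max_key_groups
}

fn main() {
    let g = key_group_for(&"user-42", 256);
    // The key group is stable across a rescale from 3 to 5 workers;
    // only its owning worker may change.
    println!("group {g}: worker {} -> worker {}",
        worker_for(g, 256, 3), worker_for(g, 256, 5));
}
```

This is why `maxKeyGroups` caps effective parallelism: with 256 key groups you can scale to at most 256 workers, and state is always moved at key-group granularity.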
## Autoscaling with KEDA (Future)
The operator has built-in support for KEDA autoscaling. When enabled, it creates a `ScaledObject` that can scale on Kafka consumer lag, Prometheus metrics, or CPU/memory:
```yaml
spec:
  autoscaling:
    enabled: true
    minReplicas: 1
    maxReplicas: 10
    triggers:
      # Scale on Kafka consumer lag
      - type: kafka
        metadata:
          bootstrapServers: kafka.default.svc:9092
          consumerGroup: my-pipeline-group
          topic: input-events
          lagThreshold: "1000"

      # Scale on backpressure (channel utilization)
      - type: prometheus
        metadata:
          serverAddress: http://prometheus.monitoring:9090
          query: avg(volley_channel_utilization{job="my-pipeline"})
          threshold: "0.8"

      # Scale on CPU
      - type: cpu
        metadata:
          value: "70"
```
## Checking Application Status
```bash
# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp

# Check status
kubectl describe vapp my-pipeline

# View operator logs
kubectl logs deployment/volley-k8s-operator
```
The status subresource reports the current phase (`Pending`, `Running`, `Degraded`, `Failed`), replica counts, and a human-readable message.
## Full Spec Reference
<details>
<summary>Click to expand the full VolleyApplication spec</summary>

```yaml
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  image: registry.example.com/my-app:v1.0   # required
  imagePullPolicy: IfNotPresent             # default: IfNotPresent
  imagePullSecrets:
    - name: registry-secret

  resources:                                # required
    requests: { cpu: "2", memory: "4Gi" }
    limits: { cpu: "4", memory: "8Gi" }

  replicas: 1                               # default: 1
  args: ["--config", "/etc/volley/config.yaml"]
  env:
    - name: RUST_LOG
      value: "info"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef: { name: db-creds, key: password }

  state:
    storageClassName: fast-ssd
    size: 10Gi
    mountPath: /var/lib/volley/state        # default

  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
    mountPath: /var/lib/volley/checkpoints  # default

  observability:
    metricsPort: 9090                       # default: 9090
    metricsPath: /metrics                   # default: /metrics
    serviceMonitor:
      enabled: true                         # default: true
      interval: 15s                         # default: 15s
      labels: { release: prometheus }
    otlpEndpoint: http://otel-collector:4317

  health:
    port: 8080                              # default: 8080
    livenessPath: /health                   # default: /health
    readinessPath: /ready                   # default: /ready
    initialDelaySeconds: 30                 # default: 30
    periodSeconds: 10                       # default: 10

  isolation:
    nodeSelector: { volley.io/pool: stream-processing }
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    podAntiAffinity: true                   # default: true

  kafka:
    bootstrapServers: kafka:9092
    consumerGroup: my-group
    topics: [input-events]

  autoscaling:
    enabled: false                          # default: false
    minReplicas: 1
    maxReplicas: 10
    triggers:
      - type: kafka
        metadata:
          bootstrapServers: kafka:9092
          consumerGroup: my-group
          topic: input-events
          lagThreshold: "1000"

  cluster:                                  # distributed execution (optional)
    maxKeyGroups: 256                       # default: 256 (power of 2)
    flightPort: 50051                       # default: 50051
    sharedCheckpoints:
      claimName: volley-shared-ckpt         # existing ReadWriteMany PVC
      mountPath: /mnt/volley/checkpoints    # default

  labels: {}        # additional labels on all resources
  annotations: {}   # additional annotations on all resources
```

</details>