Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Kubernetes Operator

Volley includes a Kubernetes operator that automates deployment and management of Volley stream processing applications. Define a VolleyApplication custom resource and the operator handles StatefulSets, persistent storage, health checks, and Prometheus integration.

Prerequisites

  • Kubernetes cluster (v1.26+)
  • prometheus-operator (optional, for automatic metrics scraping)
  • KEDA (optional, for autoscaling)

Installing the Operator

Build and deploy the operator:

# Build the operator binary
cargo build --release -p volley-k8s-operator

# Generate the CRD manifest
./target/release/volley-k8s-operator generate-crd > volley-crd.json

# Apply the CRD to your cluster
kubectl apply -f volley-crd.json

# Run the operator (in-cluster or locally for development)
./target/release/volley-k8s-operator

Deploying a Volley Application

Create a VolleyApplication resource:

apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  # Container image (required)
  image: registry.example.com/my-volley-app:v1.0

  # Resource requirements (required)
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

  # RocksDB state storage
  state:
    storageClassName: fast-ssd
    size: 10Gi

  # Checkpoint storage
  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi

  # Environment variables
  env:
    - name: RUST_LOG
      value: "info"
    - name: KAFKA_BOOTSTRAP_SERVERS
      value: "kafka.default.svc:9092"
kubectl apply -f my-pipeline.yaml

The operator creates the following resources automatically:

ResourcePurpose
StatefulSetRuns the app with stable pod identity and persistent volumes
PVC (state)Persistent storage for RocksDB state (/var/lib/volley/state)
PVC (checkpoints)Persistent storage for checkpoints (/var/lib/volley/checkpoints)
Service (headless)StatefulSet DNS for stable network identity
Service (metrics)Exposes the Prometheus metrics port
ServiceMonitorPrometheus-operator auto-discovery for metrics scraping
PodDisruptionBudgetPrevents involuntary eviction during checkpointing

Health Checks

The operator automatically configures liveness and readiness probes backed by Volley’s built-in HealthReporter:

ProbePathBehaviour
Liveness/health (port 8080)Returns healthy when pipeline is Running or Starting
Readiness/ready (port 8080)Returns ready only when pipeline is Running

Customize the health check configuration:

spec:
  health:
    port: 8080
    livenessPath: /health
    readinessPath: /ready
    initialDelaySeconds: 30
    periodSeconds: 10

Observability

Metrics are exposed on port 9090 by default. When prometheus-operator is installed, the operator creates a ServiceMonitor for automatic scraping:

spec:
  observability:
    metricsPort: 9090
    metricsPath: /metrics
    serviceMonitor:
      enabled: true
      interval: 15s
      labels:
        release: prometheus   # match your Prometheus selector
    otlpEndpoint: http://otel-collector.monitoring:4317  # optional tracing

Key Volley metrics available for dashboards and alerting:

MetricTypeDescription
volley.source.records_polledCounterRecords read from source
volley.records.processedCounterRecords processed per operator
volley.stage.latency_msHistogramPer-operator processing latency
volley.checkpoint.duration_msHistogramCheckpoint completion time
volley.channel.utilizationGaugeBackpressure indicator (0.0-1.0)
volley.kafka.consumer_lagGaugeKafka consumer group lag
volley.pipeline.uptime_secondsGaugePipeline uptime

Node Isolation

Following Flink best practice, each Volley application should run on dedicated nodes. The operator supports node selectors, tolerations, and pod anti-affinity:

spec:
  isolation:
    # Schedule only on nodes labelled for stream processing
    nodeSelector:
      volley.io/pool: stream-processing
    # Tolerate the dedicated taint
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    # One pod per node (enabled by default)
    podAntiAffinity: true

Distributed Execution (Horizontal Scaling)

To run a pipeline across multiple pods, add the cluster section to your spec:

First, create a ReadWriteMany PVC backed by your cloud provider’s managed NFS (AWS EFS, GCP Filestore, Azure Files):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: volley-shared-ckpt
spec:
  accessModes: [ReadWriteMany]
  storageClassName: efs  # your RWX storage class
  resources:
    requests:
      storage: 50Gi

Then reference it in your VolleyApplication:

spec:
  replicas: 3
  cluster:
    maxKeyGroups: 256                    # virtual partitions (default, power of 2)
    flightPort: 50051                    # Arrow Flight port (default)
    sharedCheckpoints:
      claimName: volley-shared-ckpt      # the PVC you created above

When cluster is present, the operator automatically:

ActionDetail
Injects VOLLEY_* env varsVOLLEY_NUM_WORKERS, VOLLEY_HEADLESS_SERVICE, VOLLEY_MAX_KEY_GROUPS, VOLLEY_FLIGHT_PORT, VOLLEY_APP_NAME, VOLLEY_SHARED_CHECKPOINT_DIR
Adds Flight portContainer port 50051 + headless service port for inter-pod communication
Mounts shared PVCThe existing ReadWriteMany PVC is mounted into all pods at /mnt/volley/checkpoints

The Volley runtime reads these env vars via ClusterConfig::from_env() and automatically enters distributed mode.

Shared checkpoint storage uses a pre-provisioned ReadWriteMany PVC (managed NFS from your cloud provider). All workers write checkpoints to the same shared mount, so recovery and rescaling are near-instant — no data transfer needed.

Rescaling is live: change spec.replicas and the coordinator triggers a checkpoint, recomputes key group assignments, and workers pick up their new key groups from the shared filesystem. No restart required.

Autoscaling with KEDA (Future)

The operator has built-in support for KEDA autoscaling. When enabled, it creates a ScaledObject that can scale based on Kafka consumer lag, Prometheus metrics, or CPU/memory:

spec:
  autoscaling:
    enabled: true
    minReplicas: 1
    maxReplicas: 10
    triggers:
      # Scale on Kafka consumer lag
      - type: kafka
        metadata:
          bootstrapServers: kafka.default.svc:9092
          consumerGroup: my-pipeline-group
          topic: input-events
          lagThreshold: "1000"
      # Scale on backpressure (channel utilization)
      - type: prometheus
        metadata:
          serverAddress: http://prometheus.monitoring:9090
          query: avg(volley_channel_utilization{job="my-pipeline"})
          threshold: "0.8"
      # Scale on CPU
      - type: cpu
        metadata:
          value: "70"

Checking Application Status

# List all Volley applications
kubectl get volleyapplications
# or use the short name:
kubectl get vapp

# Check status
kubectl describe vapp my-pipeline

# View operator logs
kubectl logs deployment/volley-k8s-operator

The status subresource reports the current phase (Pending, Running, Degraded, Failed), replica counts, and a human-readable message.

Full Spec Reference

Click to expand the full VolleyApplication spec
apiVersion: volley.io/v1alpha1
kind: VolleyApplication
metadata:
  name: my-pipeline
  namespace: default
spec:
  image: registry.example.com/my-app:v1.0   # required
  imagePullPolicy: IfNotPresent              # default: IfNotPresent
  imagePullSecrets:
    - name: registry-secret

  resources:                                  # required
    requests: { cpu: "2", memory: "4Gi" }
    limits:   { cpu: "4", memory: "8Gi" }

  replicas: 1                                 # default: 1
  args: ["--config", "/etc/volley/config.yaml"]

  env:
    - name: RUST_LOG
      value: "info"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef: { name: db-creds, key: password }

  state:
    storageClassName: fast-ssd
    size: 10Gi
    mountPath: /var/lib/volley/state          # default

  checkpoints:
    storageClassName: fast-ssd
    size: 20Gi
    mountPath: /var/lib/volley/checkpoints    # default

  observability:
    metricsPort: 9090                         # default: 9090
    metricsPath: /metrics                     # default: /metrics
    serviceMonitor:
      enabled: true                           # default: true
      interval: 15s                           # default: 15s
      labels: { release: prometheus }
    otlpEndpoint: http://otel-collector:4317

  health:
    port: 8080                                # default: 8080
    livenessPath: /health                     # default: /health
    readinessPath: /ready                     # default: /ready
    initialDelaySeconds: 30                   # default: 30
    periodSeconds: 10                         # default: 10

  isolation:
    nodeSelector: { volley.io/pool: stream-processing }
    tolerations:
      - key: volley.io/dedicated
        operator: Equal
        value: "true"
        effect: NoSchedule
    podAntiAffinity: true                     # default: true

  kafka:
    bootstrapServers: kafka:9092
    consumerGroup: my-group
    topics: [input-events]

  autoscaling:
    enabled: false                            # default: false
    minReplicas: 1
    maxReplicas: 10
    triggers:
      - type: kafka
        metadata:
          bootstrapServers: kafka:9092
          consumerGroup: my-group
          topic: input-events
          lagThreshold: "1000"

  cluster:                                      # distributed execution (optional)
    maxKeyGroups: 256                           # default: 256 (power of 2)
    flightPort: 50051                           # default: 50051
    sharedCheckpoints:
      claimName: volley-shared-ckpt             # existing ReadWriteMany PVC
      mountPath: /mnt/volley/checkpoints        # default

  labels: {}                                  # additional labels on all resources
  annotations: {}                             # additional annotations on all resources