Functions Reference

Volley provides built-in vectorized functions that work with the expression API (select_expr, filter_expr). These functions operate on Apache Arrow arrays and are exposed as DataFusion UDFs.

Functions are imported from volley_core::functions or via the prelude:

// Via the prelude (brings simhash and hamming_similarity into scope)
use volley_core::prelude::*;
// or import the functions explicitly
use volley_core::functions::hash::{simhash, hamming_similarity};

Hash Functions

Functions for computing and comparing hash fingerprints.

| Function | Signature | Description |
|---|---|---|
| simhash | simhash(Utf8 / LargeUtf8) → UInt64 | 64-bit SimHash text fingerprint |
| hamming_similarity | hamming_similarity(UInt64, UInt64) → Float64 | Bitwise similarity between two hashes |

simhash

Computes a 64-bit SimHash fingerprint for text similarity. Similar texts produce fingerprints separated by a small Hamming distance, which approximates the cosine similarity of the original term-frequency vectors.

Signature: simhash(text: Utf8 | LargeUtf8) → UInt64

Algorithm: Tokenizes text into character trigrams, hashes each trigram with a 64-bit hash, accumulates a weighted bit vector, and thresholds to produce the final fingerprint.

Null handling: Null input produces null output.
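The algorithm described above can be sketched with the standard library alone. This is a minimal illustration, not Volley's implementation: the trigram hash (`DefaultHasher`), the unit weight per trigram occurrence, and the names `hash_trigram` and `simhash_sketch` are all assumptions, so the fingerprints it produces will not match those of `simhash`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash one character trigram to 64 bits.
/// DefaultHasher is a stand-in; the hash Volley uses is not specified here.
fn hash_trigram(trigram: &[char]) -> u64 {
    let mut hasher = DefaultHasher::new();
    trigram.hash(&mut hasher);
    hasher.finish()
}

/// Illustrative SimHash: trigram -> 64-bit hash -> per-bit vote -> threshold.
fn simhash_sketch(text: &str) -> u64 {
    let chars: Vec<char> = text.chars().collect();

    // One signed counter per output bit.
    let mut counts = [0i64; 64];

    // Each trigram occurrence votes +1 on bits set in its hash, -1 otherwise.
    // (Assumption: every occurrence has weight 1.)
    for trigram in chars.windows(3) {
        let h = hash_trigram(trigram);
        for (bit, count) in counts.iter_mut().enumerate() {
            if (h >> bit) & 1 == 1 {
                *count += 1;
            } else {
                *count -= 1;
            }
        }
    }

    // Threshold: a fingerprint bit is set when its counter is positive.
    let mut fingerprint = 0u64;
    for (bit, count) in counts.iter().enumerate() {
        if *count > 0 {
            fingerprint |= 1u64 << bit;
        }
    }
    fingerprint
}
```

In this sketch, text shorter than three characters yields no trigrams and therefore a fingerprint of 0. Comparing two fingerprints with `(a ^ b).count_ones()` gives their Hamming distance.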

Example:

use arrow::array::StringArray;
use datafusion_expr::col;
use volley_core::functions::hash::simhash_kernel;
use volley_core::prelude::*;

// As an expression in a pipeline
stream.select_expr(vec![
    col("id"),
    simhash(col("text")).alias("text_hash"),
]);

// As a direct Arrow kernel
let input = StringArray::from(vec!["hello world"]);
let hashes = simhash_kernel(&input).unwrap();

hamming_similarity

Computes the bitwise similarity between two UInt64 values, returning a Float64 between 0.0 (all bits differ) and 1.0 (identical).

Signature: hamming_similarity(a: UInt64, b: UInt64) → Float64

Formula: 1.0 - popcount(a XOR b) / 64.0

For a 64-bit hash, a similarity of at least 0.95 means at most 3 bits differ: 3 differing bits give 1 - 3/64 ≈ 0.953, while 4 give 1 - 4/64 = 0.9375.

Null handling: If either input is null, the output is null.
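The formula and null rule above can be checked with a scalar sketch. It works on plain `u64` values instead of Arrow arrays, and the function names are hypothetical, chosen only for this illustration.

```rust
/// Scalar version of the documented formula: 1.0 - popcount(a XOR b) / 64.0.
fn hamming_similarity_sketch(a: u64, b: u64) -> f64 {
    let differing_bits = (a ^ b).count_ones();
    1.0 - f64::from(differing_bits) / 64.0
}

/// Null propagation: if either input is missing, so is the output.
fn hamming_similarity_nullable(a: Option<u64>, b: Option<u64>) -> Option<f64> {
    Some(hamming_similarity_sketch(a?, b?))
}
```

With this sketch, two values that differ in 3 bits score 0.953125 and pass a 0.95 threshold, while values that differ in 4 bits score 0.9375 and do not.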

Example:

use arrow::array::UInt64Array;
use datafusion_expr::{col, lit};
use volley_core::functions::hash::hamming_similarity_kernel;
use volley_core::prelude::*;

// Filter out records whose text hasn't changed enough (similarity of 0.95 or higher)
stream.filter_expr(
    hamming_similarity(
        simhash(col("current_text")),
        simhash(col("previous_text")),
    )
    .lt(lit(0.95)),
);

// Direct Arrow kernel
// The two inputs differ in exactly one bit, so the formula gives 1 - 1/64 = 0.984375
let a = UInt64Array::from(vec![0xDEADBEEFu64]);
let b = UInt64Array::from(vec![0xDEADBEFFu64]);
let similarity = hamming_similarity_kernel(&a, &b).unwrap();

Proto Functions

Functions for decoding binary protobuf data into Arrow columnar format.

| Function | Signature | Description |
|---|---|---|
| decode_proto | decode_proto(Binary / LargeBinary, &ProtoDescriptor) → Struct | Decode serialized protobuf messages into a struct column |

decode_proto

Decodes a binary column containing serialized protobuf messages into an Arrow Struct column with all fields from the message definition.

Signature: decode_proto(data: Binary | LargeBinary, descriptor: &ProtoDescriptor) → Struct

The ProtoDescriptor is constructed once from a serialized FileDescriptorSet and a fully-qualified message name. It caches the Arrow schema derived from the protobuf definition.

Null handling: Null input rows and decode failures produce null struct rows. Decode failures emit a tracing::warn! but do not stop the pipeline.
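The error policy described above (null rows and failed decodes both become nulls, and the batch keeps going) can be sketched generically. This is only an illustration of that policy under stated assumptions: `decode_rows` is a hypothetical helper, the decoder is passed in as a closure and stands in for real protobuf decoding, and `eprintln!` stands in for `tracing::warn!`.

```rust
/// Apply a fallible decoder to every row. Null inputs and decode failures
/// both yield `None`; a failure is logged and processing continues.
fn decode_rows<T, E: std::fmt::Display>(
    rows: &[Option<Vec<u8>>],
    decode: impl Fn(&[u8]) -> Result<T, E>,
) -> Vec<Option<T>> {
    rows.iter()
        .enumerate()
        .map(|(i, row)| {
            // Null input row -> null output row.
            let bytes = row.as_deref()?;
            match decode(bytes) {
                Ok(value) => Some(value),
                Err(err) => {
                    // Stand-in for tracing::warn!; the pipeline is not stopped.
                    eprintln!("warn: row {i}: decode failed: {err}");
                    None
                }
            }
        })
        .collect()
}
```

The output always has one entry per input row, so downstream columns stay aligned even when some rows fail to decode.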

Requires: protobuf feature flag on volley-core.

Example:

use datafusion_expr::col;
use volley_core::functions::proto::{decode_proto, ProtoDescriptor};

// DESCRIPTOR_BYTES holds a serialized FileDescriptorSet; build the descriptor once and reuse it
let descr = ProtoDescriptor::new(DESCRIPTOR_BYTES, "crawl.CrawlRecord").unwrap();

// Decode into a struct column, then project individual fields
stream
    .select_expr(vec![
        decode_proto(col("data"), &descr).alias("record"),
    ])
    .select_expr(vec![
        col("record.url"),
        col("record.content"),
    ]);

URL Functions

Functions for URL normalization and extraction.

| Function | Signature | Description |
|---|---|---|
| normalize_url | normalize_url(Utf8 / LargeUtf8) → Utf8 | Normalize a URL |
| normalize_url_with_params | normalize_url_with_params(Utf8 / LargeUtf8, Vec<String>) → Utf8 | Normalize with custom tracking params |
| extract_domain | extract_domain(Utf8 / LargeUtf8) → Utf8 | Extract host/domain from URL |

normalize_url

Normalizes a URL by applying: scheme/host lowercasing, default port removal (:80 for http, :443 for https), path normalization (/a/../b → /b), percent-encoding normalization (decode unreserved chars), www. prefix removal, fragment removal, query parameter sorting, tracking parameter removal, and trailing slash removal.

Signature: normalize_url(url: Utf8 | LargeUtf8) → Utf8

Default tracking params removed: utm_source, utm_medium, utm_campaign, utm_term, utm_content, fbclid, gclid, mc_eid, msclkid.
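Two of the steps listed above, path normalization and query cleanup, can be sketched on plain strings. This is a simplified illustration and not the code behind `normalize_url`: the helper names are hypothetical, the query is sorted by whole `key=value` pair, and percent-encoding is ignored.

```rust
/// Tracking parameters removed by default, as listed in this section.
const DEFAULT_TRACKING_PARAMS: [&str; 9] = [
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "mc_eid", "msclkid",
];

/// Resolve "." and ".." segments of an absolute path.
/// Empty segments are skipped, so a trailing slash is dropped as well.
fn remove_dot_segments(path: &str) -> String {
    let mut stack: Vec<&str> = Vec::new();
    for segment in path.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                stack.pop();
            }
            other => stack.push(other),
        }
    }
    format!("/{}", stack.join("/"))
}

/// Drop blocklisted parameters, then sort what remains.
fn clean_query(query: &str, blocklist: &[&str]) -> String {
    let mut pairs: Vec<&str> = query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .filter(|pair| {
            let key = pair.split('=').next().unwrap_or("");
            !blocklist.contains(&key)
        })
        .collect();
    pairs.sort_unstable();
    pairs.join("&")
}
```

Sorting the remaining parameters means two URLs that differ only in parameter order normalize to the same string, which is what makes the result usable as a deduplication key.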

Null handling: Null, empty, and invalid URL inputs produce null output.

Requires: url-functions feature flag on volley-core.

Example:

use datafusion_expr::col;
use volley_core::functions::url::normalize_url;

// Replace the url column with its normalized form
stream.select_expr(vec![normalize_url(col("url")).alias("url")]);

normalize_url_with_params

Same as normalize_url but with a custom tracking params blocklist that fully replaces the defaults.

Signature: normalize_url_with_params(url: Utf8 | LargeUtf8, remove_params: Vec<String>) → Utf8
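Because the custom list replaces the defaults instead of extending them, default entries such as utm_source are kept unless you list them yourself. The sketch below illustrates that behavior on a bare query string; `strip_params` is a hypothetical helper, not part of Volley.

```rust
/// Remove only the parameters named in `remove_params`.
/// Nothing else is removed, mirroring "fully replaces the defaults".
fn strip_params<'a>(query: &'a str, remove_params: &[String]) -> Vec<&'a str> {
    query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .filter(|pair| {
            let key = pair.split('=').next().unwrap_or("");
            !remove_params.iter().any(|p| p.as_str() == key)
        })
        .collect()
}
```

With a custom list of `ref` and `src`, the query `ref=home&utm_source=news&id=7` keeps `utm_source=news`. To strip the default tracking parameters as well, include them in the list you pass.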

Example:

use datafusion_expr::col;
use volley_core::functions::url::normalize_url_with_params;

// Only these parameters are stripped; the default list no longer applies
let custom_params = vec!["ref".to_string(), "src".to_string()];
stream.select_expr(vec![
    normalize_url_with_params(col("url"), custom_params).alias("url"),
]);

extract_domain

Extracts the full host/domain from a URL string, including subdomains.

Signature: extract_domain(url: Utf8 | LargeUtf8) → Utf8

Null handling: Null, empty, and invalid URL inputs produce null output.
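As a rough picture of what host extraction involves, here is a string-based sketch that returns `None` where the function would return null. It is an assumption-laden simplification, not the code behind `extract_domain`: the name `extract_host_sketch` is hypothetical, any input without `://` is treated as invalid, IPv6 literals are not handled, and the host is not lowercased.

```rust
/// Return the host part of a URL, including subdomains, or None if the
/// input is empty or has no recognizable authority.
fn extract_host_sketch(url: &str) -> Option<String> {
    // Require a scheme separator; anything else is treated as invalid.
    let (_scheme, rest) = url.trim().split_once("://")?;

    // The authority ends at the first '/', '?' or '#'.
    let authority = rest
        .split(|c: char| c == '/' || c == '?' || c == '#')
        .next()?;

    // Drop optional userinfo ("user:pass@").
    let host_port = authority.rsplit('@').next()?;

    // Drop an optional port.
    let host = host_port.split(':').next()?;

    if host.is_empty() {
        None
    } else {
        Some(host.to_string())
    }
}
```

A production implementation would normally rely on a full URL parser, which also covers IPv6 hosts, internationalized domain names, and scheme validation.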

Requires: url-functions feature flag on volley-core.

Example:

use arrow::array::StringArray;
use datafusion_expr::col;
use volley_core::functions::url::{extract_domain, extract_domain_kernel};

// "https://api.v2.example.com/endpoint" → "api.v2.example.com"
stream.select_expr(vec![extract_domain(col("url")).alias("domain")]);

// Direct Arrow kernel
let input = StringArray::from(vec!["http://sub.example.com/path"]);
let domains = extract_domain_kernel(&input).unwrap();