This repository contains a Python/NumPy implementation of the Weighted Reservoir Sampling with Replacement SKIP algorithm, converted from the Julia implementation in StreamSampling.jl.
The WRSWR-SKIP algorithm draws a single weighted random sample from a stream using reservoir sampling with replacement. This is particularly useful when:
- You need to sample from a stream of weighted data where elements arrive one at a time
- You don't know the total number of elements in advance
- Each element has an associated weight that affects its probability of being selected
- You want constant memory usage regardless of stream size
The algorithm is adapted from "Weighted Reservoir Sampling with Replacement from Multiple Data Streams, A. Meligrana, 2024".
- Streaming: Processes elements one at a time without storing the entire dataset
- Weighted: Each element can have a different probability weight
- Constant Memory: O(1) memory usage regardless of stream size
- Single Pass: Only requires one pass through the data
- Mathematically Sound: Selects each element with probability proportional to its weight
The core algorithm maintains:
- seen_k: Number of elements processed
- total_w: Cumulative sum of all weights
- skip_w: Weight threshold for next update
- current_value: Currently stored sample
For each new element (element, weight):
- Increment seen_k and add weight to total_w
- If skip_w <= total_w:
  - Update skip_w = total_w / random()
  - Store element as the new sample
This creates the correct weighted probability distribution where element selection probability is proportional to its weight.
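The update rule above can be sketched in a few lines of Python. This is a minimal illustration, not the repository's actual class: the name `SkipSamplerSketch` is hypothetical, it uses the standard library's `random.Random` instead of `numpy.random.Generator` to stay dependency-free, and it draws from (0, 1] rather than [0, 1) so the division can never hit zero.

```python
import random
from typing import Any, Optional


class SkipSamplerSketch:
    """Illustrative single-element weighted reservoir sampler (hypothetical name)."""

    def __init__(self, rng: Optional[random.Random] = None) -> None:
        self.rng = rng if rng is not None else random.Random()
        self.seen_k = 0                 # number of elements processed
        self.total_w = 0.0              # cumulative sum of all weights
        self.skip_w = 0.0               # weight threshold for the next update
        self.current_value: Any = None  # currently stored sample

    def fit(self, element: Any, weight: float) -> None:
        self.seen_k += 1
        self.total_w += weight
        if self.skip_w <= self.total_w:
            # Assumption: 1 - random() lies in (0, 1], which avoids dividing by zero.
            self.skip_w = self.total_w / (1.0 - self.rng.random())
            self.current_value = element

    def value(self) -> Any:
        return self.current_value


# Usage: feed (element, weight) pairs one at a time, then read the sample.
sampler = SkipSamplerSketch(random.Random(42))
for element, weight in [("apple", 0.1), ("banana", 0.3), ("cherry", 0.4), ("date", 0.2)]:
    sampler.fit(element, weight)
print(sampler.value())
```

The reason this works: after a replacement at cumulative weight W_k, the threshold W_k / u exceeds a later cumulative weight W_n with probability W_k / W_n, so the n-th element replaces the sample with conditional probability w_n / W_n, which is the standard weighted reservoir replacement probability.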
import numpy as np
from typing import Any, Optional, Union
from dataclasses import dataclass
from collections import Counter  # For examples only

from weighted_sampling_single_python import weighted_sample_single, stream_weighted_sample_single
# Example 1: One-shot sampling
elements = ['apple', 'banana', 'cherry', 'date']
weights = [0.1, 0.3, 0.4, 0.2] # Cherry is most likely
sample = weighted_sample_single(elements, weights)
print(f"Selected: {sample}")
# Example 2: Using a weight function
numbers = range(1, 11)
def weight_func(x):
    return 1.0 / x  # Smaller numbers get higher weights
sample = weighted_sample_single(numbers, weight_func)
print(f"Selected: {sample}")
# Example 3: Streaming/online processing
sampler = stream_weighted_sample_single()
for element, weight in data_stream:  # data_stream: any iterable of (element, weight) pairs
    sampler.fit(element, weight)
    current_sample = sampler.value()
    print(f"Current sample: {current_sample}")

See examples_weighted_sampling.py for comprehensive examples including:
- NumPy array handling
- Large-scale data processing
- Comparison with NumPy's choice() function
- Practical applications (log sampling, feature selection, customer sampling)
The main sampler class for streaming scenarios.
Constructor:
WeightedReservoirSamplerSingle(rng: Optional[np.random.Generator] = None)

Methods:
- fit(element, weight): Add a weighted element to the stream
- value(): Get the current sample (or None if no elements processed)
- empty(): Reset the sampler to initial state
- nobs(): Get the number of elements processed
One-shot sampling from a collection of weighted elements.
weighted_sample_single(
    elements: Union[list, np.ndarray],
    weights: Union[list, np.ndarray, callable],
    rng: Optional[np.random.Generator] = None
) -> Any

Parameters:
- elements: Collection of elements to sample from
- weights: Either weights array (same length as elements) or weight function
- rng: Random number generator (optional)
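To illustrate how a one-shot function might accept either a weights sequence or a weight function, here is a hedged sketch. The name `one_shot_sample_sketch` and the length check are assumptions made for illustration, and the standard library's `random.Random` stands in for `numpy.random.Generator`; the real `weighted_sample_single` may differ in its details.

```python
import random
from typing import Any, Callable, Iterable, Optional, Sequence, Union


def one_shot_sample_sketch(
    elements: Iterable[Any],
    weights: Union[Sequence[float], Callable[[Any], float]],
    rng: Optional[random.Random] = None,
) -> Any:
    """Return one element, chosen with probability proportional to its weight."""
    rng = rng if rng is not None else random.Random()
    elements = list(elements)

    # Resolve weights: either call the function per element or copy the sequence.
    if callable(weights):
        weight_values = [weights(e) for e in elements]
    else:
        weight_values = list(weights)
        if len(weight_values) != len(elements):  # assumed validation
            raise ValueError("weights must have the same length as elements")

    # Same skip-based update rule as the streaming sampler, run in a single pass.
    total_w, skip_w, current = 0.0, 0.0, None
    for element, w in zip(elements, weight_values):
        total_w += w
        if skip_w <= total_w:
            skip_w = total_w / (1.0 - rng.random())  # divisor lies in (0, 1]
            current = element
    return current


# Both call styles from the usage examples work with the sketch.
print(one_shot_sample_sketch(["apple", "banana", "cherry", "date"], [0.1, 0.3, 0.4, 0.2]))
print(one_shot_sample_sketch(range(1, 11), lambda x: 1.0 / x))
```

Resolving the weights up front keeps the sampling loop identical for both input styles, at the cost of materializing the weights in a list.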
Create a streaming sampler instance.
stream_weighted_sample_single(
    rng: Optional[np.random.Generator] = None
) -> WeightedReservoirSamplerSingle

| Aspect | Julia (Original) | Python (This Implementation) |
|---|---|---|
| Core Algorithm | ✓ Identical | ✓ Identical |
| Memory Usage | O(1) | O(1) |
| Time Complexity | O(1) per element | O(1) per element |
| Type System | Generic types | Any type with type hints |
| RNG | AbstractRNG | numpy.random.Generator |
| Mutability | Hybrid struct variants | Single class with mutable state |
| Interface | OnlineStatsBase | Simple fit/value methods |
- Type System: Julia's generic type system vs Python's dynamic typing with hints
- Memory Management: Julia's hybrid structs vs Python's standard classes
- RNG: Julia's AbstractRNG vs NumPy's Generator
- Interface: Julia's OnlineStatsBase protocol vs simple method names
Run the test suite:
python weighted_sampling_single_python.py

Run comprehensive examples:
python examples_weighted_sampling.py

The tests verify:
- Correct probability distributions
- Streaming interface functionality
- Edge cases and error handling
- Performance characteristics
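As a rough idea of what a probability-distribution check could look like, the sketch below repeats the sampling many times and compares observed frequencies with the normalized weights. It is not taken from the repository's test suite: `sample_once` and `empirical_frequencies` are hypothetical helpers, the trial count is arbitrary, and the standard library's `random` module is used in place of NumPy.

```python
import random
from collections import Counter


def sample_once(pairs, rng):
    """Run the skip-based update rule once over (element, weight) pairs."""
    total_w, skip_w, current = 0.0, 0.0, None
    for element, weight in pairs:
        total_w += weight
        if skip_w <= total_w:
            skip_w = total_w / (1.0 - rng.random())  # divisor lies in (0, 1]
            current = element
    return current


def empirical_frequencies(pairs, trials, seed=0):
    """Estimate selection frequencies over many independent runs."""
    rng = random.Random(seed)
    counts = Counter(sample_once(pairs, rng) for _ in range(trials))
    return {element: counts[element] / trials for element, _ in pairs}


pairs = [("apple", 0.1), ("banana", 0.3), ("cherry", 0.4), ("date", 0.2)]
freqs = empirical_frequencies(pairs, trials=50_000)
total = sum(weight for _, weight in pairs)
for element, weight in pairs:
    print(f"{element}: observed {freqs[element]:.3f}, expected {weight / total:.3f}")
```

With enough trials the observed and expected columns should agree to within sampling noise; a fixed seed keeps such a check reproducible.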
- "Weighted Reservoir Sampling with Replacement from Multiple Data Streams" - A. Meligrana, 2024
- StreamSampling.jl - Original Julia implementation: https://github.com/JuliaDynamics/StreamSampling.jl
- "Random Sampling with a Reservoir" - J. S. Vitter, 1985 (foundational reservoir sampling)
This implementation follows the same principles as the original StreamSampling.jl package. Please refer to the original repository for licensing information.
Feel free to submit issues, feature requests, or pull requests. When contributing:
- Maintain algorithmic correctness with the original Julia implementation
- Add appropriate tests for new features
- Follow Python coding standards (PEP 8)
- Update documentation as needed