A flexible framework for distributed data processing using MapReduce patterns.
This project requires Python 3.13+ and uses conda for dependency management. We recommend using the provided environment.yml file to create a consistent development environment.
The project includes an environment.yml file with the following dependencies:
name: ddr
channels:
- conda-forge
dependencies:
- coffea=>2025.3.0
- fsspec-xrootd=>0.5.1
- ndcctools>=7.15.8
- python=>3.12
- rich=>13.9.4
- uproot=>5.6.0
- xrootd=>5.8.1
- setuptools<81-
Create the conda environment from the provided environment.yml file:
conda env create -f environment.yml
-
Activate the environment:
conda activate ddr
-
Verify the installation:
python --version # Should show Python 3.13.2 conda list | grep -E "(coffea|ndcctools)" # Should show the installed packages
pip install dynamic_data_reductionOnce you have the conda environment set up:
# Clone the repository
git clone https://github.com/cooperative-computing-lab/dynamic_data_reduction.git
cd dynamic_data_reduction
# Activate the conda environment (if not already active)
conda activate ddr
# Install the package in development mode
pip install -e .Minimal toy example to get started:
from dynamic_data_reduction import DynamicDataReduction
import ndcctools.taskvine as vine
import getpass
# Simple data: process two datasets
data = {
"datasets": {
"numbers": {"values": [1, 2, 3, 4, 5]},
"more_numbers": {"values": [10, 20, 30]}
}
}
# Define functions
def preprocess(dataset_info, **kwargs):
for val in dataset_info["values"]:
yield (val, 1)
def postprocess(val, **kwargs):
return val # Just return the value
def processor(x):
return x * 2 # Double each number
def reducer(a, b):
return a + b # Sum the results
# Run
mgr = vine.Manager(port=[9123, 9129], name=f"{getpass.getuser()}-quick-start-ddr")
print(f"Manager started on port {mgr.port}")
ddr = DynamicDataReduction(mgr,
data=data,
source_preprocess=preprocess,
source_postprocess=postprocess,
processors=processor,
accumulator=reducer)
# Use local workers, condor, slurm, or sge for scale
workers = vine.Factory("local", manager=mgr)
workers.max_workers = 2
workers.min_workers = 0
workers.cores = 4
workers.memory = 2000
workers.disk = 8000
with workers:
result = ddr.compute()
print(f"Result: {result}")
# Expected: {"processor": {"numbers": 30, "more_numbers": 120}}Every workflow needs four pieces: a TaskVine manager (vine.Manager), workers (vine.Factory, typically "local" for first runs), a data spec with user-defined callbacks (or Coffea equivalents), and a call to compute() inside a with workers: block.
flowchart LR
dataSpec[DataSpec] --> preprocess[source_preprocess]
preprocess --> chunks[Chunks]
chunks --> postprocess[source_postprocess]
postprocess --> mapStep[processors]
mapStep --> reduceStep[accumulator]
reduceStep --> result[Result]
compute() returns a nested dict: {"<processor_name>": {"<dataset_name>": <accumulated_result>, ...}, ...}.
After Installation:
| Step | Action |
|---|---|
| 1 | Activate env: conda activate ddr |
| 2 | Install package: pip install -e . (from source) or pip install dynamic_data_reduction |
| 3 | Copy/adapt the Quick Start script, or run a bundled example (below) |
| 4 | Start vine.Manager (pick a port range, e.g. port=[9123, 9129]) |
| 5 | Configure vine.Factory("local", manager=mgr) with max_workers, cores, memory, disk |
| 6 | Build DynamicDataReduction(...) or CoffeaDynamicDataReduction(...) |
| 7 | Run with workers: result = ddr.compute() |
Use this for arbitrary data and processing logic. Data shape: {"datasets": {"name": {...}}}. You provide four callbacks: source_preprocess, source_postprocess, processors, and accumulator.
- Start with: examples/simple/simple-example.py or examples/quick_start/quick.py
- Details: DOCUMENTATION.md § Basic Usage
Use this for HEP analysis with Coffea processors. Data shape: flat dict per dataset ({"ZJets": {"files": {...}}, ...}); the Coffea wrapper adds chunking automatically. Optionally call preprocess() first to fill in num_entries per file.
- Start with: examples/coffea_processor/example_with_preprocess.py
- Real analysis: examples/cortado/ddr_cortado.py
- Details: DOCUMENTATION.md § Coffea and § Preprocessing
Coffea examples expect sample ROOT files under examples/samples/ (not shipped in the repo); supply your own files or adjust paths.
conda activate ddr
cd dynamic_data_reduction
pip install -e .
# Generic pipeline
python examples/quick_start/quick.py
python examples/simple/simple-example.py
# Coffea (requires ROOT sample files)
python examples/coffea_processor/example_with_preprocess.pyFor cluster runs, change the vine.Factory batch type from "local" to condor, slurm, or sge (see Quick Start above).
See DOCUMENTATION.md for advanced configuration, checkpointing, resource tuning, and the API reference. See DEVELOPMENT.md for internal architecture and modification guide.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.