
cooperative-computing-lab/dynamic_data_reduction


DDR - Dynamic MapReduce Framework

A flexible framework for distributed data processing using MapReduce patterns.

Installation

Prerequisites

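This project requires Python 3.12 or newer (the environment.yml below pins python>=3.12; Python 3.13 is recommended) and uses conda for dependency management. We recommend creating your development environment from the provided environment.yml file so that it stays consistent.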

Setting up the Conda Environment

The project includes an environment.yml file with the following dependencies:

name: ddr
channels:
  - conda-forge
dependencies:
  - coffea>=2025.3.0
  - fsspec-xrootd>=0.5.1
  - ndcctools>=7.15.8
  - python>=3.12
  - rich>=13.9.4
  - uproot>=5.6.0
  - xrootd>=5.8.1
  - setuptools<81

  1. Create the conda environment from the provided environment.yml file:

    conda env create -f environment.yml
  2. Activate the environment:

    conda activate ddr
  3. Verify the installation:

    python --version  # Should show Python 3.12 or newer (e.g. Python 3.13.2)
    conda list | grep -E "(coffea|ndcctools)"  # Should show the installed packages
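If you prefer to check the installation from Python rather than through conda list, a small script along these lines can report what is installed. It is a sketch, assuming the distribution names match those in environment.yml; conda packages that do not register Python package metadata (possibly ndcctools) may show up as NOT FOUND even when they import fine, in which case conda list remains the reference.

```python
import sys
from importlib.metadata import PackageNotFoundError, version

# Distribution names taken from environment.yml (assumption: they match the
# names Python's package metadata knows them by).
REQUIRED = ["coffea", "ndcctools", "uproot", "rich"]


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    print(f"Python {sys.version.split()[0]}")
    for name, ver in installed_versions(REQUIRED).items():
        print(f"{name:12} {ver or 'NOT FOUND'}")
```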

From PyPI

pip install dynamic_data_reduction

Installing from Source

Once you have the conda environment set up:

# Clone the repository
git clone https://github.com/cooperative-computing-lab/dynamic_data_reduction.git
cd dynamic_data_reduction

# Activate the conda environment (if not already active)
conda activate ddr

# Install the package in development mode
pip install -e .

Quick Start

Minimal toy example to get started:

from dynamic_data_reduction import DynamicDataReduction
import ndcctools.taskvine as vine
import getpass

# Simple data: process two datasets
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]}
    }
}

# Define functions
def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)

def postprocess(val, **kwargs):
    return val  # Just return the value

def processor(x):
    return x * 2  # Double each number

def reducer(a, b):
    return a + b  # Sum the results

# Run
mgr = vine.Manager(port=[9123, 9129], name=f"{getpass.getuser()}-quick-start-ddr")
print(f"Manager started on port {mgr.port}")
ddr = DynamicDataReduction(mgr,
                           data=data,
                           source_preprocess=preprocess,
                           source_postprocess=postprocess,
                           processors=processor,
                           accumulator=reducer)

# Use local workers, condor, slurm, or sge for scale
workers = vine.Factory("local", manager=mgr)
workers.max_workers = 2
workers.min_workers = 0
workers.cores = 4
workers.memory = 2000
workers.disk = 8000
with workers:
    result = ddr.compute()

print(f"Result: {result}")
# Expected: {"processor": {"numbers": 30, "more_numbers": 120}}
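To see where the expected numbers come from, the same callbacks can be run in a single process. The run_sequentially function below is hypothetical and not part of the dynamic_data_reduction API: it skips TaskVine, workers, and any parallel reduction, and it assumes (based on the Quick Start) that the second element of each tuple yielded by source_preprocess is a size hint that does not affect the result.

```python
from functools import reduce

# Same data and callbacks as the Quick Start.
data = {
    "datasets": {
        "numbers": {"values": [1, 2, 3, 4, 5]},
        "more_numbers": {"values": [10, 20, 30]},
    }
}


def preprocess(dataset_info, **kwargs):
    for val in dataset_info["values"]:
        yield (val, 1)


def postprocess(val, **kwargs):
    return val


def processor(x):
    return x * 2


def reducer(a, b):
    return a + b


def run_sequentially(data, preprocess, postprocess, processor, accumulator):
    """Hypothetical single-process stand-in for ddr.compute().

    Models only the data flow: no manager, no workers, no reduction tree.
    """
    results = {}
    for name, info in data["datasets"].items():
        # preprocess yields (item, size) pairs; the size is ignored here.
        items = (postprocess(item) for item, _size in preprocess(info))
        mapped = [processor(item) for item in items]
        results[name] = reduce(accumulator, mapped)
    # Key the outer dict by the processor's function name, matching the
    # shape of the Quick Start's expected output.
    return {processor.__name__: results}


print(run_sequentially(data, preprocess, postprocess, processor, reducer))
# → {'processor': {'numbers': 30, 'more_numbers': 120}}
```

Because reducer is applied pairwise, it should be associative (addition is); that is presumably what allows partial results to be combined in whatever order workers finish. The left fold above is just one such order.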

Usage

Every workflow needs four pieces: a TaskVine manager (vine.Manager), workers (vine.Factory, typically "local" for first runs), a data spec with user-defined callbacks (or Coffea equivalents), and a call to compute() inside a with workers: block.

flowchart LR
    dataSpec[DataSpec] --> preprocess[source_preprocess]
    preprocess --> chunks[Chunks]
    chunks --> postprocess[source_postprocess]
    postprocess --> mapStep[processors]
    mapStep --> reduceStep[accumulator]
    reduceStep --> result[Result]

compute() returns a nested dict: {"<processor_name>": {"<dataset_name>": <accumulated_result>, ...}, ...}.
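A short helper for walking that nested dict, for example to print or tabulate results. iter_results is an illustrative name, not part of the package; the sample dict is the Quick Start's expected result.

```python
def iter_results(result):
    """Yield (processor_name, dataset_name, value) triples from the
    nested dict returned by compute()."""
    for proc_name, per_dataset in result.items():
        for dataset_name, value in per_dataset.items():
            yield proc_name, dataset_name, value


# Example using the Quick Start's expected result.
result = {"processor": {"numbers": 30, "more_numbers": 120}}
for proc, dataset, value in iter_results(result):
    print(f"{proc:10} {dataset:14} {value}")

# A single value can also be read directly:
numbers_total = result["processor"]["numbers"]
```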

Getting started checklist

After installation:

  1. Activate the environment: conda activate ddr
  2. Install the package: pip install -e . (from source) or pip install dynamic_data_reduction
  3. Copy or adapt the Quick Start script, or run a bundled example (see Running bundled examples below)
  4. Start a vine.Manager, picking a port range (e.g. port=[9123, 9129])
  5. Configure vine.Factory("local", manager=mgr) with max_workers, cores, memory, and disk
  6. Build DynamicDataReduction(...) or CoffeaDynamicDataReduction(...)
  7. Run inside the factory context: with workers: result = ddr.compute()

Path A — Custom pipelines (DynamicDataReduction)

Use this for arbitrary data and processing logic. Data shape: {"datasets": {"name": {...}}}. You provide four callbacks: source_preprocess, source_postprocess, processors, and accumulator.
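As an illustration of Path A beyond the Quick Start, the callbacks below count words in chunks of text lines. The dataset keys (lines, chunk_size) and the function names are made up; the only assumptions carried over from the Quick Start are that source_preprocess yields (item, size) tuples and that each item passes through source_postprocess before the processor sees it. The last lines exercise the callbacks locally, without a manager.

```python
from collections import Counter
from functools import reduce

# Path A data shape: {"datasets": {"<name>": {...}}}.
# The inner keys are hypothetical; their meaning is up to your callbacks.
data = {
    "datasets": {
        "poem": {"lines": ["the cat sat", "on the mat", "the end"], "chunk_size": 2},
    }
}


def source_preprocess(dataset_info, **kwargs):
    # Split a dataset into chunks, yielding (chunk, size) like the Quick Start.
    lines, step = dataset_info["lines"], dataset_info["chunk_size"]
    for start in range(0, len(lines), step):
        chunk = lines[start:start + step]
        yield (chunk, len(chunk))


def source_postprocess(chunk, **kwargs):
    # Turn a chunk description into the object the processor consumes.
    return " ".join(chunk).split()


def word_count(words):
    # Map step: per-chunk word counts.
    return Counter(words)


def accumulator(a, b):
    # Reduce step: Counter addition merges the counts of two partial results.
    return a + b


# Local check of the callbacks without a TaskVine manager.
chunks = source_preprocess(data["datasets"]["poem"])
partials = [word_count(source_postprocess(c)) for c, _size in chunks]
total = reduce(accumulator, partials)
print(total)
```

To run it on TaskVine, pass these functions to DynamicDataReduction as in the Quick Start (processors=word_count, accumulator=accumulator, and so on); the result would then presumably be keyed by "word_count".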

Path B — ROOT / Coffea (CoffeaDynamicDataReduction)

Use this for HEP analysis with Coffea processors. Data shape: flat dict per dataset ({"ZJets": {"files": {...}}, ...}); the Coffea wrapper adds chunking automatically. Optionally call preprocess() first to fill in num_entries per file.

Coffea examples expect sample ROOT files under examples/samples/ (not shipped in the repo); supply your own files or adjust paths.
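For comparison with Path A, a Path B fileset is keyed directly by dataset name, with no outer "datasets" key. The sketch below assumes Coffea's fileset convention, where each file maps either to a tree name or to a dict with object_path and, once preprocessed, num_entries. The file paths are placeholders and the entry count is made up; files_missing_num_entries is a hypothetical helper that lists files preprocess() would still need to fill in.

```python
# Path B: flat dict per dataset. Paths and counts are placeholders.
fileset = {
    "ZJets": {
        "files": {
            "examples/samples/zjets_1.root": {"object_path": "Events"},
            "examples/samples/zjets_2.root": {
                "object_path": "Events",
                "num_entries": 40000,  # made-up count, as if already preprocessed
            },
        }
    },
}


def files_missing_num_entries(fileset):
    """List (dataset, file) pairs whose entry does not carry num_entries yet."""
    missing = []
    for dataset, spec in fileset.items():
        for path, entry in spec["files"].items():
            # An entry may be a bare tree-name string or a dict.
            if not isinstance(entry, dict) or "num_entries" not in entry:
                missing.append((dataset, path))
    return missing


print(files_missing_num_entries(fileset))
# → [('ZJets', 'examples/samples/zjets_1.root')]
```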

Running bundled examples

conda activate ddr
cd dynamic_data_reduction
pip install -e .

# Generic pipeline
python examples/quick_start/quick.py
python examples/simple/simple-example.py

# Coffea (requires ROOT sample files)
python examples/coffea_processor/example_with_preprocess.py

For cluster runs, change the vine.Factory batch type in the Quick Start from "local" to "condor", "slurm", or "sge".
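One way to keep a single script for both laptop and cluster runs is to choose the batch type at launch time. The environment variable DDR_BATCH_TYPE and both helper functions are hypothetical; the Factory settings simply copy the Quick Start, and a real cluster may need different resource values or extra batch options.

```python
import os

# Batch types named in this README; "local" is the Quick Start default.
BATCH_TYPES = {"local", "condor", "slurm", "sge"}


def batch_type_from_env(var="DDR_BATCH_TYPE", default="local"):
    """Read the batch type from a (hypothetical) environment variable."""
    batch_type = os.environ.get(var, default).lower()
    if batch_type not in BATCH_TYPES:
        raise ValueError(
            f"unsupported batch type {batch_type!r}; choose from {sorted(BATCH_TYPES)}"
        )
    return batch_type


def make_factory(mgr, batch_type):
    # Imported lazily so batch_type_from_env works without ndcctools installed.
    import ndcctools.taskvine as vine

    workers = vine.Factory(batch_type, manager=mgr)
    workers.max_workers = 2
    workers.min_workers = 0
    workers.cores = 4
    workers.memory = 2000
    workers.disk = 8000
    return workers
```

Usage would mirror the Quick Start: build workers = make_factory(mgr, batch_type_from_env()) and call ddr.compute() inside with workers:.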

See DOCUMENTATION.md for advanced configuration, checkpointing, resource tuning, and the API reference. See DEVELOPMENT.md for internal architecture and modification guide.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
