8 changes: 5 additions & 3 deletions .claude/skills/outerbounds/SKILL.md
Original file line number Diff line number Diff line change
@@ -12,11 +12,12 @@ and a description of how it works in [starter-project.md](starter-project.md).
You must

- Include batch, offline workflows under `flows/`, structured as Metaflow flows.
- Preferably include a `@highlight` card in each flow (see `HighlightTester` in the starter project for example)
- Preferably include a `@highlight` card in each flow in the `end` step
(see `HighlightTester` in the starter project for example)
- Include online components under `deployments/` with a proper configuration.
- Define data assets under `data/` and model assets under `models/`
- Read [project-assets.md](project-assets.md) for instructions on how to define assets
- Include a `@card` for steps that consume and produce assets
- Include a `@card` for steps that consume and produce assets (it must be the only `@card` in the step)
- Include a descriptive README.md at the top level, for each `deployment`, `flow`, and asset.

## Defining flows
@@ -43,7 +44,8 @@ For instance,
python flow/a/flow.py run
```

Or, if the flow has external dependencies defined with `@pypi` or `@conda`, leverage Fast Bakery on Kubernetes:
Or, if the flow has external dependencies defined with `@pypi`, `@pypi_base`, `@conda`, or `@conda_base`,
leverage Fast Bakery on Kubernetes:

```
python flow/a/flow.py --environment=fast-bakery run --with kubernetes
15 changes: 15 additions & 0 deletions .gitignore
@@ -0,0 +1,15 @@
.metaflow_spin/
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/
*.egg
.venv/
venv/
.env
*.so
.mypy_cache/
.pytest_cache/
.ruff_cache/
3 changes: 2 additions & 1 deletion CLAUDE.md
@@ -10,7 +10,8 @@ The project has the following components:

1. An ETL flow that fetches data from Snowflake
- Refer to `example_data.py` for a sample
- Data is processed in batches of at most 1000 rows
- Only include rows with a valid website url
- Data is processed in batches of at most 100 rows
- Store the IDs of rows that were processed; the next time the flow executes, fetch the next batch
- Store the state of processing in an artifact, use Metaflow client to retrieve the state
- Include an option for resetting state
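The batch-and-state logic described above (fetch at most 100 unprocessed rows, remember what was done, support a reset) can be sketched in plain Python. This is an illustration of the bookkeeping only — the function names `next_batch` and `update_state` are hypothetical, and the real flow stores the state as a Metaflow artifact rather than a local variable:

```python
def next_batch(all_ids, processed_ids, batch_size=100):
    """Return the next batch of at most `batch_size` unprocessed IDs, in stable order."""
    remaining = [i for i in all_ids if i not in processed_ids]
    return remaining[:batch_size]


def update_state(processed_ids, batch, reset=False):
    """Merge a completed batch into the processed set; `reset` discards history."""
    if reset:
        return set(batch)
    return set(processed_ids) | set(batch)


# Example: 250 rows, 100 per run -> three runs cover every row exactly once
ids = list(range(250))
state = set()
for _ in range(3):
    batch = next_batch(ids, state)
    state = update_state(state, batch)
```

In the actual flow, `state` would be loaded from the previous run via the Metaflow client and saved as an artifact at the end of each execution.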
57 changes: 57 additions & 0 deletions README.md
@@ -0,0 +1,57 @@
# Agentic Code Example

An Outerbounds project that continuously fetches company data from Snowflake,
enriches it with LLM-generated tags by analyzing company websites, and provides
an interactive UI for exploration.

## Architecture

```
SnowflakeETL (hourly)
├── Fetches batch of 100 companies from Snowflake
├── Tracks processed IDs across runs
├── Registers "companies" data asset
└── Publishes "enrich_companies" event
CompanyEnricher (event-triggered)
├── Scrapes company websites in parallel (10 tasks)
├── Generates 5 tags per company using local LLM
├── Merges with previous enrichment results
└── Registers "enriched-companies" data asset
Company Explorer (deployed UI)
└── Streamlit app to browse companies and tags
```
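The "merges with previous enrichment results" step in `CompanyEnricher` amounts to a dict merge keyed by company id, with newer results winning. A minimal sketch (the helper name and record shape are illustrative, not the flow's actual code):

```python
def merge_enrichments(previous, new):
    """Merge enrichment records by company id; records in `new` replace older ones."""
    by_id = {c["id"]: c for c in previous}
    by_id.update({c["id"]: c for c in new})
    return list(by_id.values())


prev = [{"id": "a", "tags": ["old"]}]
latest = [{"id": "a", "tags": ["fresh"]}, {"id": "b", "tags": ["x"]}]
merged = merge_enrichments(prev, latest)
```

Keying by id keeps the dataset idempotent: re-enriching a company updates its tags instead of duplicating the row.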

## Components

| Component | Location | Description |
|-----------|----------|-------------|
| Snowflake ETL | `flows/snowflake-etl/` | Hourly batch ingestion from Snowflake |
| Company Enricher | `flows/company-enricher/` | Website scraping + LLM tagging |
| Company Explorer | `deployments/company-explorer/` | Interactive Streamlit UI |
| Shared Utils | `src/company_utils/` | Snowflake queries, web scraping |

## Assets

- **companies** (`data/companies/`) - Raw company data from Snowflake
- **enriched-companies** (`data/enriched-companies/`) - Companies with LLM tags
- **tag-generator** (`models/tag-generator/`) - SmolLM2-1.7B-Instruct model

## Local Development

```bash
# Run ETL flow
python flows/snowflake-etl/flow.py run

# Run enricher (needs GPU or patience on CPU)
python flows/company-enricher/flow.py --environment=fast-bakery run --with kubernetes

# Reset ETL state
python flows/snowflake-etl/flow.py run --reset yes
```
7 changes: 7 additions & 0 deletions data/companies/asset_config.toml
@@ -0,0 +1,7 @@
name = "Company Dataset"
id = "companies"
description = "Raw company data fetched from Snowflake in batches"

[properties]
source = "Snowflake free_company_dataset"
batch_size = "100"
7 changes: 7 additions & 0 deletions data/enriched-companies/asset_config.toml
@@ -0,0 +1,7 @@
name = "Enriched Companies"
id = "enriched-companies"
description = "Companies enriched with LLM-generated tags from website analysis"

[properties]
enrichment = "5 descriptive tags per company from local LLM"
source = "Company websites + LLM inference"
7 changes: 7 additions & 0 deletions deployments/company-explorer/README.md
@@ -0,0 +1,7 @@
# Company Explorer

A Streamlit app for browsing companies and their LLM-generated tags.

- Filter companies by tags or search by name
- View tag distribution across the dataset
- See success/failure status of enrichment
131 changes: 131 additions & 0 deletions deployments/company-explorer/app.py
@@ -0,0 +1,131 @@
import re

import streamlit as st
from metaflow import Flow, namespace

st.set_page_config(page_title="Company Explorer", layout="wide")


@st.cache_data(ttl=60)
def load_enriched_companies():
    """Load the latest enriched companies from the CompanyEnricher flow."""
    try:
        namespace(None)
        run = Flow("CompanyEnricher").latest_successful_run
        return run.data.enriched_companies
    except Exception as e:
        st.error(f"Could not load enriched company data: {e}")
        return []


def parse_tag(tag):
    """Parse a tag string that may contain multiple numbered tags into individual tags."""
    tag = tag.strip()
    # Check if this is a numbered list crammed into one string
    numbered = re.split(r"\d+[\.\)]\s*", tag)
    numbered = [t.strip().rstrip(",").strip() for t in numbered if t.strip()]
    if len(numbered) >= 2:
        return numbered
    return [tag] if tag else []


def get_all_tags(companies):
    """Extract all unique tags across companies."""
    tags = set()
    for c in companies:
        for t in c.get("tags", []):
            for parsed in parse_tag(t):
                tags.add(parsed)
    return sorted(tags)


def main():
    st.title("Company Explorer")
    st.markdown("Browse companies and their LLM-generated tags.")

    companies = load_enriched_companies()

    if not companies:
        st.warning("No enriched company data available yet. Run the SnowflakeETL and CompanyEnricher flows first.")
        return

    # Sidebar filters
    all_tags = get_all_tags(companies)
    tagged_companies = [c for c in companies if c.get("status") == "success"]
    failed_companies = [c for c in companies if c.get("status") != "success"]

    st.sidebar.header("Filters")
    selected_tags = st.sidebar.multiselect("Filter by tags", all_tags)
    show_failed = st.sidebar.checkbox("Show failed companies", value=False)
    search = st.sidebar.text_input("Search by name")

    # Stats
    col1, col2, col3 = st.columns(3)
    col1.metric("Total Companies", len(companies))
    col2.metric("Successfully Tagged", len(tagged_companies))
    col3.metric("Unique Tags", len(all_tags))

    st.markdown("---")

    # Filter companies
    display = tagged_companies if not show_failed else companies
    if selected_tags:
        display = [
            c for c in display
            if any(
                p in selected_tags
                for t in c.get("tags", [])
                for p in parse_tag(t)
            )
        ]
    if search:
        display = [
            c for c in display if search.lower() in c.get("name", "").lower()
        ]

    st.subheader(f"Showing {len(display)} companies")

    # Display as cards in a grid
    for i in range(0, len(display), 3):
        cols = st.columns(3)
        for j, col in enumerate(cols):
            idx = i + j
            if idx >= len(display):
                break
            company = display[idx]
            with col:
                with st.container(border=True):
                    st.markdown(f"### {company.get('name', 'Unknown')}")
                    domain = company.get("domain", "")
                    if domain:
                        st.markdown(f"[{domain}](https://{domain})")
                    if company.get("status") == "success":
                        tags = [
                            p for t in company.get("tags", []) for p in parse_tag(t)
                        ]
                        tag_html = " ".join(
                            f'<span style="background:#e0e7ff;padding:2px 8px;border-radius:12px;margin:2px;display:inline-block;font-size:0.85em">{t}</span>'
                            for t in tags
                        )
                        st.markdown(tag_html, unsafe_allow_html=True)
                    else:
                        st.error(f"Status: {company.get('status', 'unknown')}")

    # Tag cloud
    if all_tags:
        st.markdown("---")
        st.subheader("All Tags")
        tag_counts = {}
        for c in tagged_companies:
            for t in c.get("tags", []):
                for p in parse_tag(t):
                    tag_counts[p] = tag_counts.get(p, 0) + 1
        sorted_tags = sorted(tag_counts.items(), key=lambda x: -x[1])
        tag_html = " ".join(
            f'<span style="background:#dbeafe;padding:4px 12px;border-radius:16px;margin:4px;display:inline-block;font-size:{min(1.5, 0.8 + count * 0.05):.1f}em">{tag} ({count})</span>'
            for tag, count in sorted_tags
        )
        st.markdown(tag_html, unsafe_allow_html=True)


if __name__ == "__main__":
    main()
15 changes: 15 additions & 0 deletions deployments/company-explorer/config.yml
@@ -0,0 +1,15 @@
name: company-explorer
port: 8000
description: Interactive UI for exploring companies and their LLM-generated tags

replicas:
min: 1
max: 1

dependencies:
pypi:
streamlit: ""
outerbounds: ""

commands:
- streamlit run deployments/company-explorer/app.py --server.port 8000
10 changes: 10 additions & 0 deletions flows/company-enricher/README.md
@@ -0,0 +1,10 @@
# Company Enricher

Enriches company data by scraping each company's website and using a local LLM
to generate 5 descriptive tags.

- **Trigger**: Automatically triggered when SnowflakeETL finishes (`@trigger_on_finish`)
- **Parallelism**: Processes companies in parallel using foreach (up to 10 tasks)
- **LLM**: Uses SmolLM2-1.7B-Instruct to generate tags from website content
- **Output**: Merges results with previous runs and registers `enriched-companies` data asset
- **Cards**: Each parallel task shows real-time progress; join step shows summary with sample tags
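The "up to 10 tasks" fan-out above implies splitting the company list into at most 10 near-equal chunks before the foreach. A hedged sketch of that chunking (the helper name `make_chunks` is illustrative; the flow may split differently):

```python
import math


def make_chunks(items, max_tasks=10):
    """Split items into at most `max_tasks` contiguous chunks of near-equal size."""
    if not items:
        return []
    n = min(max_tasks, len(items))
    size = math.ceil(len(items) / n)
    return [items[i:i + size] for i in range(0, len(items), size)]


# 95 companies -> 9 chunks of 10 plus one chunk of 5
chunks = make_chunks(list(range(95)), max_tasks=10)
```

Each chunk would then become one foreach branch, so parallelism never exceeds `max_tasks` regardless of batch size.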