2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -10,7 +10,7 @@ The project that has the following components:

1. An ETL flow that fetches data from Snowflake
- Refer to `example_data.py` for a sample
- Data is processed in batches of at most 1000 rows
- Data is processed in batches of at most 100 rows
- Store the IDs of rows that were processed; on the next run, fetch the next batch
- Store the processing state in an artifact; use the Metaflow client to retrieve it
- Include an option for resetting state
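
The incremental-batch behavior described above can be sketched in plain Python. This is a hypothetical helper (`next_batch` is not from the repo); the real flow presumably persists `processed_ids` as a Metaflow artifact and reloads it with the client API:

```python
def next_batch(rows, processed_ids, batch_size=100):
    """Return the next batch of rows whose IDs have not yet been processed."""
    fresh = [r for r in rows if r["id"] not in processed_ids]
    return fresh[:batch_size]

rows = [{"id": i} for i in range(250)]
first = next_batch(rows, set())                      # first run: ids 0-99
second = next_batch(rows, {r["id"] for r in first})  # second run: ids 100-199
```

Resetting state then amounts to clearing `processed_ids` back to an empty set.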
38 changes: 38 additions & 0 deletions README.md
@@ -0,0 +1,38 @@
# Company Enrichment Pipeline

An Outerbounds project that continuously fetches company data from Snowflake,
enriches it with website-derived tags using a local LLM, and provides an
interactive UI for exploring the results.

## Components

### Flows

- **snowflake-etl** - Hourly ETL that fetches company data from Snowflake in
  batches of 100 rows, tracking progress across runs.
- **company-enricher** - Event-triggered flow that fetches company websites,
uses a local LLM to generate descriptive tags, and merges results across runs.

### Deployments

- **company-explorer** - Streamlit dashboard for browsing enriched companies
and filtering by tags.

### Assets

- **Data: `company_batch`** - Latest batch of raw company data from Snowflake.
- **Data: `enriched_companies`** - Accumulated enrichment results with tags.
- **Model: `tag_llm`** - Small local LLM used for tag extraction.

## Running Locally

```bash
# ETL flow
OBPROJECT_SKIP_PYPI_BASE=1 python flows/snowflake-etl/flow.py run

# Enrichment flow (normally triggered by ETL event)
OBPROJECT_SKIP_PYPI_BASE=1 python flows/company-enricher/flow.py run

# Dashboard
streamlit run deployments/company-explorer/app.py
```
4 changes: 4 additions & 0 deletions data/company-batch/README.md
@@ -0,0 +1,4 @@
# Company Batch

Raw company data fetched from Snowflake by the Snowflake ETL flow.
Each instance contains up to 100 rows as a pandas DataFrame.
7 changes: 7 additions & 0 deletions data/company-batch/asset_config.toml
@@ -0,0 +1,7 @@
name = "Company Batch"
id = "company_batch"
description = "Latest batch of raw company data fetched from Snowflake"

[properties]
source = "free_company_dataset.public.freecompanydataset"
batch_size = "100"
5 changes: 5 additions & 0 deletions data/enriched-companies/README.md
@@ -0,0 +1,5 @@
# Enriched Companies

Accumulated enrichment results produced by the Company Enricher flow.
Each company entry includes the original ID, name, domain, LLM-generated
tags, and whether the website was successfully fetched.
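
Given that shape, consumers can work with the entries as plain dicts. A minimal sketch with made-up sample entries (the field names follow the description above; the values are invented for illustration):

```python
from collections import Counter

# Two invented entries following the documented shape:
# id, name, domain, tags, website_fetched.
enriched = [
    {"id": 1, "name": "Acme", "domain": "acme.com",
     "tags": ["manufacturing", "b2b"], "website_fetched": True},
    {"id": 2, "name": "Globex", "domain": "globex.com",
     "tags": ["b2b"], "website_fetched": False},
]

# Count how often each tag appears across all companies.
tag_counts = Counter(t for c in enriched for t in c["tags"])

# Keep only companies whose website was successfully fetched.
fetched = [c["name"] for c in enriched if c["website_fetched"]]
```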
6 changes: 6 additions & 0 deletions data/enriched-companies/asset_config.toml
@@ -0,0 +1,6 @@
name = "Enriched Companies"
id = "enriched_companies"
description = "Accumulated company data enriched with LLM-generated tags from website content"

[properties]
format = "list of dicts with keys: id, name, domain, tags, website_fetched"
15 changes: 15 additions & 0 deletions deployments/company-explorer/README.md
@@ -0,0 +1,15 @@
# Company Explorer

Streamlit dashboard for browsing and exploring enriched company data.

## Features

- Search companies by name or domain
- Filter by LLM-generated tags
- View tag distribution chart
- Inspect individual company details

## Data Source

Reads the `enriched_companies` data asset produced by the Company Enricher flow,
accessed via the Metaflow client API.
103 changes: 103 additions & 0 deletions deployments/company-explorer/app.py
@@ -0,0 +1,103 @@
"""Streamlit dashboard for exploring enriched company data."""

import streamlit as st
from metaflow import Flow, namespace

st.set_page_config(page_title="Company Explorer", layout="wide")
st.title("Company Explorer")


@st.cache_data(ttl=300)
def load_enriched_data():
"""Load the latest enriched company data from the CompanyEnricher flow."""
namespace(None)
try:
latest = Flow("CompanyEnricher").latest_successful_run
if latest and hasattr(latest.data, "enriched"):
return latest.data.enriched
except Exception:
pass
return []


data = load_enriched_data()

if not data:
st.warning("No enriched company data available yet. Run the pipeline first.")
st.stop()

# Collect all unique tags
all_tags = sorted({tag for row in data for tag in row.get("tags", [])})

# --- Sidebar filters ---
st.sidebar.header("Filters")

search = st.sidebar.text_input("Search companies", "")
selected_tags = st.sidebar.multiselect("Filter by tags", all_tags)

# --- Apply filters ---
filtered = data

if search:
search_lower = search.lower()
filtered = [
r for r in filtered
if search_lower in r.get("name", "").lower()
or search_lower in r.get("domain", "").lower()
]

if selected_tags:
filtered = [
r for r in filtered
if any(tag in r.get("tags", []) for tag in selected_tags)
]

# --- Metrics ---
col1, col2, col3 = st.columns(3)
col1.metric("Total Companies", len(data))
col2.metric("Filtered", len(filtered))
col3.metric("Unique Tags", len(all_tags))

# --- Company table ---
st.subheader(f"Companies ({len(filtered)})")

if filtered:
import pandas as pd

df = pd.DataFrame([
{
"Name": r.get("name", ""),
"Domain": r.get("domain", ""),
"Tags": ", ".join(r.get("tags", [])),
"Website Fetched": r.get("website_fetched", False),
}
for r in filtered
])
st.dataframe(df, use_container_width=True, hide_index=True)
else:
st.info("No companies match the current filters.")

# --- Tag distribution ---
st.subheader("Tag Distribution")

from collections import Counter

tag_counts = Counter(tag for r in data for tag in r.get("tags", []))
if tag_counts:
top_tags = tag_counts.most_common(25)
import pandas as pd

tag_df = pd.DataFrame(top_tags, columns=["Tag", "Count"])
st.bar_chart(tag_df.set_index("Tag"))

# --- Company detail ---
st.subheader("Company Detail")

company_names = [r.get("name", "Unknown") for r in filtered]
if company_names:
selected = st.selectbox("Select a company", company_names)
company = next((r for r in filtered if r.get("name") == selected), None)
if company:
st.json(company)
16 changes: 16 additions & 0 deletions deployments/company-explorer/config.yml
@@ -0,0 +1,16 @@
name: company-explorer
port: 8000
description: Interactive UI for exploring enriched company data and tags

replicas:
min: 1
max: 1

dependencies:
pypi:
streamlit: ""
pandas: ""
outerbounds: ""

commands:
- streamlit run deployments/company-explorer/app.py --server.port 8000
25 changes: 25 additions & 0 deletions flows/company-enricher/README.md
@@ -0,0 +1,25 @@
# Company Enricher

Event-triggered flow that enriches company data with website-derived tags.

## Behavior

- Triggered by the `enrich` event from the Snowflake ETL flow
- Splits the batch into up to 10 parallel chunks for processing
- For each company:
1. Fetches the landing page from the company's domain
2. Uses TinyLlama 1.1B to generate 5 descriptive tags
- Merges new results with previous runs to build a cumulative dataset
- Registers results as the `enriched_companies` data asset
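
The merge-across-runs step can be sketched as a small pure function. This is a hypothetical helper (`merge_results` is not from the repo), assuming entries are dicts keyed by `id` and that newer results should win on collisions:

```python
def merge_results(previous, new):
    """Merge enrichment results across runs; newer entries win on id collisions."""
    by_id = {c["id"]: c for c in previous}
    by_id.update({c["id"]: c for c in new})
    return sorted(by_id.values(), key=lambda c: c["id"])

old = [{"id": 1, "tags": ["a"]}, {"id": 2, "tags": ["b"]}]
new = [{"id": 2, "tags": ["b", "c"]}, {"id": 3, "tags": ["d"]}]
merged = merge_results(old, new)  # ids 1, 2, 3; id 2 carries the newer tags
```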

## Resources

Each parallel enrichment task requests 4 CPUs and 16 GB memory
to run the local LLM inference.

## Running

```bash
# Normally triggered by event, but can be run manually:
OBPROJECT_SKIP_PYPI_BASE=1 python flows/company-enricher/flow.py run
```