From 3f33c0b95ed75354fd9d86fb74ed65ce57a53aef Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sat, 28 Mar 2026 23:06:48 -0700 Subject: [PATCH 1/9] initial commit --- .gitignore | 15 ++ CLAUDE.md | 2 +- README.md | 57 +++++ data/companies/asset_config.toml | 7 + data/enriched-companies/asset_config.toml | 7 + deployments/company-explorer/README.md | 7 + deployments/company-explorer/app.py | 110 +++++++++ deployments/company-explorer/config.yml | 15 ++ flows/company-enricher/README.md | 10 + flows/company-enricher/flow.py | 276 ++++++++++++++++++++++ flows/snowflake-etl/README.md | 11 + flows/snowflake-etl/flow.py | 95 ++++++++ models/tag-generator/asset_config.toml | 9 + src/company_utils/__init__.py | 4 + src/company_utils/scraper.py | 24 ++ src/company_utils/snowflake.py | 33 +++ 16 files changed, 681 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 data/companies/asset_config.toml create mode 100644 data/enriched-companies/asset_config.toml create mode 100644 deployments/company-explorer/README.md create mode 100644 deployments/company-explorer/app.py create mode 100644 deployments/company-explorer/config.yml create mode 100644 flows/company-enricher/README.md create mode 100644 flows/company-enricher/flow.py create mode 100644 flows/snowflake-etl/README.md create mode 100644 flows/snowflake-etl/flow.py create mode 100644 models/tag-generator/asset_config.toml create mode 100644 src/company_utils/__init__.py create mode 100644 src/company_utils/scraper.py create mode 100644 src/company_utils/snowflake.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f6d976 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +.metaflow_spin/ +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.eggs/ +*.egg +.venv/ +venv/ +.env +*.so +.mypy_cache/ +.pytest_cache/ +.ruff_cache/ diff --git a/CLAUDE.md b/CLAUDE.md index 40bca50..16cfc62 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,7 @@ The 
project that has the following components: 1. An ETL flow that fetches data from Snowflake - Refer to `example_data.py` for a sample - - Data is processed in batches of at most 1000 rows + - Data is processed in batches of at most 100 rows - Store the IDs of rows that were processed, next time the flow executes, fetch the next batch - Store the state of processing in an artifact, use Metaflow client to retrieve the state - Include an option for resetting state diff --git a/README.md b/README.md new file mode 100644 index 0000000..5360eb0 --- /dev/null +++ b/README.md @@ -0,0 +1,57 @@ +# Agentic Code Example + +An Outerbounds project that continuously fetches company data from Snowflake, +enriches it with LLM-generated tags by analyzing company websites, and provides +an interactive UI for exploration. + +## Architecture + +``` +SnowflakeETL (hourly) + │ + ├── Fetches batch of 100 companies from Snowflake + ├── Tracks processed IDs across runs + ├── Registers "companies" data asset + └── Publishes "enrich_companies" event + │ + ▼ +CompanyEnricher (event-triggered) + │ + ├── Scrapes company websites in parallel (10 tasks) + ├── Generates 5 tags per company using local LLM + ├── Merges with previous enrichment results + └── Registers "enriched-companies" data asset + │ + ▼ +Company Explorer (deployed UI) + │ + └── Streamlit app to browse companies and tags +``` + +## Components + +| Component | Location | Description | +|-----------|----------|-------------| +| Snowflake ETL | `flows/snowflake-etl/` | Hourly batch ingestion from Snowflake | +| Company Enricher | `flows/company-enricher/` | Website scraping + LLM tagging | +| Company Explorer | `deployments/company-explorer/` | Interactive Streamlit UI | +| Shared Utils | `src/company_utils/` | Snowflake queries, web scraping | + +## Assets + +- **companies** (`data/companies/`) - Raw company data from Snowflake +- **enriched-companies** (`data/enriched-companies/`) - Companies with LLM tags +- **tag-generator** 
(`models/tag-generator/`) - SmolLM2-1.7B-Instruct model + +## Local Development + +```bash +# Run ETL flow +python flows/snowflake-etl/flow.py run + +# Run enricher (needs GPU or patience on CPU) +python flows/company-enricher/flow.py --environment=fast-bakery run --with kubernetes + +# Reset ETL state +python flows/snowflake-etl/flow.py run --reset yes +``` diff --git a/data/companies/asset_config.toml b/data/companies/asset_config.toml new file mode 100644 index 0000000..5fda198 --- /dev/null +++ b/data/companies/asset_config.toml @@ -0,0 +1,7 @@ +name = "Company Dataset" +id = "companies" +description = "Raw company data fetched from Snowflake in batches" + +[properties] +source = "Snowflake free_company_dataset" +batch_size = "100" diff --git a/data/enriched-companies/asset_config.toml b/data/enriched-companies/asset_config.toml new file mode 100644 index 0000000..6793df7 --- /dev/null +++ b/data/enriched-companies/asset_config.toml @@ -0,0 +1,7 @@ +name = "Enriched Companies" +id = "enriched-companies" +description = "Companies enriched with LLM-generated tags from website analysis" + +[properties] +enrichment = "5 descriptive tags per company from local LLM" +source = "Company websites + LLM inference" diff --git a/deployments/company-explorer/README.md b/deployments/company-explorer/README.md new file mode 100644 index 0000000..25ee9b7 --- /dev/null +++ b/deployments/company-explorer/README.md @@ -0,0 +1,7 @@ +# Company Explorer + +A Streamlit app for browsing companies and their LLM-generated tags. 
+ +- Filter companies by tags or search by name +- View tag distribution across the dataset +- See success/failure status of enrichment diff --git a/deployments/company-explorer/app.py b/deployments/company-explorer/app.py new file mode 100644 index 0000000..781c3e4 --- /dev/null +++ b/deployments/company-explorer/app.py @@ -0,0 +1,110 @@ +import streamlit as st +from metaflow import Flow, namespace + +st.set_page_config(page_title="Company Explorer", layout="wide") + + +@st.cache_data(ttl=60) +def load_enriched_companies(): + """Load the latest enriched companies from the CompanyEnricher flow.""" + try: + namespace(None) + run = Flow("CompanyEnricher").latest_successful_run + return run.data.enriched_companies + except Exception as e: + st.error(f"Could not load enriched company data: {e}") + return [] + + +def get_all_tags(companies): + """Extract all unique tags across companies.""" + tags = set() + for c in companies: + for t in c.get("tags", []): + tags.add(t) + return sorted(tags) + + +def main(): + st.title("Company Explorer") + st.markdown("Browse companies and their LLM-generated tags.") + + companies = load_enriched_companies() + + if not companies: + st.warning("No enriched company data available yet. 
Run the SnowflakeETL and CompanyEnricher flows first.") + return + + # Sidebar filters + all_tags = get_all_tags(companies) + tagged_companies = [c for c in companies if c.get("status") == "success"] + failed_companies = [c for c in companies if c.get("status") != "success"] + + st.sidebar.header("Filters") + selected_tags = st.sidebar.multiselect("Filter by tags", all_tags) + show_failed = st.sidebar.checkbox("Show failed companies", value=False) + search = st.sidebar.text_input("Search by name") + + # Stats + col1, col2, col3 = st.columns(3) + col1.metric("Total Companies", len(companies)) + col2.metric("Successfully Tagged", len(tagged_companies)) + col3.metric("Unique Tags", len(all_tags)) + + st.markdown("---") + + # Filter companies + display = tagged_companies if not show_failed else companies + if selected_tags: + display = [ + c for c in display if any(t in c.get("tags", []) for t in selected_tags) + ] + if search: + display = [ + c for c in display if search.lower() in c.get("name", "").lower() + ] + + st.subheader(f"Showing {len(display)} companies") + + # Display as cards in a grid + for i in range(0, len(display), 3): + cols = st.columns(3) + for j, col in enumerate(cols): + idx = i + j + if idx >= len(display): + break + company = display[idx] + with col: + with st.container(border=True): + st.markdown(f"### {company.get('name', 'Unknown')}") + domain = company.get("domain", "") + if domain: + st.markdown(f"[{domain}](https://{domain})") + if company.get("status") == "success": + tags = company.get("tags", []) + tag_html = " ".join( + f'{t}' + for t in tags + ) + st.markdown(tag_html, unsafe_allow_html=True) + else: + st.error(f"Status: {company.get('status', 'unknown')}") + + # Tag cloud + if all_tags: + st.markdown("---") + st.subheader("All Tags") + tag_counts = {} + for c in tagged_companies: + for t in c.get("tags", []): + tag_counts[t] = tag_counts.get(t, 0) + 1 + sorted_tags = sorted(tag_counts.items(), key=lambda x: -x[1]) + tag_html = " 
".join( + f'{tag} ({count})' + for tag, count in sorted_tags + ) + st.markdown(tag_html, unsafe_allow_html=True) + + +if __name__ == "__main__": + main() diff --git a/deployments/company-explorer/config.yml b/deployments/company-explorer/config.yml new file mode 100644 index 0000000..a8aa5e4 --- /dev/null +++ b/deployments/company-explorer/config.yml @@ -0,0 +1,15 @@ +name: company-explorer +port: 8000 +description: Interactive UI for exploring companies and their LLM-generated tags + +replicas: + min: 1 + max: 1 + +dependencies: + pypi: + streamlit: "" + outerbounds: "" + +commands: + - streamlit run deployments/company-explorer/app.py --server.port 8000 diff --git a/flows/company-enricher/README.md b/flows/company-enricher/README.md new file mode 100644 index 0000000..3c3066c --- /dev/null +++ b/flows/company-enricher/README.md @@ -0,0 +1,10 @@ +# Company Enricher + +Enriches company data by scraping each company's website and using a local LLM +to generate 5 descriptive tags. + +- **Trigger**: `enrich_companies` event from SnowflakeETL +- **Parallelism**: Processes companies in parallel using foreach (up to 10 tasks) +- **LLM**: Uses SmolLM2-1.7B-Instruct to generate tags from website content +- **Output**: Merges results with previous runs and registers `enriched-companies` data asset +- **Cards**: Each parallel task shows real-time progress; join step shows summary with sample tags diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py new file mode 100644 index 0000000..453c89d --- /dev/null +++ b/flows/company-enricher/flow.py @@ -0,0 +1,276 @@ +from metaflow import ( + card, + step, + current, + Flow, + resources, + pypi, + retry, +) +from metaflow.cards import Markdown as MD, Table +from obproject import ProjectFlow, project_trigger, highlight + +from company_utils import fetch_landing_page + +MODEL = "HuggingFaceTB/SmolLM2-1.7B-Instruct" + +SYSTEM_PROMPT = ( + "You are a company analyst. 
Given a company's website content, " + "produce exactly 5 short descriptive tags that capture the company's " + "industry, products, and characteristics. " + "Return ONLY the tags as a comma-separated list, nothing else. " + "Example: enterprise software, cloud computing, B2B, cybersecurity, AI-powered" +) + + +def generate_tags(html_text): + """Use a local LLM to generate 5 descriptive tags from website content.""" + import torch + from transformers import AutoTokenizer, AutoModelForCausalLM + + device = "cuda" if torch.cuda.is_available() else "cpu" + + tokenizer = AutoTokenizer.from_pretrained(MODEL) + model = AutoModelForCausalLM.from_pretrained( + MODEL, torch_dtype=torch.bfloat16 + ).to(device) + + # Truncate HTML to fit context - take first ~2000 chars of visible text + from html.parser import HTMLParser + + class TextExtractor(HTMLParser): + def __init__(self): + super().__init__() + self.parts = [] + self._skip = False + + def handle_starttag(self, tag, attrs): + if tag in ("script", "style", "noscript"): + self._skip = True + + def handle_endtag(self, tag): + if tag in ("script", "style", "noscript"): + self._skip = False + + def handle_data(self, data): + if not self._skip: + text = data.strip() + if text: + self.parts.append(text) + + extractor = TextExtractor() + extractor.feed(html_text) + visible_text = " ".join(extractor.parts)[:2000] + + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + { + "role": "user", + "content": f"Here is the website content:\n\n{visible_text}\n\nProvide 5 descriptive tags:", + }, + ] + + input_text = tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + inputs = tokenizer(input_text, return_tensors="pt").to(device) + + with torch.no_grad(): + outputs = model.generate( + **inputs, max_new_tokens=100, temperature=0.3, do_sample=True + ) + + response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1] :], skip_special_tokens=True) + tags = [t.strip() for t in 
response.split(",")][:5] + return tags + + +@project_trigger(event="enrich_companies") +class CompanyEnricher(ProjectFlow): + + @card(type="blank") + @step + def start(self): + # Get the latest batch from the ETL flow + companies_data = self.prj.get_data("companies") + self.companies_batch = companies_data + print(f"Received {len(self.companies_batch)} companies to enrich") + + # Split into chunks for parallel processing (10 tasks) + batch = self.companies_batch + n = max(1, len(batch) // 10) + self.chunks = [batch[i : i + n] for i in range(0, len(batch), n)] + # Cap at 10 chunks + if len(self.chunks) > 10: + self.chunks[-2].extend(self.chunks[-1]) + self.chunks = self.chunks[:10] + + self.next(self.enrich, foreach="chunks") + + @resources(cpu=4, memory=16000) + @retry(times=1) + @card(type="blank", id="enrich_progress", refresh_interval=5) + @pypi( + python="3.11.11", + packages={ + "transformers": "4.55.2", + "torch": "2.8.0", + "requests": "2.32.3", + }, + ) + @step + def enrich(self): + chunk = self.input + results = [] + total = len(chunk) + + current.card["enrich_progress"].append( + MD(f"## Processing {total} companies...") + ) + current.card["enrich_progress"].refresh() + + # Consume the model asset for lineage tracking + print(self.prj.asset.consume_model_asset("tag-generator")) + + for i, company in enumerate(chunk): + company_name = company.get("NAME", company.get("name", "Unknown")) + website = company.get("DOMAIN", company.get("domain", "")) + company_id = str(company.get("ID", company.get("id", ""))) + + current.card["enrich_progress"].append( + MD(f"Processing [{i+1}/{total}]: **{company_name}** ({website})") + ) + current.card["enrich_progress"].refresh() + + html = fetch_landing_page(website) + if html: + try: + tags = generate_tags(html) + results.append( + { + "id": company_id, + "name": company_name, + "domain": website, + "tags": tags, + "status": "success", + } + ) + print(f" [{i+1}/{total}] {company_name}: {tags}") + except Exception as e: + 
results.append( + { + "id": company_id, + "name": company_name, + "domain": website, + "tags": [], + "status": f"llm_error: {e}", + } + ) + print(f" [{i+1}/{total}] {company_name}: LLM error - {e}") + else: + results.append( + { + "id": company_id, + "name": company_name, + "domain": website, + "tags": [], + "status": "fetch_failed", + } + ) + print(f" [{i+1}/{total}] {company_name}: website fetch failed") + + self.chunk_results = results + + # Update card with summary + success = sum(1 for r in results if r["status"] == "success") + current.card["enrich_progress"].append( + MD(f"\n---\n### Done: {success}/{total} successful") + ) + current.card["enrich_progress"].refresh() + + @highlight + @card(type="blank") + @step + def join(self, inputs): + # Merge results from all parallel tasks + all_results = [] + for inp in inputs: + all_results.extend(inp.chunk_results) + + # Fetch previous enrichment results and merge + try: + prev = Flow("CompanyEnricher").latest_successful_run.data.enriched_companies + # Build a dict keyed by company id, new results overwrite old + merged = {r["id"]: r for r in prev} + except Exception: + merged = {} + + for r in all_results: + merged[r["id"]] = r + + self.enriched_companies = list(merged.values()) + + # Register the enriched data as an asset + success_count = sum(1 for r in self.enriched_companies if r["status"] == "success") + self.prj.register_data( + "enriched-companies", + "enriched_companies", + annotations={ + "total_companies": str(len(self.enriched_companies)), + "successfully_tagged": str(success_count), + "batch_new": str(len(all_results)), + }, + ) + + # Summary card + failed = [r for r in all_results if r["status"] != "success"] + current.card.append(MD("## Company Enrichment Summary")) + current.card.append( + MD( + f"- **New companies processed**: {len(all_results)}\n" + f"- **Successfully tagged**: {sum(1 for r in all_results if r['status'] == 'success')}\n" + f"- **Failed**: {len(failed)}\n" + f"- **Total enriched 
companies**: {len(self.enriched_companies)}" + ) + ) + + # Show sample tags + tagged = [r for r in all_results if r["status"] == "success"][:10] + if tagged: + rows = [[r["name"], r["domain"], ", ".join(r["tags"])] for r in tagged] + current.card.append(MD("### Sample Tags")) + current.card.append( + Table( + headers=["Company", "Domain", "Tags"], + data=rows, + ) + ) + + if failed: + current.card.append(MD("### Failed Companies")) + fail_rows = [[r["name"], r["domain"], r["status"]] for r in failed[:10]] + current.card.append( + Table( + headers=["Company", "Domain", "Error"], + data=fail_rows, + ) + ) + + # Highlight + self.highlight.title = "Company Enricher" + self.highlight.add_column( + big=str(len(all_results)), small="processed" + ) + self.highlight.add_column( + big=str(success_count), small="total tagged" + ) + + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + CompanyEnricher() diff --git a/flows/snowflake-etl/README.md b/flows/snowflake-etl/README.md new file mode 100644 index 0000000..8c7c2ac --- /dev/null +++ b/flows/snowflake-etl/README.md @@ -0,0 +1,11 @@ +# Snowflake ETL + +Fetches company data from Snowflake in batches of 100 rows. Tracks which rows +have been processed across runs using Metaflow artifacts, so each execution +picks up where the last one left off. 
+ +- **Schedule**: Hourly +- **State**: Processed row IDs stored as an artifact, retrieved via Metaflow client +- **Reset**: Pass `--reset yes` to start from scratch +- **Output**: Publishes `enrich_companies` event to trigger the CompanyEnricher flow +- **Asset**: Registers fetched batch as `companies` data asset diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py new file mode 100644 index 0000000..eaec3b0 --- /dev/null +++ b/flows/snowflake-etl/flow.py @@ -0,0 +1,95 @@ +from metaflow import ( + card, + step, + current, + Flow, + schedule, + Parameter, + pypi, +) +from metaflow.cards import Markdown as MD +from obproject import ProjectFlow, highlight + +from company_utils import fetch_company_batch + + +@pypi(packages={"snowflake-connector-python": "3.12.3", "pandas": "2.2.3"}) +@schedule(hourly=True) +class SnowflakeETL(ProjectFlow): + + reset = Parameter( + "reset", + default="no", + help="Set to 'yes' to reset processing state and start from scratch", + ) + + @highlight + @card(type="blank") + @step + def start(self): + # Retrieve previously processed IDs from the last successful run + processed_ids = set() + if self.reset == "no": + try: + run = Flow("SnowflakeETL").latest_successful_run + processed_ids = set(run.data.processed_ids) + print(f"Resuming from previous state: {len(processed_ids)} rows already processed") + except Exception: + print("No previous state found - starting from scratch") + else: + print("Reset requested - starting from scratch") + + # Fetch next batch from Snowflake + df = fetch_company_batch(processed_ids if processed_ids else None) + batch_size = len(df) + print(f"Fetched batch of {batch_size} rows from Snowflake") + + if batch_size == 0: + current.card.append(MD("## No new rows to process")) + self.highlight.title = "Snowflake ETL" + self.highlight.add_column(big="0", small="new rows") + self.batch = [] + self.processed_ids = list(processed_ids) + self.next(self.end) + return + + # Convert batch to list of dicts 
for downstream processing + self.batch = df.to_dict(orient="records") + new_ids = set(str(row.get("ID", row.get("id", ""))) for row in self.batch) + self.processed_ids = list(processed_ids | new_ids) + + # Register the company data as an asset + self.companies = self.batch + self.prj.register_data( + "companies", + "companies", + annotations={"batch_size": str(batch_size), "total_processed": str(len(self.processed_ids))}, + ) + + # Update card + current.card.append(MD(f"## Snowflake ETL Batch")) + current.card.append(MD(f"- **New rows fetched**: {batch_size}")) + current.card.append(MD(f"- **Total rows processed**: {len(self.processed_ids)}")) + if self.batch: + sample = self.batch[0] + cols = list(sample.keys())[:5] + current.card.append(MD(f"- **Sample columns**: {', '.join(cols)}")) + + # Highlight card + self.highlight.title = "Snowflake ETL" + self.highlight.add_column(big=str(batch_size), small="new rows") + self.highlight.add_column(big=str(len(self.processed_ids)), small="total processed") + + # Trigger enrichment + self.prj.safe_publish_event("enrich_companies") + print("Published enrich_companies event") + + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + SnowflakeETL() diff --git a/models/tag-generator/asset_config.toml b/models/tag-generator/asset_config.toml new file mode 100644 index 0000000..1b74c32 --- /dev/null +++ b/models/tag-generator/asset_config.toml @@ -0,0 +1,9 @@ +name = "Tag Generator LLM" +id = "tag-generator" +description = "Small local LLM used to generate descriptive tags for companies" +blobs = ["HuggingFaceTB/SmolLM2-1.7B-Instruct"] + +[properties] +provider = "HuggingFaceTB/SmolLM2-1.7B-Instruct" +task = "text-generation" +output = "5 descriptive tags per company" diff --git a/src/company_utils/__init__.py b/src/company_utils/__init__.py new file mode 100644 index 0000000..70fdfff --- /dev/null +++ b/src/company_utils/__init__.py @@ -0,0 +1,4 @@ +from .snowflake import fetch_company_batch 
+from .scraper import fetch_landing_page + +METAFLOW_PACKAGE_POLICY = "include" diff --git a/src/company_utils/scraper.py b/src/company_utils/scraper.py new file mode 100644 index 0000000..931cbc7 --- /dev/null +++ b/src/company_utils/scraper.py @@ -0,0 +1,24 @@ +import requests + + +def fetch_landing_page(url, timeout=10): + """ + Fetch the landing page HTML of a company website. + Returns the text content or None on failure. + """ + if not url: + return None + if not url.startswith("http"): + url = "https://" + url + try: + resp = requests.get( + url, + timeout=timeout, + headers={"User-Agent": "Mozilla/5.0 (compatible; CompanyEnricher/1.0)"}, + allow_redirects=True, + ) + resp.raise_for_status() + return resp.text + except Exception as e: + print(f"Failed to fetch {url}: {e}") + return None diff --git a/src/company_utils/snowflake.py b/src/company_utils/snowflake.py new file mode 100644 index 0000000..975313b --- /dev/null +++ b/src/company_utils/snowflake.py @@ -0,0 +1,33 @@ +from metaflow import Snowflake + +QUERY_BATCH = """ +SELECT * +FROM free_company_dataset.public.freecompanydataset +WHERE id NOT IN ({excluded}) +LIMIT 100 +""" + +QUERY_BATCH_NO_EXCLUDE = """ +SELECT * +FROM free_company_dataset.public.freecompanydataset +LIMIT 100 +""" + + +def fetch_company_batch(processed_ids=None): + """ + Fetch a batch of up to 100 company rows from Snowflake, + skipping any IDs in processed_ids. + + Returns a pandas DataFrame.
+ """ + with Snowflake(integration="snowflake-test") as cn: + cursor = cn.cursor() + if processed_ids: + placeholders = ",".join(f"'{pid}'" for pid in processed_ids) + query = QUERY_BATCH.format(excluded=placeholders) + else: + query = QUERY_BATCH_NO_EXCLUDE + cursor.execute(query) + df = cursor.fetch_pandas_all() + return df From df08b66ba4d16ceacf758eb68398f5ccef50ec69 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sat, 28 Mar 2026 23:09:55 -0700 Subject: [PATCH 2/9] fix enrichment flow --- flows/company-enricher/flow.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index 453c89d..3728b58 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -23,17 +23,22 @@ ) -def generate_tags(html_text): - """Use a local LLM to generate 5 descriptive tags from website content.""" +def load_model(): + """Load the LLM and tokenizer once.""" import torch from transformers import AutoTokenizer, AutoModelForCausalLM device = "cuda" if torch.cuda.is_available() else "cpu" - tokenizer = AutoTokenizer.from_pretrained(MODEL) model = AutoModelForCausalLM.from_pretrained( MODEL, torch_dtype=torch.bfloat16 ).to(device) + return tokenizer, model, device + + +def generate_tags(html_text, tokenizer, model, device): + """Use a local LLM to generate 5 descriptive tags from website content.""" + import torch # Truncate HTML to fit context - take first ~2000 chars of visible text from html.parser import HTMLParser @@ -132,6 +137,9 @@ def enrich(self): # Consume the model asset for lineage tracking print(self.prj.asset.consume_model_asset("tag-generator")) + # Load the model once for all companies in this chunk + tokenizer, model, device = load_model() + for i, company in enumerate(chunk): company_name = company.get("NAME", company.get("name", "Unknown")) website = company.get("DOMAIN", company.get("domain", "")) @@ -145,7 +153,7 @@ def enrich(self): html = 
fetch_landing_page(website) if html: try: - tags = generate_tags(html) + tags = generate_tags(html, tokenizer, model, device) results.append( { "id": company_id, From afada482f88241605b29c2d6f05049009e4bd193 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sat, 28 Mar 2026 23:53:22 -0700 Subject: [PATCH 3/9] fixes --- .claude/skills/outerbounds/SKILL.md | 3 ++- flows/company-enricher/flow.py | 29 ++++++++++++++++++----------- flows/snowflake-etl/flow.py | 20 ++++++++++++-------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/.claude/skills/outerbounds/SKILL.md b/.claude/skills/outerbounds/SKILL.md index 6ec5cb1..e4e0d63 100644 --- a/.claude/skills/outerbounds/SKILL.md +++ b/.claude/skills/outerbounds/SKILL.md @@ -43,7 +43,8 @@ For instance, python flow/a/flow.py run ``` -Or, if the flow has external dependencies defined with `@pypi` or `@conda`, leverage Fast Bakery on Kubernetes: +Or, if the flow has external dependencies defined with `@pypi`, `@pypi_base`, `@conda`, or `@conda_base`, +leverage Fast Bakery on Kubernetes: ``` python flow/a/flow.py --environment=fast-bakery run --with kubernetes diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index 3728b58..2514fac 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -97,8 +97,8 @@ class CompanyEnricher(ProjectFlow): @step def start(self): # Get the latest batch from the ETL flow - companies_data = self.prj.get_data("companies") - self.companies_batch = companies_data + run = Flow("SnowflakeETL").latest_successful_run + self.companies_batch = run.data.companies print(f"Received {len(self.companies_batch)} companies to enrich") # Split into chunks for parallel processing (10 tasks) @@ -121,6 +121,8 @@ def start(self): "transformers": "4.55.2", "torch": "2.8.0", "requests": "2.32.3", + "pyopenssl": "24.2.1", + "cryptography": "43.0.3", }, ) @step @@ -134,8 +136,11 @@ def enrich(self): ) current.card["enrich_progress"].refresh() - # Consume 
the model asset for lineage tracking - print(self.prj.asset.consume_model_asset("tag-generator")) + # Consume the model asset for lineage tracking (best-effort) + try: + self.prj.asset.consume_model_asset("tag-generator") + except Exception: + print("Model asset lineage tracking unavailable") # Load the model once for all companies in this chunk tokenizer, model, device = load_model() @@ -196,8 +201,10 @@ def enrich(self): ) current.card["enrich_progress"].refresh() + self.next(self.join) + @highlight - @card(type="blank") + @card(type="blank", id="join_card") @step def join(self, inputs): # Merge results from all parallel tasks @@ -232,8 +239,8 @@ def join(self, inputs): # Summary card failed = [r for r in all_results if r["status"] != "success"] - current.card.append(MD("## Company Enrichment Summary")) - current.card.append( + current.card["join_card"].append(MD("## Company Enrichment Summary")) + current.card["join_card"].append( MD( f"- **New companies processed**: {len(all_results)}\n" f"- **Successfully tagged**: {sum(1 for r in all_results if r['status'] == 'success')}\n" @@ -246,8 +253,8 @@ def join(self, inputs): tagged = [r for r in all_results if r["status"] == "success"][:10] if tagged: rows = [[r["name"], r["domain"], ", ".join(r["tags"])] for r in tagged] - current.card.append(MD("### Sample Tags")) - current.card.append( + current.card["join_card"].append(MD("### Sample Tags")) + current.card["join_card"].append( Table( headers=["Company", "Domain", "Tags"], data=rows, @@ -255,9 +262,9 @@ def join(self, inputs): ) if failed: - current.card.append(MD("### Failed Companies")) + current.card["join_card"].append(MD("### Failed Companies")) fail_rows = [[r["name"], r["domain"], r["status"]] for r in failed[:10]] - current.card.append( + current.card["join_card"].append( Table( headers=["Company", "Domain", "Error"], data=fail_rows, diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py index eaec3b0..0e7ba54 100644 --- 
a/flows/snowflake-etl/flow.py +++ b/flows/snowflake-etl/flow.py @@ -5,7 +5,7 @@ Flow, schedule, Parameter, - pypi, + pypi_base, ) from metaflow.cards import Markdown as MD from obproject import ProjectFlow, highlight @@ -13,7 +13,11 @@ from company_utils import fetch_company_batch -@pypi(packages={"snowflake-connector-python": "3.12.3", "pandas": "2.2.3"}) +@pypi_base(python="3.11.11", packages={ + "snowflake-connector-python[pandas]": "3.12.3", + "pyopenssl": "24.2.1", + "cryptography": "43.0.3", +}) @schedule(hourly=True) class SnowflakeETL(ProjectFlow): @@ -24,7 +28,7 @@ class SnowflakeETL(ProjectFlow): ) @highlight - @card(type="blank") + @card(type="blank", id="etl_card") @step def start(self): # Retrieve previously processed IDs from the last successful run @@ -45,7 +49,7 @@ def start(self): print(f"Fetched batch of {batch_size} rows from Snowflake") if batch_size == 0: - current.card.append(MD("## No new rows to process")) + current.card["etl_card"].append(MD("## No new rows to process")) self.highlight.title = "Snowflake ETL" self.highlight.add_column(big="0", small="new rows") self.batch = [] @@ -67,13 +71,13 @@ def start(self): ) # Update card - current.card.append(MD(f"## Snowflake ETL Batch")) - current.card.append(MD(f"- **New rows fetched**: {batch_size}")) - current.card.append(MD(f"- **Total rows processed**: {len(self.processed_ids)}")) + current.card["etl_card"].append(MD(f"## Snowflake ETL Batch")) + current.card["etl_card"].append(MD(f"- **New rows fetched**: {batch_size}")) + current.card["etl_card"].append(MD(f"- **Total rows processed**: {len(self.processed_ids)}")) if self.batch: sample = self.batch[0] cols = list(sample.keys())[:5] - current.card.append(MD(f"- **Sample columns**: {', '.join(cols)}")) + current.card["etl_card"].append(MD(f"- **Sample columns**: {', '.join(cols)}")) # Highlight card self.highlight.title = "Snowflake ETL" From c61bacd0408600d4a9a7c7fa3518ef7b073c48f8 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 
Mar 2026 00:06:15 -0700 Subject: [PATCH 4/9] fix issues --- CLAUDE.md | 1 + flows/company-enricher/README.md | 2 +- flows/company-enricher/flow.py | 15 +++++++-------- flows/snowflake-etl/README.md | 2 +- flows/snowflake-etl/flow.py | 6 +----- src/company_utils/snowflake.py | 9 ++++++--- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 16cfc62..5fd7efc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,6 +10,7 @@ The project that has the following components: 1. An ETL flow that fetches data from Snowflake - Refer to `example_data.py` for a sample + - Only include rows with a valid website url - Data is processed in batches of at most 100 rows - Store the IDs of rows that were processed, next time the flow executes, fetch the next batch - Store the state of processing in an artifact, use Metaflow client to retrieve the state diff --git a/flows/company-enricher/README.md b/flows/company-enricher/README.md index 3c3066c..4e3f01f 100644 --- a/flows/company-enricher/README.md +++ b/flows/company-enricher/README.md @@ -3,7 +3,7 @@ Enriches company data by scraping each company's website and using a local LLM to generate 5 descriptive tags. 
-- **Trigger**: `enrich_companies` event from SnowflakeETL +- **Trigger**: Automatically triggered when SnowflakeETL finishes (`@trigger_on_finish`) - **Parallelism**: Processes companies in parallel using foreach (up to 10 tasks) - **LLM**: Uses SmolLM2-1.7B-Instruct to generate tags from website content - **Output**: Merges results with previous runs and registers `enriched-companies` data asset diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index 2514fac..c527be8 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -2,13 +2,13 @@ card, step, current, - Flow, resources, pypi, retry, ) from metaflow.cards import Markdown as MD, Table -from obproject import ProjectFlow, project_trigger, highlight +from metaflow import trigger_on_finish +from obproject import ProjectFlow, highlight from company_utils import fetch_landing_page @@ -90,15 +90,14 @@ def handle_data(self, data): return tags -@project_trigger(event="enrich_companies") +@trigger_on_finish(flow="SnowflakeETL") class CompanyEnricher(ProjectFlow): @card(type="blank") @step def start(self): - # Get the latest batch from the ETL flow - run = Flow("SnowflakeETL").latest_successful_run - self.companies_batch = run.data.companies + # Get the latest batch from the ETL flow via asset + self.companies_batch = self.prj.get_data("companies") print(f"Received {len(self.companies_batch)} companies to enrich") # Split into chunks for parallel processing (10 tasks) @@ -147,7 +146,7 @@ def enrich(self): for i, company in enumerate(chunk): company_name = company.get("NAME", company.get("name", "Unknown")) - website = company.get("DOMAIN", company.get("domain", "")) + website = company.get("WEBSITE", company.get("website", "")) company_id = str(company.get("ID", company.get("id", ""))) current.card["enrich_progress"].append( @@ -214,7 +213,7 @@ def join(self, inputs): # Fetch previous enrichment results and merge try: - prev = 
Flow("CompanyEnricher").latest_successful_run.data.enriched_companies + prev = self.prj.get_data("enriched-companies") # Build a dict keyed by company id, new results overwrite old merged = {r["id"]: r for r in prev} except Exception: diff --git a/flows/snowflake-etl/README.md b/flows/snowflake-etl/README.md index 8c7c2ac..d99904c 100644 --- a/flows/snowflake-etl/README.md +++ b/flows/snowflake-etl/README.md @@ -7,5 +7,5 @@ picks up where the last one left off. - **Schedule**: Hourly - **State**: Processed row IDs stored as an artifact, retrieved via Metaflow client - **Reset**: Pass `--reset yes` to start from scratch -- **Output**: Publishes `enrich_companies` event to trigger the CompanyEnricher flow +- **Output**: Triggers CompanyEnricher flow on completion via `@trigger_on_finish` - **Asset**: Registers fetched batch as `companies` data asset diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py index 0e7ba54..a12d34e 100644 --- a/flows/snowflake-etl/flow.py +++ b/flows/snowflake-etl/flow.py @@ -35,7 +35,7 @@ def start(self): processed_ids = set() if self.reset == "no": try: - run = Flow("SnowflakeETL").latest_successful_run + run = Flow(current.flow_name).latest_successful_run processed_ids = set(run.data.processed_ids) print(f"Resuming from previous state: {len(processed_ids)} rows already processed") except Exception: @@ -84,10 +84,6 @@ def start(self): self.highlight.add_column(big=str(batch_size), small="new rows") self.highlight.add_column(big=str(len(self.processed_ids)), small="total processed") - # Trigger enrichment - self.prj.safe_publish_event("enrich_companies") - print("Published enrich_companies event") - self.next(self.end) @step diff --git a/src/company_utils/snowflake.py b/src/company_utils/snowflake.py index 975313b..1b77901 100644 --- a/src/company_utils/snowflake.py +++ b/src/company_utils/snowflake.py @@ -3,21 +3,24 @@ QUERY_BATCH = """ SELECT * FROM free_company_dataset.public.freecompanydataset -WHERE id NOT IN 
({excluded}) +WHERE WEBSITE IS NOT NULL AND TRIM(WEBSITE) != '' +  AND id NOT IN ({excluded}) LIMIT 100 """ QUERY_BATCH_NO_EXCLUDE = """ SELECT * FROM free_company_dataset.public.freecompanydataset +WHERE WEBSITE IS NOT NULL AND TRIM(WEBSITE) != '' LIMIT 100 """ def fetch_company_batch(processed_ids=None): """ -    Fetch a batch of up to 1000 company rows from Snowflake, -    skipping any IDs in processed_ids. +    Fetch a batch of up to 100 company rows from Snowflake, +    skipping any IDs in processed_ids. Only includes rows +    with a valid website URL (non-empty WEBSITE). Returns a pandas DataFrame. """ From 653f15bc9b7df68d29e99a96247279194915f9d7 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 Mar 2026 00:13:03 -0700 Subject: [PATCH 5/9] remove dev branch --- obproject.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/obproject.toml b/obproject.toml index 0641949..f644ac9 100644 --- a/obproject.toml +++ b/obproject.toml @@ -2,6 +2,3 @@ platform = 'dev-yellow.outerbounds.xyz' project = 'ob_agentic_code_example' title = 'Agentic Code Example' - -[dev-assets] -branch = 'dev' From d2d959b9eb5b5a23cfee40f1bd40675 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 Mar 2026 00:21:33 -0700 Subject: [PATCH 6/9] improve cards --- .claude/skills/outerbounds/SKILL.md | 5 +++-- flows/company-enricher/flow.py | 19 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/.claude/skills/outerbounds/SKILL.md b/.claude/skills/outerbounds/SKILL.md index e4e0d63..d8c764b 100644 --- a/.claude/skills/outerbounds/SKILL.md +++ b/.claude/skills/outerbounds/SKILL.md @@ -12,11 +12,12 @@ and a description how it works in [starter-project.md](starter-project.md). You must - Include batch, offline workflows under `flows/`, structured as Metaflow flows.
- - Preferably include a `@highlight` card in each flow (see `HighlightTester` in the starter project for example) + - Preferably include a `@highlight` card in each flow in the `end` step + (see `HighlightTester` in the starter project for example) - Include online componets under `deployments/` with a proper configuration. - Define data assets under `data/` and model assets under `models/` - Read [project-assets.md](project-assets.md) for instructions how to define assets - - Include a `@card` for steps that consume and produce assets + - Include a `@card` for steps that consume and produce assets (it must be the only `@card` in the step) - Include a descriptive README.md at the top level, for each `deployment`, `flow`, and asset. ## Defining flows diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index c527be8..5daebae 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -149,11 +149,6 @@ def enrich(self): website = company.get("WEBSITE", company.get("website", "")) company_id = str(company.get("ID", company.get("id", ""))) - current.card["enrich_progress"].append( - MD(f"Processing [{i+1}/{total}]: **{company_name}** ({website})") - ) - current.card["enrich_progress"].refresh() - html = fetch_landing_page(website) if html: try: @@ -167,6 +162,9 @@ def enrich(self): "status": "success", } ) + current.card["enrich_progress"].append( + MD(f"✅ [{i+1}/{total}] **{company_name}** — {', '.join(tags)}") + ) print(f" [{i+1}/{total}] {company_name}: {tags}") except Exception as e: results.append( @@ -178,6 +176,9 @@ def enrich(self): "status": f"llm_error: {e}", } ) + current.card["enrich_progress"].append( + MD(f"❌ [{i+1}/{total}] **{company_name}** — LLM error: {e}") + ) print(f" [{i+1}/{total}] {company_name}: LLM error - {e}") else: results.append( @@ -189,14 +190,20 @@ def enrich(self): "status": "fetch_failed", } ) + current.card["enrich_progress"].append( + MD(f"❌ [{i+1}/{total}] **{company_name}** — 
website fetch failed") + ) print(f" [{i+1}/{total}] {company_name}: website fetch failed") + current.card["enrich_progress"].refresh() + self.chunk_results = results # Update card with summary success = sum(1 for r in results if r["status"] == "success") + failed = total - success current.card["enrich_progress"].append( - MD(f"\n---\n### Done: {success}/{total} successful") + MD(f"\n---\n### Done: ✅ {success} succeeded, ❌ {failed} failed out of {total}") ) current.card["enrich_progress"].refresh() From 7087382acb499a01c9f074f75b66a56c9d897c98 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 Mar 2026 00:25:33 -0700 Subject: [PATCH 7/9] fix tag handling --- deployments/company-explorer/app.py | 29 +++++++++++++++++++++++++---- flows/company-enricher/flow.py | 21 ++++++++++++++++++++- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/deployments/company-explorer/app.py b/deployments/company-explorer/app.py index 781c3e4..5accd4a 100644 --- a/deployments/company-explorer/app.py +++ b/deployments/company-explorer/app.py @@ -16,12 +16,25 @@ def load_enriched_companies(): return [] +def parse_tag(tag): + """Parse a tag string that may contain multiple numbered tags into individual tags.""" + import re + tag = tag.strip() + # Check if this is a numbered list crammed into one string + numbered = re.split(r"\d+[\.\)]\s*", tag) + numbered = [t.strip().rstrip(",").strip() for t in numbered if t.strip()] + if len(numbered) >= 2: + return numbered + return [tag] if tag else [] + + def get_all_tags(companies): """Extract all unique tags across companies.""" tags = set() for c in companies: for t in c.get("tags", []): - tags.add(t) + for parsed in parse_tag(t): + tags.add(parsed) return sorted(tags) @@ -57,7 +70,12 @@ def main(): display = tagged_companies if not show_failed else companies if selected_tags: display = [ - c for c in display if any(t in c.get("tags", []) for t in selected_tags) + c for c in display + if any( + p in selected_tags + for t in 
c.get("tags", []) + for p in parse_tag(t) + ) ] if search: display = [ @@ -81,7 +99,9 @@ def main(): if domain: st.markdown(f"[{domain}](https://{domain})") if company.get("status") == "success": - tags = company.get("tags", []) + tags = [ + p for t in company.get("tags", []) for p in parse_tag(t) + ] tag_html = " ".join( f'{t}' for t in tags @@ -97,7 +117,8 @@ def main(): tag_counts = {} for c in tagged_companies: for t in c.get("tags", []): - tag_counts[t] = tag_counts.get(t, 0) + 1 + for p in parse_tag(t): + tag_counts[p] = tag_counts.get(p, 0) + 1 sorted_tags = sorted(tag_counts.items(), key=lambda x: -x[1]) tag_html = " ".join( f'{tag} ({count})' diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index 5daebae..dbfdf87 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -86,10 +86,29 @@ def handle_data(self, data): ) response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1] :], skip_special_tokens=True) - tags = [t.strip() for t in response.split(",")][:5] + tags = parse_tags(response) return tags +def parse_tags(response): + """Parse tags from LLM response, handling numbered lists and comma-separated formats.""" + import re + + text = response.strip() + # Try numbered list format: "1. Tag 2. Tag ..." or "1) Tag 2) Tag ..." 
+ numbered = re.split(r"\d+[\.\)]\s*", text) + numbered = [t.strip().rstrip(",").strip() for t in numbered if t.strip()] + if len(numbered) >= 2: + return numbered[:5] + # Try dash/bullet list + lines = [l.strip().lstrip("-•*").strip() for l in text.splitlines() if l.strip()] + if len(lines) >= 2: + return lines[:5] + # Fallback: comma-separated + tags = [t.strip() for t in text.split(",") if t.strip()] + return tags[:5] + + @trigger_on_finish(flow="SnowflakeETL") class CompanyEnricher(ProjectFlow): From 149def4192da5220cd9d4ec9dbfdfc97e4b5ed65 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 Mar 2026 00:30:40 -0700 Subject: [PATCH 8/9] move highlight to the end step --- flows/snowflake-etl/flow.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py index a12d34e..315d902 100644 --- a/flows/snowflake-etl/flow.py +++ b/flows/snowflake-etl/flow.py @@ -27,7 +27,6 @@ class SnowflakeETL(ProjectFlow): help="Set to 'yes' to reset processing state and start from scratch", ) - @highlight @card(type="blank", id="etl_card") @step def start(self): @@ -50,8 +49,6 @@ def start(self): if batch_size == 0: current.card["etl_card"].append(MD("## No new rows to process")) - self.highlight.title = "Snowflake ETL" - self.highlight.add_column(big="0", small="new rows") self.batch = [] self.processed_ids = list(processed_ids) self.next(self.end) @@ -79,16 +76,15 @@ def start(self): cols = list(sample.keys())[:5] current.card["etl_card"].append(MD(f"- **Sample columns**: {', '.join(cols)}")) - # Highlight card - self.highlight.title = "Snowflake ETL" - self.highlight.add_column(big=str(batch_size), small="new rows") - self.highlight.add_column(big=str(len(self.processed_ids)), small="total processed") - self.next(self.end) + @highlight @step def end(self): - pass + batch_size = len(self.batch) + self.highlight.title = "Snowflake ETL" + self.highlight.add_column(big=str(batch_size), small="new 
rows") + self.highlight.add_column(big=str(len(self.processed_ids)), small="total processed") if __name__ == "__main__": From 76fa0bc2b33172b4926780e72018a72cded0fb62 Mon Sep 17 00:00:00 2001 From: Ville Tuulos Date: Sun, 29 Mar 2026 00:37:08 -0700 Subject: [PATCH 9/9] rearrange cards --- flows/company-enricher/flow.py | 34 +++++++++++++++++----------------- flows/snowflake-etl/flow.py | 12 ++++++------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py index dbfdf87..c310ac7 100644 --- a/flows/company-enricher/flow.py +++ b/flows/company-enricher/flow.py @@ -228,8 +228,7 @@ def enrich(self): self.next(self.join) - @highlight - @card(type="blank", id="join_card") + @card(type="blank") @step def join(self, inputs): # Merge results from all parallel tasks @@ -264,8 +263,8 @@ def join(self, inputs): # Summary card failed = [r for r in all_results if r["status"] != "success"] - current.card["join_card"].append(MD("## Company Enrichment Summary")) - current.card["join_card"].append( + current.card.append(MD("## Company Enrichment Summary")) + current.card.append( MD( f"- **New companies processed**: {len(all_results)}\n" f"- **Successfully tagged**: {sum(1 for r in all_results if r['status'] == 'success')}\n" @@ -278,8 +277,8 @@ def join(self, inputs): tagged = [r for r in all_results if r["status"] == "success"][:10] if tagged: rows = [[r["name"], r["domain"], ", ".join(r["tags"])] for r in tagged] - current.card["join_card"].append(MD("### Sample Tags")) - current.card["join_card"].append( + current.card.append(MD("### Sample Tags")) + current.card.append( Table( headers=["Company", "Domain", "Tags"], data=rows, @@ -287,29 +286,30 @@ def join(self, inputs): ) if failed: - current.card["join_card"].append(MD("### Failed Companies")) + current.card.append(MD("### Failed Companies")) fail_rows = [[r["name"], r["domain"], r["status"]] for r in failed[:10]] - current.card["join_card"].append( + 
current.card.append( Table( headers=["Company", "Domain", "Error"], data=fail_rows, ) ) - # Highlight - self.highlight.title = "Company Enricher" - self.highlight.add_column( - big=str(len(all_results)), small="processed" - ) - self.highlight.add_column( - big=str(success_count), small="total tagged" - ) + self.new_count = len(all_results) + self.success_count = success_count self.next(self.end) + @highlight @step def end(self): - pass + self.highlight.title = "Company Enricher" + self.highlight.add_column( + big=str(self.new_count), small="processed" + ) + self.highlight.add_column( + big=str(self.success_count), small="total tagged" + ) if __name__ == "__main__": diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py index 315d902..ed4715d 100644 --- a/flows/snowflake-etl/flow.py +++ b/flows/snowflake-etl/flow.py @@ -27,7 +27,7 @@ class SnowflakeETL(ProjectFlow): help="Set to 'yes' to reset processing state and start from scratch", ) - @card(type="blank", id="etl_card") + @card(type="blank") @step def start(self): # Retrieve previously processed IDs from the last successful run @@ -48,7 +48,7 @@ def start(self): print(f"Fetched batch of {batch_size} rows from Snowflake") if batch_size == 0: - current.card["etl_card"].append(MD("## No new rows to process")) + current.card.append(MD("## No new rows to process")) self.batch = [] self.processed_ids = list(processed_ids) self.next(self.end) @@ -68,13 +68,13 @@ def start(self): ) # Update card - current.card["etl_card"].append(MD(f"## Snowflake ETL Batch")) - current.card["etl_card"].append(MD(f"- **New rows fetched**: {batch_size}")) - current.card["etl_card"].append(MD(f"- **Total rows processed**: {len(self.processed_ids)}")) + current.card.append(MD(f"## Snowflake ETL Batch")) + current.card.append(MD(f"- **New rows fetched**: {batch_size}")) + current.card.append(MD(f"- **Total rows processed**: {len(self.processed_ids)}")) if self.batch: sample = self.batch[0] cols = list(sample.keys())[:5] - 
current.card["etl_card"].append(MD(f"- **Sample columns**: {', '.join(cols)}")) + current.card.append(MD(f"- **Sample columns**: {', '.join(cols)}")) self.next(self.end)