diff --git a/jobs/colin-extract-refresh/Dockerfile b/jobs/colin-extract-refresh/Dockerfile index ada6289ed1..7f70d720d0 100644 --- a/jobs/colin-extract-refresh/Dockerfile +++ b/jobs/colin-extract-refresh/Dockerfile @@ -7,20 +7,14 @@ ENV VCS_REF=${VCS_REF} ENV BUILD_DATE=${BUILD_DATE} ENV PYTHONUNBUFFERED=1 - - - - LABEL org.label-schema.vcs-ref=${VCS_REF} \ org.label-schema.build-date=${BUILD_DATE} - USER root ENV ORACLE_CLIENT_LIB_DIR=/opt/oracle/instantclient_21_4 ENV LD_LIBRARY_PATH=${ORACLE_CLIENT_LIB_DIR} - WORKDIR /opt/oracle/ RUN set -eux; \ apt-get update; \ @@ -54,12 +48,6 @@ RUN set -eux; \ ln -sf "${DBSCHEMA_HOME}/DbSchemaCLI" /usr/local/bin/dbschemacli; \ test -x "${DBSCHEMA_HOME}/DbSchemaCLI"; \ rm -rf /var/lib/apt/lists/* - - - -# Create directories with proper permissions -RUN mkdir -p /opt/app-root && \ - chmod 755 /opt/app-root ENV HOME=/opt/app-root RUN mkdir -p /opt/app-root/.DbSchema/cli /opt/app-root/.DbSchema/drivers/PostgreSql && \ @@ -76,10 +64,15 @@ RUN apt-get update; \ ENV PATH="${DBSCHEMA_HOME}:${PATH}" -USER 1001 +COPY src/ ./colin-exrtact-refresh/src/ +RUN mkdir -p /opt/app-root/colin-extract-refresh/src/subset/generated && \ + chmod -R 777 /opt/app-root/colin-extract-refresh/src/subset/generated /opt/app-root/.DbSchema + +WORKDIR /opt/app-root/colin-extract-refresh/src EXPOSE 8080 COPY src . -CMD [ "python", "test_connectivity.py" ] +ENTRYPOINT ["python", "generate_cprd_subset_extract.py"] +CMD ["--corp-file", "corp.txt", "--mode", "refresh"] diff --git a/jobs/colin-extract-refresh/src/generate_cprd_subset_extract.py b/jobs/colin-extract-refresh/src/generate_cprd_subset_extract.py new file mode 100644 index 0000000000..2286e6cc9e --- /dev/null +++ b/jobs/colin-extract-refresh/src/generate_cprd_subset_extract.py @@ -0,0 +1,1288 @@ +#!/usr/bin/env python3 +""" +Generate DbSchemaCLI scripts to extract a subset of COLIN (Oracle CPRD) corps into a Postgres extract DB. + +Supports: +- refresh: delete + reload a list of corp identifiers in a target Postgres extract DB +- load: load only a list of corp identifiers (useful for empty target DBs) + +Key constraints handled: +- No Oracle temp tables required. +- Oracle IN-list limit (~1000 items) handled by: + - chunk_files: execute the full transfer suite per chunk (legacy / best for very large lists), OR + - or_of_in_lists: execute the transfer suite once using an OR-of-IN-lists predicate (fast for small/medium lists), OR + - auto: choose or_of_in_lists up to a configurable max-id threshold, else chunk_files. +- Templates remain the source of truth. + +Render modes: +- inline (default): parameterization happens at generation time. + The generator renders chunk SQL scripts and produces a small master script that `execute`s each chunk file. +- vset (legacy): uses DbSchemaCLI vset + &placeholders at runtime. Kept as a fallback. + +Outputs (inline mode): +- (master DbSchemaCLI script) +- _chunks/*.sql (chunk scripts) + +Then run: + dbschemacli +""" + +from __future__ import annotations + +import argparse +import re +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Dict, Iterable, List, Sequence + + +# ========================= +# cfg_* (types & config) +# ========================= + +class cfg_GenerationMode(str, Enum): + REFRESH = "refresh" # delete + reload + LOAD = "load" # load only + + +class cfg_RenderMode(str, Enum): + INLINE = "inline" # render templates into chunk files (no vset) + VSET = "vset" # legacy behavior (runtime vset substitution) + + +class cfg_OracleInStrategy(str, Enum): + AUTO = "auto" + CHUNK_FILES = "chunk_files" + OR_OF_IN_LISTS = "or_of_in_lists" + + +class cfg_PgDisableMethod(str, Enum): + TABLE_TRIGGERS = "table_triggers" # ALTER TABLE ... DISABLE/ENABLE TRIGGER ALL (default) + REPLICA_ROLE = "replica_role" # SET session_replication_role=replica|origin (superuser only) + + +@dataclass(frozen=True) +class cfg_GenerationConfig: + repo_root: Path + + corp_file: Path + mode: cfg_GenerationMode + render_mode: cfg_RenderMode + + chunk_size: int + threads: int + prefix_numeric_bc: bool + include_cars: bool + include_cp: bool + + pg_fastload: bool + pg_disable_method: cfg_PgDisableMethod + pg_debug_session_probes: bool + + oracle_in_strategy: cfg_OracleInStrategy + or_of_in_max_ids: int + + out_master: Path + out_chunks_dir: Path + + target_connection: str + target_schema: str + + +# ========================= +# tmpl_* (template specs) +# ========================= + +TMPL_TOKEN_CORP_IDS = "&corp_ids_in" # used by delete template (Postgres-side) +TMPL_TOKEN_TARGET_PRED = "&target_corp_num_predicate" # used by transfer template (Oracle-side) +TMPL_TOKEN_ORACLE_PRED = "&oracle_corp_num_predicate" # used by transfer template (Oracle-side) +TMPL_TOKEN_ORACLE_CORP_TYPE_PRED = "&oracle_corp_type_predicate" # used by transfer template (Oracle-side) +TMPL_TOKEN_SCHEMA = "TARGET_SCHEMA" + +@dataclass(frozen=True) +class tmpl_TemplateSpec: + name: str + path: Path + schema: str + required_tokens: tuple[str, ...] = () + + +@dataclass(frozen=True) +class tmpl_TemplateBundle: + pg_acquire_advisory_lock: tmpl_TemplateSpec + pg_release_advisory_lock: tmpl_TemplateSpec + pg_prepare_address_stage: tmpl_TemplateSpec + pg_cleanup_address_stage: tmpl_TemplateSpec + pg_cleanup_orphan_children: tmpl_TemplateSpec + disable_triggers: tmpl_TemplateSpec + enable_triggers: tmpl_TemplateSpec + pg_boolean_casts: tmpl_TemplateSpec + pg_fastload_begin: tmpl_TemplateSpec + pg_fastload_end: tmpl_TemplateSpec + pg_purge_bcomps_excluded: tmpl_TemplateSpec + delete_chunk: tmpl_TemplateSpec + transfer_chunk: tmpl_TemplateSpec + delete_cars: tmpl_TemplateSpec + transfer_cars: tmpl_TemplateSpec + + +# ========================= +# chunk_* (chunk planning) +# ========================= + +@dataclass(frozen=True) +class chunk_ChunkSpec: + index: int + total: int + target_ids: List[str] + oracle_ids: List[str] + chunk_file: Path + + +# ========================= +# corp_* (corp id parsing/normalization) +# ========================= + +BC_PREFIX_RE = re.compile(r"^BC(\d+)$", re.IGNORECASE) + + +def corp_read_lines(path: Path) -> List[str]: + return path.read_text(encoding="utf-8").splitlines() + + +def corp_normalize_target_ids(lines: Iterable[str], *, prefix_numeric_bc: bool) -> List[str]: + """ + Normalize corp ids for TARGET/Postgres usage: + - strip whitespace + - ignore blank lines and comment lines starting with '#' + - uppercase + - optionally prefix all-numeric ids with 'BC' + - de-dupe while preserving order + """ + out: List[str] = [] + seen: set[str] = set() + + for raw in lines: + line = raw.strip() + if not line or line.startswith("#"): + continue + + corp_id = line.upper() + + if prefix_numeric_bc and corp_id.isdigit(): + corp_id = f"BC{corp_id}" + + if corp_id not in seen: + out.append(corp_id) + seen.add(corp_id) + + return out + + +def corp_to_oracle_ids(target_ids: Sequence[str]) -> List[str]: + """ + Convert TARGET/Postgres corp ids into Oracle corporation.corp_num values. + + For ids like BC0460007 -> 0460007 + Otherwise leave as-is (A1234567 -> A1234567) + + De-dupe while preserving order (avoid wasting Oracle IN-list slots). + """ + out: List[str] = [] + seen: set[str] = set() + + for target_id in target_ids: + m = BC_PREFIX_RE.match(target_id) + oracle_id = m.group(1) if m else target_id + + if oracle_id not in seen: + out.append(oracle_id) + seen.add(oracle_id) + + return out + + +# ========================= +# sql_* (SQL rendering helpers) +# ========================= + +def sql_quote_literal(val: str) -> str: + escaped = val.replace("'", "''") + return f"'{escaped}'" + + +def sql_render_oracle_corp_type_predicate(*, include_cp: bool) -> str: + supported_types = ['BC', 'C', 'ULC', 'CUL', 'CC', 'CCC', 'QA', 'QB', 'QC', 'QD', 'QE'] + if include_cp: + supported_types.append('CP') + return f"c.CORP_TYP_CD in ({sql_render_in_list(supported_types, multiline=False)})" + + +def sql_render_in_list(values: Sequence[str], *, multiline: bool = True, indent: str = " ") -> str: + """ + Render a comma-separated list of SQL string literals, with NO surrounding parentheses. + + Example (multiline=True): + 'A', + 'B', + 'C' + """ + quoted = [sql_quote_literal(v) for v in values] + if not multiline: + return ",".join(quoted) + if not quoted: + return "" + return (",\n" + indent).join(quoted) + + +def sql_render_in_predicate( + column_expr: str, + values: Sequence[str], + *, + max_in_list: int, + multiline: bool = True, + indent: str = " ", +) -> str: + """ + Render a predicate for Oracle IN-list limits by OR-ing multiple IN(...) clauses as needed. + + Returns SQL like: + column_expr in ('a','b') + or: + (column_expr in (...) OR column_expr in (...)) + """ + if max_in_list <= 0: + raise ValueError("max_in_list must be > 0") + if not values: + # This should never happen for normal runs, but keep it valid SQL. + return "1=0" + + def _term(vals: Sequence[str]) -> str: + inner = sql_render_in_list(vals, multiline=multiline, indent=indent) + if not multiline: + return f"{column_expr} in ({inner})" + # indent first element as well + return f"{column_expr} in (\n{indent}{inner}\n)" + + chunks = [list(values[i:i + max_in_list]) for i in range(0, len(values), max_in_list)] + terms = [_term(ch) for ch in chunks] + if len(terms) == 1: + return terms[0] + + if not multiline: + return "(" + " OR ".join(terms) + ")" + + joined = "\nOR\n".join(terms) + return f"(\n{joined}\n)" + + +def sql_render_pg_session_probe(label: str) -> str: + quoted_label = sql_quote_literal(label) + return ( + "SELECT " + f"{quoted_label} AS probe_label,\n" + " pg_backend_pid() AS backend_pid,\n" + " current_user AS db_user,\n" + " current_setting('session_replication_role') AS session_replication_role,\n" + " clock_timestamp() AS observed_at;" + ) + + +# ========================= +# tmpl_* (template loading/validation/rendering) +# ========================= + +def tmpl_default_bundle(repo_root: Path, schema: str) -> tmpl_TemplateBundle: + subset_dir = repo_root / "colin-extract-refresh" / "src" / "subset" + + pg_acquire_advisory_lock = tmpl_TemplateSpec( + name="subset_pg_acquire_advisory_lock", + path=subset_dir / "subset_pg_acquire_advisory_lock.sql", + schema=schema, + ) + pg_release_advisory_lock = tmpl_TemplateSpec( + name="subset_pg_release_advisory_lock", + path=subset_dir / "subset_pg_release_advisory_lock.sql", + schema=schema, + ) + pg_prepare_address_stage = tmpl_TemplateSpec( + name="subset_pg_prepare_address_stage", + path=subset_dir / "subset_pg_prepare_address_stage.sql", + schema=schema, + ) + pg_cleanup_address_stage = tmpl_TemplateSpec( + name="subset_pg_cleanup_address_stage", + path=subset_dir / "subset_pg_cleanup_address_stage.sql", + schema=schema, + ) + pg_cleanup_orphan_children = tmpl_TemplateSpec( + name="subset_pg_cleanup_orphan_children", + path=subset_dir / "subset_pg_cleanup_orphan_children.sql", + schema=schema, + ) + disable_triggers = tmpl_TemplateSpec( + name="subset_disable_triggers", + path=subset_dir / "subset_disable_triggers.sql", + schema=schema, + ) + enable_triggers = tmpl_TemplateSpec( + name="subset_enable_triggers", + path=subset_dir / "subset_enable_triggers.sql", + schema=schema, + ) + pg_boolean_casts = tmpl_TemplateSpec( + name="subset_pg_boolean_casts", + path=subset_dir / "subset_pg_boolean_casts.sql", + schema=schema, + ) + pg_fastload_begin = tmpl_TemplateSpec( + name="subset_pg_fastload_begin", + path=subset_dir / "subset_pg_fastload_begin.sql", + schema=schema, + ) + pg_fastload_end = tmpl_TemplateSpec( + name="subset_pg_fastload_end", + path=subset_dir / "subset_pg_fastload_end.sql", + schema=schema, + ) + pg_purge_bcomps_excluded = tmpl_TemplateSpec( + name="subset_pg_purge_bcomps_excluded", + path=subset_dir / "subset_pg_purge_bcomps_excluded.sql", + schema=schema, + ) + delete_chunk = tmpl_TemplateSpec( + name="subset_delete_chunk", + path=subset_dir / "subset_delete_chunk.sql", + required_tokens=(TMPL_TOKEN_CORP_IDS,), + schema=schema, + ) + transfer_chunk = tmpl_TemplateSpec( + name="subset_transfer_chunk", + path=subset_dir / "subset_transfer_chunk.sql", + required_tokens=(TMPL_TOKEN_TARGET_PRED, TMPL_TOKEN_ORACLE_PRED, TMPL_TOKEN_ORACLE_CORP_TYPE_PRED), + schema=schema, + ) + delete_cars = tmpl_TemplateSpec( + name="subset_delete_cars", + path=subset_dir / "subset_delete_cars.sql", + schema=schema, + ) + transfer_cars = tmpl_TemplateSpec( + name="subset_transfer_cars", + path=subset_dir / "subset_transfer_cars.sql", + schema=schema, + ) + + return tmpl_TemplateBundle( + pg_acquire_advisory_lock=pg_acquire_advisory_lock, + pg_release_advisory_lock=pg_release_advisory_lock, + pg_prepare_address_stage=pg_prepare_address_stage, + pg_cleanup_address_stage=pg_cleanup_address_stage, + pg_cleanup_orphan_children=pg_cleanup_orphan_children, + disable_triggers=disable_triggers, + enable_triggers=enable_triggers, + pg_boolean_casts=pg_boolean_casts, + pg_fastload_begin=pg_fastload_begin, + pg_fastload_end=pg_fastload_end, + pg_purge_bcomps_excluded=pg_purge_bcomps_excluded, + delete_chunk=delete_chunk, + transfer_chunk=transfer_chunk, + delete_cars=delete_cars, + transfer_cars=transfer_cars, + ) + + +def tmpl_load_text(spec: tmpl_TemplateSpec) -> str: + if not spec.path.exists(): + raise SystemExit(f"Missing required template: {spec.name}\nPath: {spec.path}") + text = spec.path.read_text(encoding="utf-8") + tmpl_validate_tokens(spec, text) + return text + +def tmpl_validate_tokens(spec: tmpl_TemplateSpec, template_text: str) -> None: + if not spec.required_tokens: + return + missing = [t for t in spec.required_tokens if t not in template_text] + if missing: + raise SystemExit( + "Template token contract violated.\n" + f"Template: {spec.name}\n" + f"Path: {spec.path}\n" + f"Missing required token(s): {', '.join(missing)}\n" + ) + + +def tmpl_render(template_text: str, *, replacements: Dict[str, str]) -> str: + out = template_text + for token, value in replacements.items(): + out = out.replace(token, value) + return out + +def tmpl_resolve_execute_path(spec: tmpl_TemplateSpec, *, out_dir:Path) -> str: + text = tmpl_load_text(spec) + if TMPL_TOKEN_SCHEMA not in text: + return spec.path + out_dir.mkdir(parents=True, exist_ok=True) + rendered = tmpl_render(text, replacements={TMPL_TOKEN_SCHEMA: spec.schema}) + out_path = out_dir / f"{spec.name}.sql" + gen_write_text(out_path, rendered) + print(f"substitued =>{out_path}") + return out_path + +# ========================= +# chunk_* (chunk planning) +# ========================= + +def chunk_chunked(items: Sequence[str], size: int) -> List[List[str]]: + + if size <= 0: + raise ValueError("chunk size must be > 0") + return [list(items[i:i + size]) for i in range(0, len(items), size)] + + +def chunk_plan_chunks( + target_ids: List[str], + *, + chunk_size: int, + chunks_dir: Path, + file_stem: str = "chunk", +) -> List[chunk_ChunkSpec]: + chunks = chunk_chunked(target_ids, chunk_size) + total = len(chunks) + + out: List[chunk_ChunkSpec] = [] + for idx, chunk_ids in enumerate(chunks, start=1): + oracle_ids = corp_to_oracle_ids(chunk_ids) + chunk_file = chunks_dir / f"{file_stem}_{idx:03d}.sql" + out.append( + chunk_ChunkSpec( + index=idx, + total=total, + target_ids=chunk_ids, + oracle_ids=oracle_ids, + chunk_file=chunk_file, + ) + ) + return out + + +# ========================= +# gen_* (generation) +# ========================= + +def gen_write_text(path: Path, text: str) -> None: + path.write_text(text, encoding="utf-8") + + +def gen_build_chunk_sql( + *, + chunk: chunk_ChunkSpec, + mode: cfg_GenerationMode, + include_delete: bool, + include_transfer: bool, + delete_template_text: str, + transfer_template_text: str, + corp_ids_sql: str, + target_predicate_sql: str, + oracle_predicate_sql: str, + oracle_corp_type_predicate_sql: str, + pg_debug_session_probes: bool, + schema: str, +) -> str: + replacements = { + TMPL_TOKEN_CORP_IDS: corp_ids_sql, + TMPL_TOKEN_TARGET_PRED: target_predicate_sql, + TMPL_TOKEN_ORACLE_PRED: oracle_predicate_sql, + TMPL_TOKEN_ORACLE_CORP_TYPE_PRED: oracle_corp_type_predicate_sql, + TMPL_TOKEN_SCHEMA: schema, + } + + parts: List[str] = [] + parts.append(f"-- generated chunk script: {chunk.chunk_file.name}") + parts.append(f"-- mode: {mode.value}") + parts.append(f"-- chunk: {chunk.index:03d}/{chunk.total:03d}") + parts.append(f"-- target corps: {len(chunk.target_ids)}") + parts.append(f"-- oracle corp_num: {len(chunk.oracle_ids)}") + parts.append("") + + if pg_debug_session_probes: + parts.append("-- Debug probe: backend/session state for this nested execute file.") + parts.append(sql_render_pg_session_probe(f"chunk:{chunk.chunk_file.stem}")) + parts.append("") + + if include_delete and mode == cfg_GenerationMode.REFRESH: + rendered_delete = tmpl_render(delete_template_text, replacements=replacements) + if TMPL_TOKEN_CORP_IDS in rendered_delete: + raise SystemExit( + f"Internal error: token {TMPL_TOKEN_CORP_IDS} remained after rendering delete template " + f"for chunk {chunk.index:03d}." + ) + parts.append(rendered_delete.rstrip()) + parts.append("") + + if include_transfer: + rendered_transfer = tmpl_render(transfer_template_text, replacements=replacements) + if (TMPL_TOKEN_TARGET_PRED in rendered_transfer or + TMPL_TOKEN_ORACLE_PRED in rendered_transfer or + TMPL_TOKEN_ORACLE_CORP_TYPE_PRED in rendered_transfer or + TMPL_TOKEN_SCHEMA in rendered_transfer): + raise SystemExit( + f"Internal error: token(s) remained after rendering transfer template for chunk {chunk.index:03d}." + ) + parts.append(rendered_transfer.rstrip()) + parts.append("") + + return "\n".join(parts) + + +def gen_write_chunk_files( + *, + chunks: Sequence[chunk_ChunkSpec], + mode: cfg_GenerationMode, + include_delete: bool, + include_transfer: bool, + delete_template_text: str, + transfer_template_text: str, + max_in_list: int, + include_cp: bool, + pg_debug_session_probes: bool, + schema: str +) -> List[Path]: + out_paths: List[Path] = [] + oracle_corp_type_predicate_sql = sql_render_oracle_corp_type_predicate(include_cp=include_cp) + for ch in chunks: + corp_ids_sql = sql_render_in_list(ch.target_ids, multiline=True, indent=" ") + target_predicate_sql = sql_render_in_predicate( + "target_corp_num", + ch.target_ids, + max_in_list=max_in_list, + multiline=True, + indent=" ", + ) + oracle_predicate_sql = sql_render_in_predicate( + "c.CORP_NUM", + ch.oracle_ids, + max_in_list=max_in_list, + multiline=True, + indent=" ", + ) + + chunk_sql = gen_build_chunk_sql( + chunk=ch, + mode=mode, + include_delete=include_delete, + include_transfer=include_transfer, + delete_template_text=delete_template_text, + transfer_template_text=transfer_template_text, + corp_ids_sql=corp_ids_sql, + target_predicate_sql=target_predicate_sql, + oracle_predicate_sql=oracle_predicate_sql, + oracle_corp_type_predicate_sql=oracle_corp_type_predicate_sql, + pg_debug_session_probes=pg_debug_session_probes, + schema=schema + ) + gen_write_text(ch.chunk_file, chunk_sql) + out_paths.append(ch.chunk_file) + return out_paths + + +def _gen_emit_pg_disable_begin(lines: List[str], *, cfg: cfg_GenerationConfig, templates: tmpl_TemplateBundle) -> None: + if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: + lines.append(f"execute {tmpl_resolve_execute_path(templates.disable_triggers, out_dir=cfg.out_chunks_dir).as_posix()}") + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- Refresh-only: preserved processing/tracking tables still reference corporation/event rows.") + lines.append("ALTER TABLE corp_processing DISABLE TRIGGER ALL;") + # lines.append("ALTER TABLE auth_processing DISABLE TRIGGER ALL;") + lines.append("ALTER TABLE colin_tracking DISABLE TRIGGER ALL;") + lines.append("") + return + + if cfg.pg_disable_method == cfg_PgDisableMethod.REPLICA_ROLE: + lines.append("-- Disable triggers / FK checks for this session (requires superuser privileges).") + lines.append("SET session_replication_role = replica;") + lines.append("") + return + + raise SystemExit(f"Unsupported pg_disable_method: {cfg.pg_disable_method}") + + +def _gen_emit_pg_disable_end(lines: List[str], *, cfg: cfg_GenerationConfig, templates: tmpl_TemplateBundle) -> None: + if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: + lines.append(f"execute {tmpl_resolve_execute_path(templates.enable_triggers, out_dir=cfg.out_chunks_dir).as_posix()}") + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- Refresh-only: restore preserved processing/tracking table triggers too.") + lines.append("ALTER TABLE corp_processing ENABLE TRIGGER ALL;") + # lines.append("ALTER TABLE auth_processing ENABLE TRIGGER ALL;") + lines.append("ALTER TABLE colin_tracking ENABLE TRIGGER ALL;") + lines.append("") + return + + if cfg.pg_disable_method == cfg_PgDisableMethod.REPLICA_ROLE: + lines.append("-- Restore normal trigger behavior for this session.") + lines.append("SET session_replication_role = origin;") + lines.append("") + return + + raise SystemExit(f"Unsupported pg_disable_method: {cfg.pg_disable_method}") + + +def _gen_emit_refresh_fk_note(lines: List[str], *, cfg: cfg_GenerationConfig) -> None: + if cfg.mode != cfg_GenerationMode.REFRESH: + return + + lines.append("-- Refresh mode preserves processing/tracking rows while deleting and reloading corporation/event rows.") + if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: + lines.append("-- table_triggers mode adds refresh-only trigger suppression for the preserved FK-owning tables too.") + else: + lines.append("-- replica_role mode relies on session_replication_role remaining active for nested execute/transfer work.") + lines.append("") + + +def gen_write_pg_debug_copy(*, out_path: Path, source_path: Path, probe_label: str) -> Path: + source_text = source_path.read_text(encoding="utf-8") + debug_lines = [ + f"-- generated debug copy: {out_path.name}", + "-- Debug probe: backend/session state for this nested execute file.", + sql_render_pg_session_probe(probe_label), + "", + source_text.rstrip(), + "", + ] + gen_write_text(out_path, "\n".join(debug_lines)) + return out_path + + +def gen_build_master_script_inline( + *, + cfg: cfg_GenerationConfig, + templates: tmpl_TemplateBundle, + delete_chunk_files: Sequence[Path], + transfer_chunk_files: Sequence[Path], + pg_purge_script_path: Path, +) -> str: + lines: List[str] = [] + lines.append("vset cli.settings.ignore_errors=false") + lines.append("vset cli.settings.replace_variables=false") + lines.append(f"vset cli.settings.transfer_threads={cfg.threads}") + lines.append("vset format.date=YYYY-MM-dd'T'hh:mm:ss'Z'") + lines.append("vset format.timestamp=YYYY-MM-dd'T'hh:mm:ss'Z'") + lines.append("") + lines.append(f"connect {cfg.target_connection};") + lines.append("-- Serialize subset runs on this target DB.") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_acquire_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + lines.append("-- Prepare shared address staging table before learning schema") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_prepare_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"learn schema {cfg.target_schema};") + lines.append("") + lines.append(f"SET search_path TO {cfg.target_schema};") + + lines.append("truncate table colin_extract_version; " + "insert into colin_extract_version (extracted_at) values (current_timestamp); " + ) + lines.append("") + + if cfg.pg_fastload: + lines.append("-- Postgres fast-load mode (session-level settings)") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_begin, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- Cleanup stale orphan child rows before entering the trigger-suppressed refresh window.") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + _gen_emit_pg_disable_begin(lines, cfg=cfg, templates=templates) + _gen_emit_refresh_fk_note(lines, cfg=cfg) + if cfg.pg_debug_session_probes: + lines.append("-- Debug probe: backend/session state in the master script after trigger/FK suppression.") + lines.append(sql_render_pg_session_probe("master:after_disable")) + lines.append("") + + if cfg.include_cars: + lines.append("-- global cars* refresh (not corp-scoped; full dataset truncate + reload)") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if delete_chunk_files: + total = len(delete_chunk_files) + lines.append("-- delete corp-scoped subset (refresh mode)") + for idx, chunk_file in enumerate(delete_chunk_files, start=1): + lines.append(f"-- delete chunk {idx:03d}/{total:03d}") + lines.append(f"execute {chunk_file.as_posix()}") + lines.append("") + if transfer_chunk_files: + total = len(transfer_chunk_files) + lines.append("-- transfer corp-scoped subset from Oracle to Postgres") + for idx, chunk_file in enumerate(transfer_chunk_files, start=1): + lines.append(f"-- transfer chunk {idx:03d}/{total:03d}") + lines.append(f"execute {chunk_file.as_posix()}") + lines.append("") + + lines.append("-- purge BCOMPS-excluded corps (computed in Postgres after load)") + lines.append(f"execute {pg_purge_script_path.as_posix()}") + lines.append("") + + _gen_emit_pg_disable_end(lines, cfg=cfg, templates=templates) + + lines.append("-- Cleanup shared address staging table") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + lines.append("-- Release subset-run advisory lock") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_release_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if cfg.pg_fastload: + lines.append("-- Reset Postgres fast-load session settings") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_end, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + return "\n".join(lines) + + +def gen_build_master_script_vset( + *, + cfg: cfg_GenerationConfig, + templates: tmpl_TemplateBundle, + corp_ids: List[str], + effective_strategy: cfg_OracleInStrategy, +) -> str: + """ + vset mode: relies on DbSchemaCLI runtime substitution via vset + cli.settings.replace_variables=true. + + Strategies: + - chunk_files: delete+transfer per chunk + - or_of_in_lists: (refresh) delete per chunk, then transfer once using OR-of-IN-lists predicates + (load) transfer once using OR-of-IN-lists predicates + """ + delete_chunks = chunk_chunked(corp_ids, cfg.chunk_size) + oracle_ids_all = corp_to_oracle_ids(corp_ids) + + lines: List[str] = [] + lines.append("vset cli.settings.ignore_errors=false") + lines.append("vset cli.settings.replace_variables=true") + lines.append(f"vset cli.settings.transfer_threads={cfg.threads}") + lines.append("vset format.date=YYYY-MM-dd'T'hh:mm:ss'Z'") + lines.append("vset format.timestamp=YYYY-MM-dd'T'hh:mm:ss'Z'") + lines.append("") + lines.append(f"connect {cfg.target_connection};") + lines.append("-- Serialize subset runs on this target DB.") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_acquire_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + lines.append("-- Prepare shared address staging table before learning schema") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_prepare_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"learn schema {cfg.target_schema};") + lines.append("") + + lines.append("truncate table colin_extract_version; " + "insert into colin_extract_version (extracted_at) values (current_timestamp); " + ) + lines.append("") + + if cfg.pg_fastload: + lines.append("-- Postgres fast-load mode (session-level settings)") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_begin, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- Cleanup stale orphan child rows before entering the trigger-suppressed refresh window.") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + _gen_emit_pg_disable_begin(lines, cfg=cfg, templates=templates) + _gen_emit_refresh_fk_note(lines, cfg=cfg) + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- Cleanup stale orphan child rows before chunked refresh deletes.") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if cfg.include_cars: + lines.append("-- global cars* refresh (not corp-scoped; full dataset truncate + reload)") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + def _vset_predicates(target_values: Sequence[str], oracle_values: Sequence[str]) -> None: + target_pred = sql_render_in_predicate( + "target_corp_num", + target_values, + max_in_list=cfg.chunk_size, + multiline=False, + indent="", + ) + oracle_pred = sql_render_in_predicate( + "c.CORP_NUM", + oracle_values, + max_in_list=cfg.chunk_size, + multiline=False, + indent="", + ) + oracle_corp_type_pred = sql_render_oracle_corp_type_predicate(include_cp=cfg.include_cp) + lines.append(f"vset target_corp_num_predicate={target_pred}") + lines.append(f"vset oracle_corp_num_predicate={oracle_pred}") + lines.append(f"vset oracle_corp_type_predicate={oracle_corp_type_pred}") + + if cfg.mode == cfg_GenerationMode.REFRESH: + lines.append("-- delete subset (chunked to keep SQL size manageable)") + total_del = len(delete_chunks) + for idx, ch in enumerate(delete_chunks, start=1): + lines.append(f"-- delete chunk {idx:03d}/{total_del:03d} (target corps={len(ch)})") + lines.append(f"vset corp_ids_in={','.join(sql_quote_literal(v) for v in ch)}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if effective_strategy == cfg_OracleInStrategy.OR_OF_IN_LISTS: + lines.append("-- transfer subset (single pass via OR-of-IN-lists predicate)") + _vset_predicates(corp_ids, oracle_ids_all) + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + else: + lines.append("-- transfer subset (chunked)") + total_tr = len(delete_chunks) + for idx, ch in enumerate(delete_chunks, start=1): + oracle_ids = corp_to_oracle_ids(ch) + lines.append( + f"-- transfer chunk {idx:03d}/{total_tr:03d} (target corps={len(ch)}, oracle corp_num={len(oracle_ids)})" + ) + _vset_predicates(ch, oracle_ids) + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + lines.append("-- purge BCOMPS-excluded corps (computed in Postgres after load)") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_purge_bcomps_excluded, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + _gen_emit_pg_disable_end(lines, cfg=cfg, templates=templates) + + lines.append("-- Cleanup shared address staging table") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + lines.append("-- Release subset-run advisory lock") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_release_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + if cfg.pg_fastload: + lines.append("-- Reset Postgres fast-load session settings") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_end, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append("") + + return "\n".join(lines) + + +# ========================= +# cli_* (CLI + orchestration) +# ========================= + +def cli_parse_args(argv: List[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Generate a chunked DbSchemaCLI subset corp extract script.") + parser.add_argument("--corp-file", required=True, help="Path to a newline-delimited file of corp identifiers.") + parser.add_argument( + "--mode", + choices=[m.value for m in cfg_GenerationMode], + default=cfg_GenerationMode.REFRESH.value, + help="refresh=delete+reload, load=load only (useful for empty target DB).", + ) + parser.add_argument( + "--render-mode", + choices=[m.value for m in cfg_RenderMode], + default=cfg_RenderMode.INLINE.value, + help="inline=render per-chunk scripts at generation time; vset=legacy runtime substitution.", + ) + parser.add_argument( + "--oracle-in-strategy", + choices=[m.value for m in cfg_OracleInStrategy], + default=cfg_OracleInStrategy.AUTO.value, + help="How to handle Oracle's ~1000 item IN(...) limit. " + "auto (default)=use or_of_in_lists only when total ids <= --or-of-in-max-ids (default 10000); otherwise chunk_files. " + "chunk_files=repeat full transfer suite per chunk. " + "or_of_in_lists=single transfer pass with OR-of-IN-lists predicates.", + ) + parser.add_argument( + "--or-of-in-max-ids", + type=int, + default=10000, + help="When --oracle-in-strategy=auto, use or_of_in_lists only if the corp list has <= this many ids (default: 10000). " + "Example: 30000 ids with default 10000 => chunk_files.", + ) + parser.add_argument( + "--chunk-size", + type=int, + default=900, + help="Max items per IN(...) list. Keep <= 1000 (default: 900). Also used as the per-IN list size for OR-of-IN-lists.", + ) + parser.add_argument( + "--threads", + type=int, + default=4, + help="DbSchemaCLI transfer writer threads (default: 4).", + ) + parser.add_argument( + "--out", + default=None, + help="Output path for the generated master DbSchemaCLI script. " + "Default: data-tool/scripts/generated/subset_.sql", + ) + parser.add_argument( + "--prefix-numeric-bc", + action="store_true", + help="If set, any all-numeric corp id lines will be normalized to BC for the TARGET/Postgres corp_num.", + ) + parser.add_argument( + "--no-cars", + dest="include_cars", + action="store_false", + help="Skip global cars* refresh step (carsfile/carsbox/carsrept/carindiv).", + ) + parser.set_defaults(include_cars=True) + parser.add_argument( + "--include-cp", + action="store_true", + help="Include corp type CP in subset extract queries.", + ) + + parser.add_argument( + "--pg-fastload", + dest="pg_fastload", + action="store_true", + help="Enable Postgres session 'fast load' settings (e.g. synchronous_commit=off) for faster bulk writes.", + ) + parser.add_argument( + "--no-pg-fastload", + dest="pg_fastload", + action="store_false", + help="Disable Postgres session 'fast load' settings (default).", + ) + parser.set_defaults(pg_fastload=False) + + parser.add_argument( + "--pg-disable-method", + choices=[m.value for m in cfg_PgDisableMethod], + default=cfg_PgDisableMethod.TABLE_TRIGGERS.value, + help="How to suppress triggers during load. " + "table_triggers=ALTER TABLE ... DISABLE/ENABLE TRIGGER ALL (default). " + "replica_role=SET session_replication_role=replica|origin (requires superuser; disables triggers/FKs for session).", + ) + parser.add_argument( + "--pg-debug-session-probes", + action="store_true", + help="Inline-mode only. Add diagnostic SELECT probes that print pg_backend_pid/current_user/" + "session_replication_role in the master script and nested execute files.", + ) + + parser.add_argument( + "--target-connection", + default="cprd_pg_subset", + help="DbSchemaCLI connection name for the TARGET Postgres extract DB (default: cprd_pg_subset).", + ) + parser.add_argument( + "--target-schema", + default="public", + help="Target schema to `learn` in DbSchemaCLI (default: public).", + ) + + return parser.parse_args(argv) + + +def cfg_resolve_repo_root() -> Path: + # repo root is two levels above this file: data-tool/scripts/ + return Path(__file__).resolve().parents[2] + + +def cfg_build_config(args: argparse.Namespace) -> cfg_GenerationConfig: + repo_root = cfg_resolve_repo_root() + print(repo_root) + + corp_file = Path(args.corp_file).expanduser().resolve() + if not corp_file.exists(): + raise SystemExit(f"corp file not found: {corp_file}") + + mode = cfg_GenerationMode(args.mode) + render_mode = cfg_RenderMode(args.render_mode) + + oracle_in_strategy = cfg_OracleInStrategy(args.oracle_in_strategy) + or_of_in_max_ids = int(args.or_of_in_max_ids) + if or_of_in_max_ids <= 0: + raise SystemExit("--or-of-in-max-ids must be > 0") + + pg_disable_method = cfg_PgDisableMethod(args.pg_disable_method) + + out_master = ( + Path(args.out).expanduser().resolve() + if args.out + else (repo_root / "colin-extract-refresh" / "src" / "subset" / "generated" / f"subset_{mode.value}.sql") + ) + out_master.parent.mkdir(parents=True, exist_ok=True) + + # Chunk scripts dir is always derived from master output stem for determinism. + out_chunks_dir = out_master.parent / f"{out_master.stem}_chunks" + + print(out_master, out_chunks_dir) + + if render_mode == cfg_RenderMode.INLINE: + out_chunks_dir.mkdir(parents=True, exist_ok=True) + + if args.chunk_size <= 0: + raise SystemExit("--chunk-size must be > 0") + if args.threads <= 0: + raise SystemExit("--threads must be > 0") + + return cfg_GenerationConfig( + repo_root=repo_root, + corp_file=corp_file, + mode=mode, + render_mode=render_mode, + chunk_size=int(args.chunk_size), + threads=int(args.threads), + prefix_numeric_bc=bool(args.prefix_numeric_bc), + include_cars=bool(args.include_cars), + include_cp=bool(args.include_cp), + pg_fastload=bool(args.pg_fastload), + pg_disable_method=pg_disable_method, + pg_debug_session_probes=bool(args.pg_debug_session_probes), + oracle_in_strategy=oracle_in_strategy, + or_of_in_max_ids=or_of_in_max_ids, + out_master=out_master, + out_chunks_dir=out_chunks_dir, + target_connection=str(args.target_connection), + target_schema=str(args.target_schema), + ) + + +def _effective_oracle_strategy(cfg: cfg_GenerationConfig, total_ids: int) -> cfg_OracleInStrategy: + if cfg.oracle_in_strategy != cfg_OracleInStrategy.AUTO: + return cfg.oracle_in_strategy + return cfg_OracleInStrategy.OR_OF_IN_LISTS if total_ids <= cfg.or_of_in_max_ids else cfg_OracleInStrategy.CHUNK_FILES + + +def run(cfg: cfg_GenerationConfig) -> int: + templates = tmpl_default_bundle(cfg.repo_root, cfg.target_schema) + + if cfg.pg_debug_session_probes and cfg.render_mode != cfg_RenderMode.INLINE: + raise SystemExit("--pg-debug-session-probes currently supports only --render-mode inline.") + + # Load/validate templates we depend on (fail fast if template contract changes). + delete_template_text = tmpl_load_text(templates.delete_chunk) + transfer_template_text = tmpl_load_text(templates.transfer_chunk) + # Ensure the execute-only templates exist too (even though we don't render them). + for spec in ( + templates.pg_acquire_advisory_lock, + templates.pg_release_advisory_lock, + templates.pg_prepare_address_stage, + templates.pg_cleanup_address_stage, + templates.pg_cleanup_orphan_children, + templates.disable_triggers, + templates.enable_triggers, + templates.pg_boolean_casts, + templates.pg_fastload_begin, + templates.pg_fastload_end, + templates.pg_purge_bcomps_excluded, + templates.delete_cars, + templates.transfer_cars, + ): + if not spec.path.exists(): + raise SystemExit(f"Missing required template: {spec.name}\nPath: {spec.path}") + + # Parse & normalize corp ids. + target_ids = corp_normalize_target_ids( + corp_read_lines(cfg.corp_file), + prefix_numeric_bc=cfg.prefix_numeric_bc, + ) + if not target_ids: + raise SystemExit("No corp ids found after parsing (file empty or only comments).") + + effective_strategy = _effective_oracle_strategy(cfg, total_ids=len(target_ids)) + + if cfg.render_mode == cfg_RenderMode.INLINE: + delete_files: List[Path] = [] + transfer_files: List[Path] = [] + + if effective_strategy == cfg_OracleInStrategy.CHUNK_FILES: + chunks = chunk_plan_chunks(target_ids, chunk_size=cfg.chunk_size, chunks_dir=cfg.out_chunks_dir, file_stem="chunk") + combined_files = gen_write_chunk_files( + chunks=chunks, + mode=cfg.mode, + include_delete=(cfg.mode == cfg_GenerationMode.REFRESH), + include_transfer=True, + delete_template_text=delete_template_text, + transfer_template_text=transfer_template_text, + max_in_list=cfg.chunk_size, + include_cp=cfg.include_cp, + pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, + ) + transfer_files = combined_files + + else: + # OR-of-IN-lists: transfer once using predicates; keep deletes chunked in refresh mode. + if cfg.mode == cfg_GenerationMode.REFRESH: + del_chunks = chunk_plan_chunks(target_ids, chunk_size=cfg.chunk_size, chunks_dir=cfg.out_chunks_dir, file_stem="delete_chunk") + delete_files = gen_write_chunk_files( + chunks=del_chunks, + mode=cfg.mode, + include_delete=True, + include_transfer=False, + delete_template_text=delete_template_text, + transfer_template_text=transfer_template_text, + max_in_list=cfg.chunk_size, + include_cp=cfg.include_cp, + pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, + ) + + oracle_ids_all = corp_to_oracle_ids(target_ids) + transfer_chunk = chunk_ChunkSpec( + index=1, + total=1, + target_ids=target_ids, + oracle_ids=oracle_ids_all, + chunk_file=cfg.out_chunks_dir / "transfer_all.sql", + ) + transfer_files = gen_write_chunk_files( + chunks=[transfer_chunk], + mode=cfg.mode, + include_delete=False, + include_transfer=True, + delete_template_text=delete_template_text, + transfer_template_text=transfer_template_text, + max_in_list=cfg.chunk_size, + include_cp=cfg.include_cp, + pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, + ) + + pg_purge_script_path = tmpl_resolve_execute_path(templates.pg_purge_bcomps_excluded, out_dir=cfg.out_chunks_dir) + if cfg.pg_debug_session_probes: + pg_purge_script_path = gen_write_pg_debug_copy( + out_path=cfg.out_chunks_dir / "pg_purge_bcomps_excluded_debug.sql", + source_path=pg_purge_script_path, + probe_label="execute:pg_purge_bcomps_excluded", + ) + + master_text = gen_build_master_script_inline( + cfg=cfg, + templates=templates, + delete_chunk_files=delete_files, + transfer_chunk_files=transfer_files, + pg_purge_script_path=pg_purge_script_path, + ) + gen_write_text(cfg.out_master, master_text) + + n_ids = len(target_ids) + in_groups = (n_ids + cfg.chunk_size - 1) // cfg.chunk_size + + print(f"Wrote master script: {cfg.out_master}") + if cfg.out_chunks_dir.exists(): + print(f"Wrote chunk scripts: {cfg.out_chunks_dir}") + print("") + print("Run:") + print(f" dbschemacli {cfg.out_master}") + print("") + print("Notes:") + print(" - Corp ids in the file should match the TARGET Postgres extract corp_num format (e.g. BC0460007).") + print(" - If you have numeric-only corp ids, consider --prefix-numeric-bc.") + print(f" - corp ids: {n_ids} => ceil({n_ids}/{cfg.chunk_size}) = {in_groups} chunk(s)") + print(f" - Oracle IN-list handling: {effective_strategy.value} (configured: {cfg.oracle_in_strategy.value})") + print(f" - chunk-size (max items per IN list): {cfg.chunk_size}") + if effective_strategy == cfg_OracleInStrategy.CHUNK_FILES: + print(f" - transfer suite executions: {in_groups} (one per chunk file)") + else: + print(" - transfer suite executions: 1 (single pass via OR-of-IN-lists)") + print(f" - OR-of-IN-lists groups: {in_groups} (each IN list <= {cfg.chunk_size} ids)") + if cfg.mode == cfg_GenerationMode.REFRESH: + print(f" - delete chunks (refresh mode): {in_groups} (always chunked)") + if cfg.oracle_in_strategy == cfg_OracleInStrategy.AUTO: + print(f" - auto threshold (--or-of-in-max-ids): {cfg.or_of_in_max_ids}") + if cfg.include_cars: + print(" - cars* tables will be globally refreshed (truncate + reload from Oracle).") + else: + print(" - cars* tables will NOT be refreshed (--no-cars was set).") + print(f" - CP corp type inclusion: {'ENABLED' if cfg.include_cp else 'disabled'} (--include-cp)") + print(f" - Postgres fast-load session settings: {'ENABLED' if cfg.pg_fastload else 'disabled'} (--pg-fastload)") + print(f" - Postgres trigger suppression: {cfg.pg_disable_method.value} (--pg-disable-method)") + print(" - subset runs acquire a session-level advisory lock on the target DB to prevent overlap.") + print(" - Address loads use the predeclared helper table public.subset_address_stage and merge into public.address by addr_id.") + print(" - BCOMPS purge keysets also use predeclared helper tables in the extract schema (subset_excluded_*).") + print(" - subset runs should not overlap on the same target DB, and the runtime role must be able to truncate/read/write those helper tables.") + print(f"schema passed down as : {cfg.target_schema}") + if cfg.pg_debug_session_probes: + print(" - Postgres session probes: ENABLED (--pg-debug-session-probes)") + print(" - master + nested execute files will print pg_backend_pid/current_user/session_replication_role.") + print(" - probes confirm master/nested execute session context; DbSchemaCLI transfer writer sessions may still differ.") + if cfg.mode == cfg_GenerationMode.REFRESH: + print(" - refresh preserves processing/tracking rows while deleting/reloading corporation/event rows.") + print(" - refresh also pre-cleans orphan event/corp-party child rows before entering the trigger-suppressed window.") + if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: + print(" - table_triggers mode adds refresh-only trigger suppression for the preserved FK-owning tables.") + print(" - table_triggers changes table state globally while the refresh runs; use it against a quiesced extract DB with sufficient privileges.") + else: + print(" - replica_role is session-local; if FK errors still occur, probe current_setting('session_replication_role') inside nested execute files.") + if cfg.pg_disable_method == cfg_PgDisableMethod.REPLICA_ROLE: + print(" - NOTE: replica_role requires superuser and disables triggers/FKs for the session.") + return 0 + + # vset mode: generate only a master script (no chunk files) + master_text = gen_build_master_script_vset( + cfg=cfg, + templates=templates, + corp_ids=target_ids, + effective_strategy=effective_strategy, + ) + gen_write_text(cfg.out_master, master_text) + + n_ids = len(target_ids) + in_groups = (n_ids + cfg.chunk_size - 1) // cfg.chunk_size + + print(f"Wrote (vset mode): {cfg.out_master}") + print("") + print("Run:") + print(f" dbschemacli {cfg.out_master}") + print("") + print("Notes:") + print(" - This script relies on DbSchemaCLI vset variables and runtime substitution.") + print(f" - corp ids: {n_ids} => ceil({n_ids}/{cfg.chunk_size}) = {in_groups} chunk(s)") + print(f" - Oracle IN-list handling: {effective_strategy.value} (configured: {cfg.oracle_in_strategy.value})") + print(f" - chunk-size (max items per IN list): {cfg.chunk_size}") + if effective_strategy == cfg_OracleInStrategy.CHUNK_FILES: + print(f" - transfer suite executions: {in_groups} (looped in SQL via vset/while)") + else: + print(" - transfer suite executions: 1 (single pass via OR-of-IN-lists)") + print(f" - OR-of-IN-lists groups: {in_groups} (each IN list <= {cfg.chunk_size} ids)") + if cfg.mode == cfg_GenerationMode.REFRESH: + print(f" - delete chunks (refresh mode): {in_groups} (looped in SQL)") + if cfg.oracle_in_strategy == cfg_OracleInStrategy.AUTO: + print(f" - auto threshold (--or-of-in-max-ids): {cfg.or_of_in_max_ids}") + print(" - Prefer --render-mode inline for faster runs (inline generates static SQL per chunk).") + print(f" - Postgres fast-load session settings: {'ENABLED' if cfg.pg_fastload else 'disabled'} (--pg-fastload)") + print(f" - Postgres trigger suppression: {cfg.pg_disable_method.value} (--pg-disable-method)") + print(" - subset runs acquire a session-level advisory lock on the target DB to prevent overlap.") + print(" - Address loads use the predeclared helper table public.subset_address_stage and merge into public.address by addr_id.") + print(" - BCOMPS purge keysets also use predeclared helper tables in the extract schema (subset_excluded_*).") + print(" - subset runs should not overlap on the same target DB, and the runtime role must be able to truncate/read/write those helper tables.") + if cfg.pg_debug_session_probes: + print(" - Postgres session probes: ENABLED (--pg-debug-session-probes)") + print(" - master + nested execute files will print pg_backend_pid/current_user/session_replication_role.") + print(" - probes confirm master/nested execute session context; DbSchemaCLI transfer writer sessions may still differ.") + if cfg.mode == cfg_GenerationMode.REFRESH: + print(" - refresh preserves processing/tracking rows while deleting/reloading corporation/event rows.") + print(" - refresh also pre-cleans orphan event/corp-party child rows that can survive parent-keyed deletes.") + if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: + print(" - table_triggers mode adds refresh-only trigger suppression for the preserved FK-owning tables.") + print(" - table_triggers changes table state globally while the refresh runs; use it against a quiesced extract DB with sufficient privileges.") + else: + print(" - replica_role is session-local; if FK errors still occur, probe current_setting('session_replication_role') inside nested execute files.") + if cfg.pg_disable_method == cfg_PgDisableMethod.REPLICA_ROLE: + print(" - NOTE: replica_role requires superuser and disables triggers/FKs for the session.") + return 0 + + +def main() -> int: + args = cli_parse_args() + cfg = cfg_build_config(args) + return run(cfg) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/jobs/colin-extract-refresh/src/subset/.gitignore b/jobs/colin-extract-refresh/src/subset/.gitignore new file mode 100644 index 0000000000..69766d9aeb --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/.gitignore @@ -0,0 +1,3 @@ +# Local-only generated DbSchemaCLI scripts and corp-id lists. +# Keep this folder in git, but not the generated artifacts. +*.txt diff --git a/jobs/colin-extract-refresh/src/subset/subset_delete_cars.sql b/jobs/colin-extract-refresh/src/subset/subset_delete_cars.sql new file mode 100644 index 0000000000..1fe5a82b4a --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_delete_cars.sql @@ -0,0 +1,11 @@ +-- Global delete/clear for cars* tables (subset refresh/load). +-- Intended to be executed while connected to target Postgres extract DB (cprd_pg). +-- +-- These tables are NOT corp-scoped, so we truncate the entire dataset and reload from Oracle. +-- Volume is low enough that a full refresh is appropriate. +SET search_path TO TARGET_SCHEMA; + +TRUNCATE TABLE TARGET_SCHEMA.carindiv; +TRUNCATE TABLE TARGET_SCHEMA.carsrept; +TRUNCATE TABLE TARGET_SCHEMA.carsbox; +TRUNCATE TABLE TARGET_SCHEMA.carsfile; diff --git a/jobs/colin-extract-refresh/src/subset/subset_delete_chunk.sql b/jobs/colin-extract-refresh/src/subset/subset_delete_chunk.sql new file mode 100644 index 0000000000..d5336e241c --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_delete_chunk.sql @@ -0,0 +1,125 @@ +-- Delete a chunk of corps from the TARGET Postgres extract DB. +-- +-- REQUIRED DbSchemaCLI variables (replace_variables=true): +-- corp_ids_in : comma-separated SQL string literals for target corp_num values (NO parentheses), +-- e.g. 'BC0460007','A1234567' +-- +-- Intended to be executed from a master DbSchemaCLI script connected to the target Postgres DB (cprd_pg). +-- +-- Note: This script intentionally does NOT delete internal migration/processing tables (mig_*, corp_processing, +-- colin_tracking, affiliation_processing, etc). It only deletes the corp-scoped COLIN extract tables that are +-- reloaded from Oracle. +-- IMPORTANT: +-- - Because preserved processing/tracking tables still reference corporation/event rows, refresh mode must keep +-- FK enforcement suppressed across this delete/reload window (for example via replica_role, or by disabling +-- triggers on the preserved referencing tables too). + +-- Address rows are treated as shared/global during subset refresh. +-- Do not delete them here: subset_transfer_chunk.sql stages incoming Oracle address rows and +-- merges them into public.address by addr_id. + +-- Delete child tables first (event-scoped). +SET search_path TO TARGET_SCHEMA; + +DELETE FROM TARGET_SCHEMA.notification_resend +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.notification +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.filing_user +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.payment +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.ledger_text +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.conv_ledger +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.conv_event +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.completing_party +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.submitting_party +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.corp_involved_amalgamating +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.corp_involved_cont_in +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.correction +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.filing +WHERE event_id IN (SELECT event_id FROM event WHERE corp_num IN (&corp_ids_in)); + +-- Delete corp-party related tables. +DELETE FROM TARGET_SCHEMA.party_notification +WHERE party_id IN (SELECT corp_party_id FROM corp_party WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.offices_held +WHERE corp_party_id IN (SELECT corp_party_id FROM corp_party WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.corp_party_relationship +WHERE corp_party_id IN (SELECT corp_party_id FROM corp_party WHERE corp_num IN (&corp_ids_in)); + +DELETE FROM TARGET_SCHEMA.corp_party +WHERE corp_num IN (&corp_ids_in); + +-- Delete corp-scoped tables. +DELETE FROM TARGET_SCHEMA.office +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.corp_name +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.corp_state +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.corp_comments +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.corp_flag +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.cont_out +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.corp_restriction +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.jurisdiction +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.resolution +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.share_series +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.share_struct_cls +WHERE corp_num IN (&corp_ids_in); + +DELETE FROM TARGET_SCHEMA.share_struct +WHERE corp_num IN (&corp_ids_in); + +-- Delete events last (many tables reference event_id). +DELETE FROM TARGET_SCHEMA.event +WHERE corp_num IN (&corp_ids_in); + + +DELETE FROM TARGET_SCHEMA.auth_processing +WHERE corp_num IN (&corp_ids_in); + +-- Delete the corp rows last. +DELETE FROM TARGET_SCHEMA.corporation +WHERE corp_num IN (&corp_ids_in); + +-- Address rows are refreshed via stage+merge in subset_transfer_chunk.sql. diff --git a/jobs/colin-extract-refresh/src/subset/subset_disable_triggers.sql b/jobs/colin-extract-refresh/src/subset/subset_disable_triggers.sql new file mode 100644 index 0000000000..631d8e0015 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_disable_triggers.sql @@ -0,0 +1,24 @@ +SET search_path TO TARGET_SCHEMA; + +ALTER TABLE TARGET_SCHEMA.notification + DROP CONSTRAINT fk_notification_filing ; +ALTER TABLE TARGET_SCHEMA.office + DROP CONSTRAINT fk_office_mailing_address ; +ALTER TABLE TARGET_SCHEMA.office +DROP CONSTRAINT fk_office_delivery_address ; +ALTER TABLE TARGET_SCHEMA.corp_party + DROP CONSTRAINT fk_corp_party_mailing_address ; +ALTER TABLE TARGET_SCHEMA.completing_party + DROP CONSTRAINT fk_completing_party_address ; +ALTER TABLE TARGET_SCHEMA.office + DROP CONSTRAINT fk_corp_party_delivery_address ; +ALTER TABLE TARGET_SCHEMA.notification + DROP CONSTRAINT fk_notification_address ; +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN arrangement_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.corporation ALTER COLUMN send_ar_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_series ALTER COLUMN max_share_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN max_share_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN court_appr_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN spec_rights_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN par_value_ind TYPE CHARACTER VARYING(255); + diff --git a/jobs/colin-extract-refresh/src/subset/subset_enable_triggers.sql b/jobs/colin-extract-refresh/src/subset/subset_enable_triggers.sql new file mode 100644 index 0000000000..0e627bb954 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_enable_triggers.sql @@ -0,0 +1,18 @@ +SET search_path TO TARGET_SCHEMA; + +ALTER TABLE TARGET_SCHEMA.notification ADD CONSTRAINT fk_notification_filing FOREIGN KEY (event_id) REFERENCES TARGET_SCHEMA.filing (event_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_office_mailing_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_office_delivery_address FOREIGN KEY (delivery_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_corp_party_delivery_address FOREIGN KEY (delivery_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.corp_party ADD CONSTRAINT fk_corp_party_mailing_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.completing_party ADD CONSTRAINT fk_completing_party_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.notification ADD CONSTRAINT fk_notification_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); + + +ALTER TABLE TARGET_SCHEMA.corporation ALTER COLUMN send_ar_ind TYPE boolean USING (CASE send_ar_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN arrangement_ind TYPE boolean USING (CASE arrangement_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN court_appr_ind TYPE boolean USING (CASE court_appr_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_series ALTER COLUMN max_share_ind TYPE boolean USING (CASE max_share_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN max_share_ind TYPE boolean USING (CASE max_share_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN spec_rights_ind TYPE boolean USING (CASE spec_rights_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN par_value_ind TYPE boolean USING (CASE par_value_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_acquire_advisory_lock.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_acquire_advisory_lock.sql new file mode 100644 index 0000000000..5434c6999a --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_acquire_advisory_lock.sql @@ -0,0 +1,8 @@ +-- Acquire a session-level advisory lock so subset load/refresh runs do not overlap on the same target DB. +-- Advisory locks are scoped to the current Postgres database, so the same keys are safe across separate DBs. +SET search_path TO TARGET_SCHEMA; + +SELECT pg_advisory_lock( + hashtext('lear:data-tool:colin_subset_extract'), + hashtext('subset_run') +); diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_boolean_casts.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_boolean_casts.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_address_stage.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_address_stage.sql new file mode 100644 index 0000000000..a702de8e9c --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_address_stage.sql @@ -0,0 +1,2 @@ +-- Cleanup the shared address staging table used by subset_transfer_chunk.sql. +-- No-op: the helper table is predeclared in the COLIN extract DDL and is truncated during prepare/chunk execution. diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_orphan_children.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_orphan_children.sql new file mode 100644 index 0000000000..d93a1f4b26 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_cleanup_orphan_children.sql @@ -0,0 +1,64 @@ +-- Cleanup stale child rows that no longer have the parent rows used by refresh-mode deletes. +-- +-- Why this exists: +-- - refresh-mode chunk deletes remove event-scoped rows by first looking up event_id in target `event` +-- - and remove corp-party child rows by first looking up corp_party_id in target `corp_party` +-- - so a prior failed/interleaved run can leave stale child rows behind when the parent row is missing +-- - those orphans can then collide with the next reload (for example, unique `filing.event_id`) +-- +-- This cleanup is intentionally narrow: +-- - only rows whose normal refresh delete path traverses a parent lookup are removed here +-- - corp-scoped rows deleted directly by corp_num are left to the regular chunk deletes + +-- Event-scoped children whose parent event row is missing. +SET search_path TO TARGET_SCHEMA; + +DELETE FROM TARGET_SCHEMA.notification_resend t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.notification t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.filing_user t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.payment t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.ledger_text t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.conv_ledger t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.conv_event t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.completing_party t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.submitting_party t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.corp_involved_amalgamating t +WHERE t.event_id IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.corp_involved_cont_in t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.correction t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +DELETE FROM TARGET_SCHEMA.filing t +WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + +-- Corp-party children whose parent corp_party row is missing. +DELETE FROM TARGET_SCHEMA.party_notification t +WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.party_id); + +DELETE FROM TARGET_SCHEMA.offices_held t +WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.corp_party_id); + +DELETE FROM TARGET_SCHEMA.corp_party_relationship t +WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.corp_party_id); diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_begin.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_begin.sql new file mode 100644 index 0000000000..d733f7b6c2 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_begin.sql @@ -0,0 +1,20 @@ +-- Postgres "fast load" session settings for DbSchemaCLI subset extracts. +-- +-- These settings apply only to the current session/connection and are intended for +-- disposable extract databases (where maximum durability is not required). +-- +-- IMPORTANT: +-- - Keep this file free of DO $$ blocks; some DbSchemaCLI builds split statements on semicolons +-- and don't handle dollar-quoted bodies reliably. +SET search_path TO TARGET_SCHEMA; + +SET statement_timeout = 0; +SET lock_timeout = 0; +SET idle_in_transaction_session_timeout = 0; + +-- Big win for many commits / batched inserts (safe for ephemeral extract DBs). +SET synchronous_commit = off; + +-- Optional knobs (uncomment if you know you need them): +-- SET client_min_messages = warning; +-- SET temp_buffers = '64MB'; diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_end.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_end.sql new file mode 100644 index 0000000000..9646615917 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_fastload_end.sql @@ -0,0 +1,11 @@ +-- Reset session settings set by subset_pg_fastload_begin.sql. +SET search_path TO TARGET_SCHEMA; + +SET synchronous_commit TO DEFAULT; +SET statement_timeout TO DEFAULT; +SET lock_timeout TO DEFAULT; +SET idle_in_transaction_session_timeout TO DEFAULT; + +-- If you enabled any optional knobs above, reset them here too: +-- RESET client_min_messages; +-- RESET temp_buffers; diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_prepare_address_stage.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_prepare_address_stage.sql new file mode 100644 index 0000000000..c364236a55 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_prepare_address_stage.sql @@ -0,0 +1,5 @@ +-- Prepare the shared address staging table used by subset_transfer_chunk.sql. +-- This is a predeclared regular table (not TEMP) because DbSchemaCLI transfer work may use separate sessions. +SET search_path TO TARGET_SCHEMA; + +TRUNCATE TABLE TARGET_SCHEMA.subset_address_stage; diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_purge_bcomps_excluded.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_purge_bcomps_excluded.sql new file mode 100644 index 0000000000..4aa03f8410 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_purge_bcomps_excluded.sql @@ -0,0 +1,182 @@ +-- Compute BCOMPS-excluded corps ONCE in Postgres, then purge those corps from the corp-scoped COLIN extract tables. +-- +-- This is intended to replace the repeated Oracle-side "excluded_corps" computation previously embedded in every +-- transfer query (subset_transfer_chunk.sql). +-- +-- IMPORTANT: +-- - This script intentionally does NOT touch internal migration/processing tables (mig_*, corp_processing, +-- colin_tracking, affiliation_processing, etc). It only purges the corp-scoped COLIN extract tables +-- that are reloaded from Oracle. +-- - Because preserved processing/tracking tables still reference corporation/event rows, refresh mode must keep +-- FK enforcement suppressed across this purge window too (for example via replica_role, or by disabling +-- triggers on the preserved referencing tables too). +-- - This script avoids DO $$ blocks for DbSchemaCLI compatibility. +-- - The helper keyset tables are predeclared in the COLIN extract DDL and reused via TRUNCATE/INSERT. + +-- 1) Build keysets +SET search_path TO TARGET_SCHEMA; + +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corp_parties; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_events; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corps; + +INSERT INTO TARGET_SCHEMA.subset_excluded_corps (corp_num) +SELECT DISTINCT e.corp_num +FROM event e +JOIN filing f ON f.event_id = e.event_id +JOIN filing_user u ON u.event_id = e.event_id +WHERE e.corp_num IS NOT NULL + AND u.user_id = 'BCOMPS' + AND f.filing_type_cd IN ('BEINC', 'ICORP', 'ICORU', 'ICORC', 'CONTB', 'CONTI', 'CONTU', 'CONTC'); + +INSERT INTO TARGET_SCHEMA.subset_excluded_events (event_id) +SELECT DISTINCT e.event_id +FROM event e +JOIN subset_excluded_corps x ON x.corp_num = e.corp_num +WHERE e.event_id IS NOT NULL; + +INSERT INTO TARGET_SCHEMA.subset_excluded_corp_parties (corp_party_id) +SELECT DISTINCT cp.corp_party_id +FROM corp_party cp +JOIN subset_excluded_corps x ON x.corp_num = cp.corp_num +WHERE cp.corp_party_id IS NOT NULL; + +-- 2) Purge (delete child tables first) + +-- Event-scoped children +DELETE FROM TARGET_SCHEMA.notification_resend t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.notification t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.filing_user t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.payment t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.ledger_text t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.conv_ledger t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.conv_event t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.completing_party t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.submitting_party t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.corp_involved_cont_in t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.correction t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +DELETE FROM TARGET_SCHEMA.filing t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +-- corp_involved_amalgamating can reference corp_num via ted_corp_num/ting_corp_num as well as event_id. +-- Delete any rows where either side is excluded (covers non-event-owned references too). +DELETE FROM TARGET_SCHEMA.corp_involved_amalgamating t +USING subset_excluded_corps x +WHERE t.ted_corp_num = x.corp_num + OR t.ting_corp_num = x.corp_num; + +-- Corp-party related +DELETE FROM TARGET_SCHEMA.party_notification t +USING subset_excluded_corp_parties x +WHERE t.party_id = x.corp_party_id; + +DELETE FROM TARGET_SCHEMA.offices_held t +USING subset_excluded_corp_parties x +WHERE t.corp_party_id = x.corp_party_id; + +DELETE FROM TARGET_SCHEMA.corp_party_relationship t +USING subset_excluded_corp_parties x +WHERE t.corp_party_id = x.corp_party_id; + +DELETE FROM TARGET_SCHEMA.corp_party t +USING subset_excluded_corp_parties x +WHERE t.corp_party_id = x.corp_party_id; + +-- Corp-scoped tables +DELETE FROM TARGET_SCHEMA.office t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.corp_name t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.corp_state t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.corp_comments t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.corp_flag t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.cont_out t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.corp_restriction t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.jurisdiction t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.resolution t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +-- Share tables (delete deepest first) +DELETE FROM TARGET_SCHEMA.share_series t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.share_struct_cls t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +DELETE FROM TARGET_SCHEMA.share_struct t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +-- Events last (many tables reference event_id) +DELETE FROM TARGET_SCHEMA.event t +USING subset_excluded_events x +WHERE t.event_id = x.event_id; + +-- Corporation last +DELETE FROM TARGET_SCHEMA.corporation t +USING subset_excluded_corps x +WHERE t.corp_num = x.corp_num; + +-- 3) Cleanup helper tables +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corp_parties; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_events; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corps; diff --git a/jobs/colin-extract-refresh/src/subset/subset_pg_release_advisory_lock.sql b/jobs/colin-extract-refresh/src/subset/subset_pg_release_advisory_lock.sql new file mode 100644 index 0000000000..16cf108ab3 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_pg_release_advisory_lock.sql @@ -0,0 +1,8 @@ +-- Release the session-level advisory lock used to serialize subset load/refresh runs. +-- If the session ends unexpectedly, Postgres releases the advisory lock automatically. +SET search_path TO TARGET_SCHEMA; + +SELECT pg_advisory_unlock( + hashtext('lear:data-tool:colin_subset_extract'), + hashtext('subset_run') +); diff --git a/jobs/colin-extract-refresh/src/subset/subset_transfer_cars.sql b/jobs/colin-extract-refresh/src/subset/subset_transfer_cars.sql new file mode 100644 index 0000000000..115d2c9e12 --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_transfer_cars.sql @@ -0,0 +1,46 @@ +-- Global transfer TARGET_SCHEMA.of cars* tables from SOURCE Oracle DB (cprd) into TARGET Postgres extract DB (cprd_pg). +-- Intended to be executed from a master DbSchemaCLI script connected to the target Postgres DB (cprd_pg). +-- +-- These tables are NOT corp-scoped. The full dataset is transferred without filtering. +-- Volume is low enough that a full refresh is appropriate. +SET search_path TO TARGET_SCHEMA; + +transfer TARGET_SCHEMA.carsfile from cprd using +select + documtid, + filedate, + regiracf +from carsfile; + +transfer TARGET_SCHEMA.carsbox from cprd using +select + documtid, + accesnum, + batchnum, + boxrracf +from carsbox; + +transfer TARGET_SCHEMA.carsrept from cprd using +select + documtid, + docutype, + compnumb +from carsrept; + +transfer TARGET_SCHEMA.carindiv from cprd using +select + documtid, + replace(surname, CHR(0), '') as surname, + replace(firname, CHR(0), '') as firname, + replace(dircpoco, CHR(0), '') as dircpoco, + replace(dircflag, CHR(0), '') as dircflag, + replace(offiflag, CHR(0), '') as offiflag, + replace(chgreasn, CHR(0), '') as chgreasn, + replace(pfirname, CHR(0), '') as pfirname, + replace(psurname, CHR(0), '') as psurname, + replace(offtitle, CHR(0), '') as offtitle, + replace(dircaddr01, CHR(0), '') as dircaddr01, + replace(dircaddr02, CHR(0), '') as dircaddr02, + replace(dircaddr03, CHR(0), '') as dircaddr03, + replace(dircaddr04, CHR(0), '') as dircaddr04 +from carindiv; diff --git a/jobs/colin-extract-refresh/src/subset/subset_transfer_chunk.sql b/jobs/colin-extract-refresh/src/subset/subset_transfer_chunk.sql new file mode 100644 index 0000000000..05866e1cfc --- /dev/null +++ b/jobs/colin-extract-refresh/src/subset/subset_transfer_chunk.sql @@ -0,0 +1,1284 @@ +-- transfer TARGET_SCHEMA.a chunk (or a whole subset) of corps from the SOURCE Oracle DB (cprd) into the TARGET Postgres extract DB (cprd_pg). +-- +-- REQUIRED DbSchemaCLI variables (replace_variables=true): +-- target_corp_num_predicate : SQL predicate restricting the computed target_corp_num (NO trailing semicolon). +-- Examples: +-- target_corp_num in ('BC0460007','A1234567') +-- (target_corp_num in (...) OR target_corp_num in (...)) +-- oracle_corp_num_predicate : SQL predicate restricting Oracle corporation.corp_num (NO trailing semicolon). +-- Examples: +-- c.CORP_NUM in ('0460007','A1234567') +-- (c.CORP_NUM in (...) OR c.CORP_NUM in (...)) +-- oracle_corp_type_predicate : SQL predicate restricting Oracle corporation.corp_typ_cd (NO trailing semicolon). +-- Examples: +-- c.CORP_TYP_CD in ('BC','C','ULC','CUL','CC','CCC','QA','QB','QC','QD','QE') +-- c.CORP_TYP_CD in ('BC','C','ULC','CUL','CC','CCC','QA','QB','QC','QD','QE','CP') +-- +-- Intended to be executed from a master DbSchemaCLI script connected to the target Postgres DB (cprd_pg). +-- +-- IMPORTANT: +-- - This template intentionally avoids the boolean<->integer ALTER COLUMN hacks used in the full refresh script. +-- Instead, Oracle SELECTs emit boolean-friendly 'true'/'false' strings for Postgres boolean columns. +-- - This template transfers corp-scoped tables only (no cars* tables). +-- +-- Performance notes: +-- - BCOMPS exclusion is NOT computed in Oracle in this template (to avoid repeating expensive Oracle-side joins per table). +-- Instead, load the requested corp set and purge BCOMPS-excluded corps ONCE in Postgres after the transfer TARGET_SCHEMA.suite completes +-- (see: subset_pg_purge_bcomps_excluded.sql). +-- - Joins are written to start from the subset (corporation_cte) to avoid "0 rows but slow" plans. +-- - ORDER BY clauses are removed (sorting is unnecessary overhead for transfers). +-- +-- Example (legacy vset mode): +-- vset target_corp_num_predicate=target_corp_num in ('BC1111585','BC1226175'); +-- vset oracle_corp_num_predicate=c.CORP_NUM in ('1111585','1226175'); +SET search_path TO TARGET_SCHEMA; + +-- corporation +transfer TARGET_SCHEMA.corporation from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + -- altered from BC to BEN then BEN to BC before directed launch + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +), +last_ar as ( + select e.corp_num, + to_number(to_char(max(date_1), 'YYYY')) as last_ar_reminder_year + from eml_log e + join rep_data r on r.param_id = e.param_id + and r.t20_1 = e.corp_num + join corp_list cl on cl.corp_num = e.corp_num + group by e.corp_num +) +select c.target_corp_num as CORP_NUM, + c.CORP_FROZEN_TYP_CD as corp_frozen_type_cd, + case + when c.CORP_TYP_CD in ('QA', 'QB', 'QC', 'QD', 'QE') then 'BC' + else c.CORP_TYP_CD + end as CORP_TYPE_CD, + c.CORP_PASSWORD, + c.RECOGNITION_DTS, + c.BN_9, + c.BN_15, + c.ADMIN_EMAIL, + c.ACCESSION_NUM, + c.LAST_AR_FILED_DT, + case c.SEND_AR_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'true' + end as SEND_AR_IND, + la.last_ar_reminder_year as LAST_AR_REMINDER_YEAR +from corporation_cte c +left join last_ar la on la.corp_num = c.corp_num; + + +-- event +transfer TARGET_SCHEMA.event from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + c.target_corp_num as CORP_NUM, + e.event_typ_cd as event_type_cd, + e.event_timestmp as event_timerstamp, + e.trigger_dts +from corporation_cte c +join event e on e.corp_num = c.corp_num +-- not transferring BNUPD, ADDLEDGR events +where e.event_typ_cd not in ('BNUPD', 'ADDLEDGR'); + + +-- corp_name +transfer TARGET_SCHEMA.corp_name from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + cn.CORP_NAME_TYP_CD, + cn.start_event_id, + cn.end_event_id, + cn.CORP_NME as corp_name +from corporation_cte c +join CORP_NAME cn on cn.corp_num = c.corp_num; + + +-- corp_state +transfer TARGET_SCHEMA.corp_state from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + cs.STATE_TYP_CD as state_type_cd, + cos.op_state_typ_cd as op_state_type_cd, + cs.start_event_id, + cs.end_event_id +from corporation_cte c +join CORP_STATE cs on cs.corp_num = c.corp_num +join corp_op_state cos on cos.state_typ_cd = cs.state_typ_cd; + + +-- filing +transfer TARGET_SCHEMA.filing from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + f.filing_typ_cd as filing_type_cd, + f.effective_dt, + f.withdrawn_event_id, + trim(f.ods_typ_cd) as ods_type_cd, + f.nr_num, + f.COURT_ORDER_NUM, + f.CHANGE_DT, + f.PERIOD_END_DT, + case f.ARRANGEMENT_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'true' + end as ARRANGEMENT_IND, + f.AUTH_SIGN_DT, + case f.COURT_APPR_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'true' + end as COURT_APPR_IND +from corporation_cte c +join event e on e.corp_num = c.corp_num +join filing f on f.event_id = e.event_id; + + +-- filing_user +transfer TARGET_SCHEMA.filing_user from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + u.user_id, + u.last_nme as last_name, + u.first_nme as first_name, + u.middle_nme as middle_name, + u.email_addr, + u.BCOL_ACCT_NUM, + u.ROLE_TYP_CD +from corporation_cte c +join event e on e.corp_num = c.corp_num +join filing_user u on u.event_id = e.event_id; + + +-- address (shared/global table; stage then merge before loading dependents) +TRUNCATE TABLE TARGET_SCHEMA.subset_address_stage; + +transfer TARGET_SCHEMA.subset_address_stage from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select distinct + addr_id, + trim(province) as province, + trim(country_typ_cd) as country_typ_cd, + trim(replace(postal_cd, CHR(0), '')) as POSTAL_CD, + trim(addr_line_1) as addr_line_1, + trim(replace(addr_line_2, CHR(0), '')) as ADDR_LINE_2, + trim(addr_line_3) as addr_line_3, + trim(city) as city +from ( + select a.* + from corporation_cte c + join corp_party x on x.corp_num = c.corp_num + join address a on (x.delivery_addr_id = a.addr_id or x.mailing_addr_id = a.addr_id) + + UNION ALL + select a.* + from corporation_cte c + join office x on x.corp_num = c.corp_num + join address a on (x.delivery_addr_id = a.addr_id or x.mailing_addr_id = a.addr_id) + + UNION ALL + select a.* + from corporation_cte c + join event e on e.corp_num = c.corp_num + join completing_party x on x.event_id = e.event_id + join address a on x.mailing_addr_id = a.addr_id + + UNION ALL + select a.* + from corporation_cte c + join event e on e.corp_num = c.corp_num + join notification x on x.event_id = e.event_id + join address a on x.mailing_addr_id = a.addr_id + + UNION ALL + select a.* + from corporation_cte c + join event e on e.corp_num = c.corp_num + join notification_resend x on x.event_id = e.event_id + join address a on x.mailing_addr_id = a.addr_id + + UNION ALL + select a.* + from corporation_cte c + join event e on e.corp_num = c.corp_num + join submitting_party x on x.event_id = e.event_id + join address a on (x.notify_addr_id = a.addr_id or x.mailing_addr_id = a.addr_id) + + UNION ALL + select a.* + from corporation_cte c + join corp_party p on p.corp_num = c.corp_num + join party_notification x on x.party_id = p.corp_party_id + join address a on x.mailing_addr_id = a.addr_id +); + +INSERT INTO TARGET_SCHEMA.address ( + addr_id, + province, + country_typ_cd, + postal_cd, + addr_line_1, + addr_line_2, + addr_line_3, + city +) +SELECT s.addr_id, + s.province, + s.country_typ_cd, + s.postal_cd, + s.addr_line_1, + s.addr_line_2, + s.addr_line_3, + s.city +FROM ( + SELECT DISTINCT ON (addr_id) + addr_id, + province, + country_typ_cd, + postal_cd, + addr_line_1, + addr_line_2, + addr_line_3, + city + FROM TARGET_SCHEMA.subset_address_stage + WHERE addr_id IS NOT NULL + ORDER BY addr_id +) s +ON CONFLICT (addr_id) DO UPDATE +SET province = EXCLUDED.province, + country_typ_cd = EXCLUDED.country_typ_cd, + postal_cd = EXCLUDED.postal_cd, + addr_line_1 = EXCLUDED.addr_line_1, + addr_line_2 = EXCLUDED.addr_line_2, + addr_line_3 = EXCLUDED.addr_line_3, + city = EXCLUDED.city; + +TRUNCATE TABLE subset_address_stage; + + +-- office +transfer TARGET_SCHEMA.office from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + o.office_typ_cd, + o.start_event_id, + o.end_event_id, + o.mailing_addr_id, + o.delivery_addr_id +from corporation_cte c +join office o on o.corp_num = c.corp_num; + + +-- corp_comments +transfer TARGET_SCHEMA.corp_comments from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select cc.comment_dts, + c.target_corp_num as CORP_NUM, + cc.comments, + cc.USER_ID, + cc.FIRST_NME, + cc.LAST_NME, + cc.MIDDLE_NME, + cc.ACCESSION_COMMENTS +from corporation_cte c +join corp_comments cc on cc.corp_num = c.corp_num; + + +-- ledger_text +transfer TARGET_SCHEMA.ledger_text from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + l.notation, + l.USER_ID, + l.LEDGER_TEXT_DTS +from corporation_cte c +join event e on e.corp_num = c.corp_num +join ledger_text l on l.event_id = e.event_id; + + +-- corp_party +transfer TARGET_SCHEMA.corp_party from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select p.corp_party_id, + p.mailing_addr_id, + p.delivery_addr_id, + c.target_corp_num as CORP_NUM, + nvl(p.party_typ_cd, ' ') as party_typ_cd, + p.start_event_id, + p.end_event_id, + p.prev_party_id, + p.appointment_dt, + p.cessation_dt, + nvl(p.LAST_NME, ' ') as last_name, + nvl(p.MIDDLE_NME, ' ') as middle_name, + nvl(p.FIRST_NME, ' ') as first_name, + nvl(p.BUSINESS_NME, ' ') as business_name, + p.BUS_COMPANY_NUM, + p.CORR_TYP_CD, + p.OFFICE_NOTIFICATION_DT +from corporation_cte c +join corp_party p on p.corp_num = c.corp_num; + + +-- corp_party_relationship +transfer TARGET_SCHEMA.corp_party_relationship from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select cpr.CORP_PARTY_ID as corp_party_id, + cpr.RELATIONSHIP_TYP_CD as relationship_typ_cd +from corporation_cte c +join corp_party p on p.corp_num = c.corp_num +join CORP_PARTY_RELATIONSHIP cpr on cpr.corp_party_id = p.corp_party_id; + + +-- offices_held +transfer TARGET_SCHEMA.offices_held from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select oh.CORP_PARTY_ID as corp_party_id, + oh.OFFICER_TYP_CD as officer_typ_cd +from corporation_cte c +join corp_party p on p.corp_num = c.corp_num +join OFFICES_HELD oh on oh.corp_party_id = p.corp_party_id; + + +-- completing_party +transfer TARGET_SCHEMA.completing_party from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + cp.MAILING_ADDR_ID, + cp.FIRST_NME, + cp.LAST_NME, + cp.MIDDLE_NME, + cp.EMAIL_REQ_ADDRESS +from corporation_cte c +join event e on e.corp_num = c.corp_num +join completing_party cp on cp.event_id = e.event_id; + + +-- submitting_party +transfer TARGET_SCHEMA.submitting_party from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + sp.MAILING_ADDR_ID, + sp.NOTIFY_ADDR_ID, + sp.METHOD_TYP_CD, + sp.FIRST_NME, + sp.LAST_NME, + sp.MIDDLE_NME, + sp.EMAIL_REQ_ADDRESS, + sp.PICKUP_BY, + sp.BUSINESS_NME, + sp.NOTIFY_FIRST_NME, + sp.NOTIFY_LAST_NME, + sp.NOTIFY_MIDDLE_NME, + sp.PHONE_NUMBER +from corporation_cte c +join event e on e.corp_num = c.corp_num +join SUBMITTING_PARTY sp on sp.event_id = e.event_id; + + +-- corp_flag +transfer TARGET_SCHEMA.corp_flag from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + cf.CORP_FLAG_TYPE_CD, + cf.start_event_id, + cf.end_event_id +from corporation_cte c +join corp_flag cf on cf.corp_num = c.corp_num; + + +-- cont_out +transfer TARGET_SCHEMA.cont_out from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + co.CAN_JUR_TYP_CD, + co.CONT_OUT_DT, + co.OTHR_JURI_DESC, + co.HOME_COMPANY_NME, + co.start_event_id, + co.end_event_id +from corporation_cte c +join CONT_OUT co on co.corp_num = c.corp_num; + + +-- conv_event +transfer TARGET_SCHEMA.conv_event from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + ce.effective_dt, + case ce.REPORT_CORP_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'true' + end as REPORT_CORP_IND, + ce.ACTIVITY_USER_ID, + ce.ACTIVITY_DT, + ce.ANNUAL_FILE_DT, + ce.ACCESSION_NUM, + ce.REMARKS +from corporation_cte c +join event e on e.corp_num = c.corp_num +join CONV_EVENT ce on ce.event_id = e.event_id; + + +-- conv_ledger +transfer TARGET_SCHEMA.conv_ledger from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + cl.LEDGER_TITLE_TXT, + cl.LEDGER_DESC, + cl.cars_docmnt_id +from corporation_cte c +join event e on e.corp_num = c.corp_num +join CONV_LEDGER cl on cl.event_id = e.event_id; + + +-- corp_involved - amalgamaTING_businesses +transfer TARGET_SCHEMA.corp_involved_amalgamating from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id as event_id, + c.target_corp_num as ted_corp_num, + case + when c2.corp_typ_cd in ('BC', 'ULC', 'CC') then 'BC' || c2.corp_num + else c2.corp_num + end as ting_corp_num, + ci.CORP_INVOLVE_ID as corp_involve_id, + ci.CAN_JUR_TYP_CD as can_jur_typ_cd, + case ci.ADOPTED_CORP_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as adopted_corp_ind, + ci.HOME_JURI_NUM as home_juri_num, + ci.OTHR_JURI_DESC as othr_juri_desc, + ci.FOREIGN_NME as foreign_nme +from corporation_cte c +join event e on e.corp_num = c.corp_num +join CORP_INVOLVED ci on ci.event_id = e.event_id +join corporation c2 on c2.corp_num = ci.corp_num +where e.event_typ_cd = 'CONVAMAL' +UNION ALL +select e.event_id as event_id, + c.target_corp_num as ted_corp_num, + case + when c2.corp_typ_cd in ('BC', 'ULC', 'CC') then 'BC' || c2.corp_num + else c2.corp_num + end as ting_corp_num, + ci.CORP_INVOLVE_ID as corp_involve_id, + ci.CAN_JUR_TYP_CD as can_jur_typ_cd, + case ci.ADOPTED_CORP_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as adopted_corp_ind, + ci.HOME_JURI_NUM as home_juri_num, + ci.OTHR_JURI_DESC as othr_juri_desc, + ci.FOREIGN_NME as foreign_nme +from corporation_cte c +join event e on e.corp_num = c.corp_num +join filing f on f.event_id = e.event_id +join CORP_INVOLVED ci on ci.event_id = e.event_id +join corporation c2 on c2.corp_num = ci.corp_num +where f.filing_typ_cd in ('AMALH', 'AMALV', 'AMALR', 'AMLHU', 'AMLVU', 'AMLRU', 'AMLHC', 'AMLVC', 'AMLRC'); + + +-- corp_involved - continue_in_historical_xpro +transfer TARGET_SCHEMA.corp_involved_cont_in from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + c.target_corp_num as CORP_NUM +from corporation_cte c +join event e on e.corp_num = c.corp_num +join filing f on f.event_id = e.event_id +where f.filing_typ_cd in ('CONTI', 'CONTU', 'CONTC') + and exists (select 1 from CORP_INVOLVED ci where ci.event_id = e.event_id); + + +-- corp_restriction +transfer TARGET_SCHEMA.corp_restriction from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + case cr.RESTRICTION_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as RESTRICTION_IND, + cr.start_event_id, + cr.end_event_id +from corporation_cte c +join CORP_RESTRICTION cr on cr.corp_num = c.corp_num; + + +-- correction +transfer TARGET_SCHEMA.correction from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + c.target_corp_num as CORP_NUM, + corr.ASSOCIATED_DOC_DESC +from corporation_cte c +join event e on e.corp_num = c.corp_num +join CORRECTION corr on corr.event_id = e.event_id; + + +-- continued_in_from_jurisdiction +transfer TARGET_SCHEMA.jurisdiction from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + j.CAN_JUR_TYP_CD, + j.XPRO_TYP_CD, + j.HOME_RECOGN_DT, + j.OTHR_JURIS_DESC, + j.HOME_JURIS_NUM, + j.BC_XPRO_NUM, + j.HOME_COMPANY_NME, + j.start_event_id +from corporation_cte c +join JURISDICTION j on j.corp_num = c.corp_num; + + +-- resolution +transfer TARGET_SCHEMA.resolution from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + r.RESOLUTION_DT, + r.RESOLUTION_TYPE_CODE, + r.start_event_id, + r.end_event_id +from corporation_cte c +join RESOLUTION r on r.corp_num = c.corp_num; + + +-- share_struct +transfer TARGET_SCHEMA.share_struct from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + ss.start_event_id, + ss.end_event_id +from corporation_cte c +join SHARE_STRUCT ss on ss.corp_num = c.corp_num; + + +-- share_struct_cls +transfer TARGET_SCHEMA.share_struct_cls from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + ssc.SHARE_CLASS_ID, + replace(ssc.CLASS_NME, CHR(0), '') as CLASS_NME, + ssc.CURRENCY_TYP_CD, + case ssc.MAX_SHARE_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as MAX_SHARE_IND, + ssc.SHARE_QUANTITY, + case ssc.SPEC_RIGHTS_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as SPEC_RIGHTS_IND, + case ssc.PAR_VALUE_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as PAR_VALUE_IND, + ssc.PAR_VALUE_AMT + 0 as PAR_VALUE_AMT, + ssc.OTHER_CURRENCY, + ssc.start_event_id +from corporation_cte c +join SHARE_STRUCT_CLS ssc on ssc.corp_num = c.corp_num; + + +-- share_series +transfer TARGET_SCHEMA.share_series from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select c.target_corp_num as CORP_NUM, + ss.SHARE_CLASS_ID, + ss.SERIES_ID, + case ss.MAX_SHARE_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as MAX_SHARE_IND, + ss.SHARE_QUANTITY, + case ss.SPEC_RIGHT_IND + when 'N' then 'false' + when 'Y' then 'true' + else 'false' + end as SPEC_RIGHT_IND, + ss.SERIES_NME, + ss.start_event_id +from corporation_cte c +join SHARE_SERIES ss on ss.corp_num = c.corp_num; + + +-- notification +transfer TARGET_SCHEMA.notification from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + n.METHOD_TYP_CD, + n.mailing_addr_id, + n.FIRST_NME, + n.LAST_NME, + n.MIDDLE_NME, + n.PICKUP_BY, + n.EMAIL_ADDRESS, + n.PHONE_NUMBER +from corporation_cte c +join event e on e.corp_num = c.corp_num +join NOTIFICATION n on n.event_id = e.event_id; + + +-- notification_resend +transfer TARGET_SCHEMA.notification_resend from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select e.event_id, + nr.METHOD_TYP_CD, + nr.mailing_addr_id, + nr.FIRST_NME, + nr.LAST_NME, + nr.MIDDLE_NME, + nr.PICKUP_BY, + nr.EMAIL_ADDRESS, + nr.PHONE_NUMBER +from corporation_cte c +join event e on e.corp_num = c.corp_num +join NOTIFICATION_RESEND nr on nr.event_id = e.event_id; + + +-- party_notification +transfer TARGET_SCHEMA.party_notification from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select pn.PARTY_ID, + pn.METHOD_TYP_CD, + pn.mailing_addr_id, + pn.FIRST_NME, + pn.LAST_NME, + pn.MIDDLE_NME, + pn.BUSINESS_NME, + pn.PICKUP_BY, + pn.EMAIL_ADDRESS, + pn.PHONE_NUMBER +from corporation_cte c +join corp_party cp on cp.corp_num = c.corp_num +join PARTY_NOTIFICATION pn on pn.party_id = cp.corp_party_id; + + +-- payment +transfer TARGET_SCHEMA.payment from cprd using +with corp_list as ( + select /*+ materialize */ c.corp_num + from corporation c + where &oracle_corp_num_predicate + and &oracle_corp_type_predicate + and c.CORP_NUM not in ('0460007', '1255957', '1186381') +), +corporation_cte as ( + select * + from ( + select c.*, + case + when c.CORP_TYP_CD in ('BC', 'ULC', 'CC') then 'BC' || c.CORP_NUM + else c.CORP_NUM + end as target_corp_num + from corporation c + join corp_list cl on cl.corp_num = c.corp_num + ) + where &target_corp_num_predicate +) +select p.event_id, + p.payment_typ_cd, + p.cc_holder_nme +from corporation_cte c +join event e on e.corp_num = c.corp_num +join payment p on p.event_id = e.event_id;