diff --git a/data-tool/Makefile b/data-tool/Makefile index a646a1d30a..e6907203ba 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -89,15 +89,6 @@ run-prefect-server: ## Start Prefect server run-prefect-reset-db: ## clears all data and reapplies the schema. Handy for clearing flow run history. . $(PREFECT_VENV_DIR)/bin/activate && prefect server database reset -y -run-corps-migration: ## Run migration flow - @echo "Current directory: $(CURRENT_ABS_DIR)" - @echo "SQLAlchemy path: $(CURRENT_ABS_DIR)/$(SQLALCHEMY_VENV_DIR)/lib/$(PYTHON_VERSION)/site-packages" - . $(VENV_DIR)/bin/activate && \ - export PYTHONPATH="$(CURRENT_ABS_DIR):$$PYTHONPATH" && \ - export SQLALCHEMY_PATH="$(CURRENT_ABS_DIR)/$(SQLALCHEMY_VENV_DIR)/lib/$(PYTHON_VERSION)/site-packages" && \ - FLASK_ENV=development && \ - python flows/migrate_corps_flow.py - run-corps-delete: ## Run delete flow . $(VENV_DIR)/bin/activate && \ python flows/batch_delete_flow.py diff --git a/data-tool/flows/migrate_corps_flow.py b/data-tool/flows/migrate_corps_flow.py deleted file mode 100644 index a0e49fb1b2..0000000000 --- a/data-tool/flows/migrate_corps_flow.py +++ /dev/null @@ -1,475 +0,0 @@ -import os -import sys - -# Prefect 3 dependencies requires SQLAlchemy 2.x so we load what Prefect 3 needs by default. -# We override SqlAlchemy in this flow to use SQLAlchemy 1.4.44 which is still required by -# legal api dependencies. 
-# Add SQLAlchemy 1.4.44 to path - do this before any other imports -sqlalchemy_path = os.getenv('SQLALCHEMY_PATH') -if sqlalchemy_path: - print(f'Using SQLAlchemy from: {sqlalchemy_path}') - sys.path.insert(0, sqlalchemy_path) - from sqlalchemy import __version__, create_engine, engine, text - from sqlalchemy.exc import InvalidRequestError - print(f'SQLAlchemy version: {__version__}') - - -import pandas as pd -import prefect -from legal_api.models import db -from legal_api.models.db import init_db -from legal_api.models import Business -from prefect import flow, task, serve - - -from config import get_named_config -from flows.corps.corp_queries import get_unprocessed_corps_query -from flows.corps.event_filing_service import EventFilingService, IAEventFilings -from corps.filing_data_cleaning_utils import clean_offices_data, clean_corp_party_data, clean_corp_data, clean_event_data -from common.processing_status_service import ProcessingStatusService, ProcessingStatuses -from custom_filer.corps_filer import process_filing -from common.custom_exceptions import CustomException, CustomUnsupportedTypeException -from flows.corps.lear_data_utils import populate_filing_json_from_lear, populate_filing -from corps.filing_json_factory_service import FilingJsonFactoryService -from corps.filing_data_utils import get_previous_event_ids, \ - get_processed_event_ids, get_event_info_to_retrieve, is_in_lear -from flask import Flask -from datetime import timedelta - - - -@task( - name="colin_init", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def colin_init(config): - print("Initializing COLIN connection") - engine = create_engine(config.SQLALCHEMY_DATABASE_URI_COLIN_MIGR) - return engine - - -@task( - name="lear_init", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def lear_init(config): - print("Initializing LEAR connection") - FLASK_APP = Flask('init_lear') - FLASK_APP.config.from_object(config) - init_db(FLASK_APP) - 
FLASK_APP.app_context().push() - return FLASK_APP, db - - -@task( - name="get_config", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def get_config(): - config = get_named_config() - return config - - -@task( - name="get_unprocessed_corps", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def get_unprocessed_corps(config, db_engine: engine): - print("Getting unprocessed corporations") - logger = prefect.get_run_logger() - query = get_unprocessed_corps_query(config.DATA_LOAD_ENV) - sql_text = text(query) - - with db_engine.connect() as conn: - rs = conn.execute(sql_text) - df = pd.DataFrame(rs, columns=rs.keys()) - raw_data_dict = df.to_dict('records') - corp_nums = [x.get('corp_num') for x in raw_data_dict] - # logger.info(f'{len(raw_data_dict)} corp_nums to process from colin data: {corp_nums}') - status_service = ProcessingStatusService(config.DATA_LOAD_ENV, db_engine) - # TODO: optimize to update all records in one-shot - for corp_num in corp_nums: - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - processed_status=ProcessingStatuses.PROCESSING) - return raw_data_dict - - -@task( - name="get_event_filing_data", - # retries=3, - # retry_delay_seconds=60, - # cache_key_fn=task_input_hash, - # cache_expiration=timedelta(minutes=30), # Shorter expiration for data freshness - log_prints=True -) -def get_event_filing_data(config, colin_db_engine: engine, unprocessed_corp_dict: dict): - logger = prefect.get_run_logger() - corp_num = unprocessed_corp_dict.get('corp_num') - print(f"Starting event filing data processing for corp: {corp_num}") - status_service = ProcessingStatusService(config.DATA_LOAD_ENV, colin_db_engine) - event_filing_service = EventFilingService(colin_db_engine, config) - corp_name = '' - - try: - event_ids = unprocessed_corp_dict.get('event_ids') - correction_event_ids = unprocessed_corp_dict.get('correction_event_ids') - events_ids_to_process, event_filing_types_to_process = 
get_event_info_to_retrieve(unprocessed_corp_dict) - processed_events_ids = get_processed_event_ids(unprocessed_corp_dict) - unprocessed_corp_dict['retrieved_events_cnt'] = len(events_ids_to_process) - event_filing_data_arr = [] - - corp_comments = event_filing_service.get_corp_comments_data(corp_num) - unprocessed_corp_dict['corp_comments'] = corp_comments - unprocessed_corp_dict['correctionEventFilingMappings'] = {} - correction_event_filing_mappings = unprocessed_corp_dict['correctionEventFilingMappings'] - - prev_event_filing_data = None - for idx, event_id in enumerate(events_ids_to_process): - event_file_type = event_filing_types_to_process[idx] - is_supported_event_filing = event_filing_service.get_event_filing_is_supported(event_file_type) - # print(f'event_id: {event_id}, event_file_type: {event_file_type}, is_supported_event_filing: {is_supported_event_filing}') - prev_event_ids = get_previous_event_ids(event_ids, event_id) - event_filing_data_dict, is_corrected_event_filing, correction_event_id = \ - event_filing_service.get_event_filing_data(corp_num, - event_id, - event_file_type, - prev_event_filing_data, - prev_event_ids, - correction_event_ids, - correction_event_filing_mappings) - if is_corrected_event_filing: - correction_event_filing_mappings[correction_event_id] = { - 'correctedEventId': event_id, - 'learFilingType': event_filing_data_dict['target_lear_filing_type'] - } - - event_filing_data_arr.append({ - 'is_in_lear': is_in_lear(processed_events_ids, event_id), - 'is_supported_type': is_supported_event_filing, - 'skip_filing': event_filing_data_dict['skip_filing'], - 'data': event_filing_data_dict - }) - prev_event_filing_data = event_filing_data_dict - - unprocessed_corp_dict['event_filing_data'] = event_filing_data_arr - - except Exception as err: - error_msg = f'error getting event filing data {corp_num}, {corp_name}, {err}' - error_msg_minimal = f'error getting event filing data {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - 
status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - processed_status=ProcessingStatuses.FAILED, - failed_event_id=event_id, - failed_event_file_type=event_file_type, - last_error=error_msg) - raise CustomException(error_msg_minimal) - - print(f"Completed event filing data processing for corp: {corp_num}") - return unprocessed_corp_dict - - -@task( - name="clean_event_filing_data", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def clean_event_filing_data(config, colin_db_engine: engine, event_filing_data_dict: dict): - logger = prefect.get_run_logger() - corp_num = event_filing_data_dict.get('corp_num') - print(f"Starting data cleaning for corp: {corp_num}") - status_service = ProcessingStatusService(config.DATA_LOAD_ENV, colin_db_engine) - corp_name = '' - event_id = None - event_filing_type = None - - try: - event_filing_data_arr = event_filing_data_dict['event_filing_data'] - for event_filing_data in event_filing_data_arr: - if event_filing_data['is_supported_type'] and not event_filing_data['skip_filing']: - filing_data = event_filing_data['data'] - event_filing_type = filing_data['event_file_type'] - event_id=filing_data['e_event_id'] - clean_event_data(filing_data) - clean_corp_data(config, filing_data) - corp_name = filing_data['curr_corp_name'] - clean_corp_party_data(filing_data) - clean_offices_data(filing_data) - except Exception as err: - error_msg = f'error cleaning business {corp_num}, {corp_name}, {err}' - error_msg_minimal = f'error cleaning business {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - processed_status=ProcessingStatuses.FAILED, - failed_event_id=event_id, - failed_event_file_type=event_filing_type, - last_error=error_msg) - raise CustomException(error_msg_minimal) - - print(f"Completed data cleaning for corp: {corp_num}") - return event_filing_data_dict 
- - -@task( - name="transform_event_filing_data", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def transform_event_filing_data(config, colin_db_engine: engine, event_filing_data_dict: dict): - logger = prefect.get_run_logger() - corp_num = event_filing_data_dict.get('corp_num') - print(f"Starting data transformation for corp: {corp_num}") - status_service = ProcessingStatusService(config.DATA_LOAD_ENV, colin_db_engine) - corp_name = '' - event_id = None - event_filing_type = None - - try: - event_filing_data_arr = event_filing_data_dict['event_filing_data'] - for event_filing_data in event_filing_data_arr: - if not event_filing_data['is_in_lear'] and event_filing_data['is_supported_type'] \ - and not event_filing_data['skip_filing']: - # process and create LEAR json filing dict - filing_data = event_filing_data['data'] - event_filing_type = filing_data['event_file_type'] - event_id=filing_data['e_event_id'] - corp_name = filing_data['curr_corp_name'] - corp_filing_json_factory_service = FilingJsonFactoryService(event_filing_data) - filing_json = corp_filing_json_factory_service.get_filing_json() - event_filing_data['filing_json'] = filing_json - except Exception as err: - error_msg = f'error transforming business {corp_num}, {corp_name}, {err}' - error_msg_minimal = f'error transforming business {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - processed_status=ProcessingStatuses.FAILED, - failed_event_id=event_id, - failed_event_file_type=event_filing_type, - last_error=error_msg) - raise CustomException(error_msg_minimal) - - print(f"Completed data transformation for corp: {corp_num}") - return event_filing_data_dict - - -@task( - name="load_event_filing", - # retries=3, - # retry_delay_seconds=60, - log_prints=True -) -def load_event_filing_data(config, app: any, colin_db_engine: engine, db_lear, event_filing_data_dict: dict): - 
logger = prefect.get_run_logger() - corp_num = event_filing_data_dict.get('corp_num') - print(f"Starting data load for corp: {corp_num}") - status_service = ProcessingStatusService(config.DATA_LOAD_ENV, colin_db_engine) - corp_type = event_filing_data_dict['corp_type_cd'] - filings_count = event_filing_data_dict['cnt'] - corp_name = '' - event_id = None - event_filing_type = None - filing = None - filing_processed = False - - with app.app_context(): - try: - event_filing_data_arr = event_filing_data_dict['event_filing_data'] - for idx, event_filing_data in enumerate(event_filing_data_arr): - filing_data = event_filing_data['data'] - event_id=filing_data['e_event_id'] - event_filing_type = filing_data['event_file_type'] - - if not event_filing_data['is_supported_type']: - error_msg = f'could not finish processing this corp as there is an unsupported event/filing type: {event_filing_type}' - raise CustomUnsupportedTypeException(f'{error_msg}') - - if not event_filing_data['is_in_lear'] and not event_filing_data['skip_filing']: - # the corp_processing table should already track whether an event/filing has been processed and - # saved to lear but just to be safe a final check against lear is made to ensure the event/filing - # is not already in lear - # colin_event = get_colin_event(db_lear, event_id) - # if colin_event: - # error_msg = f'colin event id ({event_id}) already exists in lear: {event_filing_type}' - # raise CustomException(f'{error_msg}') - - business = None - if not IAEventFilings.has_value(event_filing_type): - business = Business.find_by_identifier(corp_num) - populate_filing_json_from_lear(db, event_filing_data, business) - corp_name = filing_data['curr_corp_name'] - - # save filing to filing table - filing = populate_filing(business, event_filing_data, filing_data) - filing.save() - - # process filing with custom filer function - business = process_filing(config, filing.id, event_filing_data_dict, filing_data, db_lear) - filing_processed = True - - 
event_cnt = event_filing_data_dict['retrieved_events_cnt'] - if event_cnt == (idx + 1): - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - corp_type=corp_type, - filings_count=filings_count, - processed_status=ProcessingStatuses.COMPLETED, - last_processed_event_id=event_id) - print(f"Completed data load for corp: {corp_num}") - else: - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - processed_status=ProcessingStatuses.PROCESSING, - last_processed_event_id=event_id) - except CustomUnsupportedTypeException as err: - error_msg = f'Partial loading of business {corp_num}, {corp_name}, {err}' - error_msg_minimal = f'Partial loading of business {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - corp_type=corp_type, - filings_count=filings_count, - processed_status=ProcessingStatuses.PARTIAL, - failed_event_id=event_id, - failed_event_file_type=event_filing_type, - last_error=error_msg) - raise CustomException(error_msg_minimal) - except InvalidRequestError as err: - error_msg = f'error loading business InvalidRequestError: {corp_num}, {corp_name}, {err}' - error_msg_minimal = f'error loading business InvalidRequestError: {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - corp_type=corp_type, - filings_count=filings_count, - processed_status=ProcessingStatuses.FAILED, - failed_event_id=event_id, - failed_event_file_type=event_filing_type, - last_error=error_msg) - logger.info(f'filing processed: {filing_processed}') - logger.info('lear db rollback') - db_lear.session.rollback() - - raise CustomException(error_msg_minimal) - except Exception as err: - error_msg = f'error loading business {corp_num}, {corp_name}, {err}' - error_msg_minimal = 
f'error loading business {corp_num}, {corp_name}' - logger.error(error_msg_minimal) - status_service.update_flow_status(flow_name='corps-flow', - corp_num=corp_num, - corp_name=corp_name, - corp_type=corp_type, - filings_count=filings_count, - processed_status=ProcessingStatuses.FAILED, - failed_event_id=event_id, - failed_event_file_type=event_filing_type, - last_error=error_msg) - logger.info(f'filing processed: {filing_processed}') - logger.info('lear db rollback') - db_lear.session.rollback() - - raise CustomException(error_msg_minimal) - - - -@flow( - name="Corps-Migrate-ETL", - description="Migrate corporation data through ETL process", - version="1.0", - log_prints=True -) -def migrate_flow(): - # setup - config = get_config() - db_colin_engine = colin_init(config) - FLASK_APP, db_lear = lear_init(config) - - unprocessed_corps = get_unprocessed_corps(config, db_colin_engine) - print(f"Found {len(unprocessed_corps)} corps to process") - - # Submit all event filing tasks in parallel - event_filing_futures = [] - for corp in unprocessed_corps: - future = get_event_filing_data.submit( - config=config, - colin_db_engine=db_colin_engine, - unprocessed_corp_dict=corp - ) - event_filing_futures.append(future) - - # Process results and submit cleaning tasks - clean_futures = [] - for future in event_filing_futures: - data = future.result() # Wait for result - if data: - clean_future = clean_event_filing_data.submit( - config=config, - colin_db_engine=db_colin_engine, - event_filing_data_dict=data - ) - clean_futures.append(clean_future) - - # Process cleaning results and submit transform tasks - transform_futures = [] - for future in clean_futures: - data = future.result() # Wait for result - if data: - transform_future = transform_event_filing_data.submit( - config=config, - colin_db_engine=db_colin_engine, - event_filing_data_dict=data - ) - transform_futures.append(transform_future) - - # Process transform results and submit load tasks - load_futures = [] - for 
future in transform_futures: - data = future.result() # Wait for result - if data: - load_future = load_event_filing_data.submit( - config=config, - app=FLASK_APP, - colin_db_engine=db_colin_engine, - db_lear=db_lear, - event_filing_data_dict=data - ) - load_futures.append(load_future) - - # Wait for all loads to complete - for future in load_futures: - future.result() - - -if __name__ == "__main__": - migrate_flow() - - -# if __name__ == "__main__": -# # Create deployment with schedule -# deployment = migrate_flow.to_deployment( -# name="corps-migration", -# interval=timedelta(seconds=15), # Run every 15 seconds -# tags=["corps-migration"] -# ) - -# # Start serving the deployment -# serve(deployment) diff --git a/data-tool/scripts/generate_cprd_subset_extract.py b/data-tool/scripts/generate_cprd_subset_extract.py index 479aba2f58..2286e6cc9e 100644 --- a/data-tool/scripts/generate_cprd_subset_extract.py +++ b/data-tool/scripts/generate_cprd_subset_extract.py @@ -98,12 +98,13 @@ class cfg_GenerationConfig: TMPL_TOKEN_TARGET_PRED = "&target_corp_num_predicate" # used by transfer template (Oracle-side) TMPL_TOKEN_ORACLE_PRED = "&oracle_corp_num_predicate" # used by transfer template (Oracle-side) TMPL_TOKEN_ORACLE_CORP_TYPE_PRED = "&oracle_corp_type_predicate" # used by transfer template (Oracle-side) - +TMPL_TOKEN_SCHEMA = "TARGET_SCHEMA" @dataclass(frozen=True) class tmpl_TemplateSpec: name: str path: Path + schema: str required_tokens: tuple[str, ...] 
= () @@ -292,70 +293,85 @@ def sql_render_pg_session_probe(label: str) -> str: # tmpl_* (template loading/validation/rendering) # ========================= -def tmpl_default_bundle(repo_root: Path) -> tmpl_TemplateBundle: - subset_dir = repo_root / "data-tool" / "scripts" / "subset" - +def tmpl_default_bundle(repo_root: Path, schema: str) -> tmpl_TemplateBundle: + subset_dir = repo_root / "colin-extract-refresh" / "src" / "subset" + pg_acquire_advisory_lock = tmpl_TemplateSpec( name="subset_pg_acquire_advisory_lock", path=subset_dir / "subset_pg_acquire_advisory_lock.sql", + schema=schema, ) pg_release_advisory_lock = tmpl_TemplateSpec( name="subset_pg_release_advisory_lock", path=subset_dir / "subset_pg_release_advisory_lock.sql", + schema=schema, ) pg_prepare_address_stage = tmpl_TemplateSpec( name="subset_pg_prepare_address_stage", path=subset_dir / "subset_pg_prepare_address_stage.sql", + schema=schema, ) pg_cleanup_address_stage = tmpl_TemplateSpec( name="subset_pg_cleanup_address_stage", path=subset_dir / "subset_pg_cleanup_address_stage.sql", + schema=schema, ) pg_cleanup_orphan_children = tmpl_TemplateSpec( name="subset_pg_cleanup_orphan_children", path=subset_dir / "subset_pg_cleanup_orphan_children.sql", + schema=schema, ) disable_triggers = tmpl_TemplateSpec( name="subset_disable_triggers", path=subset_dir / "subset_disable_triggers.sql", + schema=schema, ) enable_triggers = tmpl_TemplateSpec( name="subset_enable_triggers", path=subset_dir / "subset_enable_triggers.sql", + schema=schema, ) pg_boolean_casts = tmpl_TemplateSpec( name="subset_pg_boolean_casts", path=subset_dir / "subset_pg_boolean_casts.sql", + schema=schema, ) pg_fastload_begin = tmpl_TemplateSpec( name="subset_pg_fastload_begin", path=subset_dir / "subset_pg_fastload_begin.sql", + schema=schema, ) pg_fastload_end = tmpl_TemplateSpec( name="subset_pg_fastload_end", path=subset_dir / "subset_pg_fastload_end.sql", + schema=schema, ) pg_purge_bcomps_excluded = tmpl_TemplateSpec( 
name="subset_pg_purge_bcomps_excluded", path=subset_dir / "subset_pg_purge_bcomps_excluded.sql", + schema=schema, ) delete_chunk = tmpl_TemplateSpec( name="subset_delete_chunk", path=subset_dir / "subset_delete_chunk.sql", required_tokens=(TMPL_TOKEN_CORP_IDS,), + schema=schema, ) transfer_chunk = tmpl_TemplateSpec( name="subset_transfer_chunk", path=subset_dir / "subset_transfer_chunk.sql", required_tokens=(TMPL_TOKEN_TARGET_PRED, TMPL_TOKEN_ORACLE_PRED, TMPL_TOKEN_ORACLE_CORP_TYPE_PRED), + schema=schema, ) delete_cars = tmpl_TemplateSpec( name="subset_delete_cars", path=subset_dir / "subset_delete_cars.sql", + schema=schema, ) transfer_cars = tmpl_TemplateSpec( name="subset_transfer_cars", path=subset_dir / "subset_transfer_cars.sql", + schema=schema, ) return tmpl_TemplateBundle( @@ -384,7 +400,6 @@ def tmpl_load_text(spec: tmpl_TemplateSpec) -> str: tmpl_validate_tokens(spec, text) return text - def tmpl_validate_tokens(spec: tmpl_TemplateSpec, template_text: str) -> None: if not spec.required_tokens: return @@ -404,12 +419,23 @@ def tmpl_render(template_text: str, *, replacements: Dict[str, str]) -> str: out = out.replace(token, value) return out +def tmpl_resolve_execute_path(spec: tmpl_TemplateSpec, *, out_dir: Path) -> Path: + text = tmpl_load_text(spec) + if TMPL_TOKEN_SCHEMA not in text: + return spec.path + out_dir.mkdir(parents=True, exist_ok=True) + rendered = tmpl_render(text, replacements={TMPL_TOKEN_SCHEMA: spec.schema}) + out_path = out_dir / f"{spec.name}.sql" + gen_write_text(out_path, rendered) + print(f"substituted =>{out_path}") + return out_path # ========================= # chunk_* (chunk planning) # ========================= def chunk_chunked(items: Sequence[str], size: int) -> List[List[str]]: + if size <= 0: raise ValueError("chunk size must be > 0") return [list(items[i:i + size]) for i in range(0, len(items), size)] @@ -462,12 +488,14 @@ def gen_build_chunk_sql( oracle_predicate_sql: str, oracle_corp_type_predicate_sql: str, 
pg_debug_session_probes: bool, + schema: str, ) -> str: replacements = { TMPL_TOKEN_CORP_IDS: corp_ids_sql, TMPL_TOKEN_TARGET_PRED: target_predicate_sql, TMPL_TOKEN_ORACLE_PRED: oracle_predicate_sql, TMPL_TOKEN_ORACLE_CORP_TYPE_PRED: oracle_corp_type_predicate_sql, + TMPL_TOKEN_SCHEMA: schema, } parts: List[str] = [] @@ -497,7 +525,8 @@ def gen_build_chunk_sql( rendered_transfer = tmpl_render(transfer_template_text, replacements=replacements) if (TMPL_TOKEN_TARGET_PRED in rendered_transfer or TMPL_TOKEN_ORACLE_PRED in rendered_transfer or - TMPL_TOKEN_ORACLE_CORP_TYPE_PRED in rendered_transfer): + TMPL_TOKEN_ORACLE_CORP_TYPE_PRED in rendered_transfer or + TMPL_TOKEN_SCHEMA in rendered_transfer): raise SystemExit( f"Internal error: token(s) remained after rendering transfer template for chunk {chunk.index:03d}." ) @@ -518,6 +547,7 @@ def gen_write_chunk_files( max_in_list: int, include_cp: bool, pg_debug_session_probes: bool, + schema: str ) -> List[Path]: out_paths: List[Path] = [] oracle_corp_type_predicate_sql = sql_render_oracle_corp_type_predicate(include_cp=include_cp) @@ -550,6 +580,7 @@ def gen_write_chunk_files( oracle_predicate_sql=oracle_predicate_sql, oracle_corp_type_predicate_sql=oracle_corp_type_predicate_sql, pg_debug_session_probes=pg_debug_session_probes, + schema=schema ) gen_write_text(ch.chunk_file, chunk_sql) out_paths.append(ch.chunk_file) @@ -558,7 +589,7 @@ def gen_write_chunk_files( def _gen_emit_pg_disable_begin(lines: List[str], *, cfg: cfg_GenerationConfig, templates: tmpl_TemplateBundle) -> None: if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: - lines.append(f"execute {templates.disable_triggers.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.disable_triggers, out_dir=cfg.out_chunks_dir).as_posix()}") if cfg.mode == cfg_GenerationMode.REFRESH: lines.append("-- Refresh-only: preserved processing/tracking tables still reference corporation/event rows.") lines.append("ALTER TABLE 
corp_processing DISABLE TRIGGER ALL;") @@ -578,7 +609,7 @@ def _gen_emit_pg_disable_begin(lines: List[str], *, cfg: cfg_GenerationConfig, t def _gen_emit_pg_disable_end(lines: List[str], *, cfg: cfg_GenerationConfig, templates: tmpl_TemplateBundle) -> None: if cfg.pg_disable_method == cfg_PgDisableMethod.TABLE_TRIGGERS: - lines.append(f"execute {templates.enable_triggers.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.enable_triggers, out_dir=cfg.out_chunks_dir).as_posix()}") if cfg.mode == cfg_GenerationMode.REFRESH: lines.append("-- Refresh-only: restore preserved processing/tracking table triggers too.") lines.append("ALTER TABLE corp_processing ENABLE TRIGGER ALL;") @@ -639,32 +670,27 @@ def gen_build_master_script_inline( lines.append("") lines.append(f"connect {cfg.target_connection};") lines.append("-- Serialize subset runs on this target DB.") - lines.append(f"execute {templates.pg_acquire_advisory_lock.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_acquire_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") lines.append("-- Prepare shared address staging table before learning schema") - lines.append(f"execute {templates.pg_prepare_address_stage.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_prepare_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append(f"learn schema {cfg.target_schema};") lines.append("") + lines.append(f"SET search_path TO {cfg.target_schema};") - lines.append("truncate table public.colin_extract_version; " - "insert into public.colin_extract_version (extracted_at) values (current_timestamp); " + lines.append("truncate table colin_extract_version; " + "insert into colin_extract_version (extracted_at) values (current_timestamp); " ) lines.append("") if cfg.pg_fastload: lines.append("-- Postgres fast-load mode (session-level settings)") - lines.append(f"execute 
{templates.pg_fastload_begin.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_begin, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") - lines.append("-- Postgres helper: allow VARCHAR/BPCHAR -> BOOLEAN assignment (DbSchemaCLI boolean inserts)") - lines.append(f"execute {templates.pg_boolean_casts.path.as_posix()}") - lines.append("-- Fail-fast: verify varchar/bpchar -> boolean casts exist") - lines.append("select 't'::varchar::boolean;") - lines.append("select 'f'::bpchar::boolean;") - lines.append("") if cfg.mode == cfg_GenerationMode.REFRESH: lines.append("-- Cleanup stale orphan child rows before entering the trigger-suppressed refresh window.") - lines.append(f"execute {templates.pg_cleanup_orphan_children.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") _gen_emit_pg_disable_begin(lines, cfg=cfg, templates=templates) @@ -676,8 +702,8 @@ def gen_build_master_script_inline( if cfg.include_cars: lines.append("-- global cars* refresh (not corp-scoped; full dataset truncate + reload)") - lines.append(f"execute {templates.delete_cars.path.as_posix()}") - lines.append(f"execute {templates.transfer_cars.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_cars, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") if delete_chunk_files: @@ -702,15 +728,15 @@ def gen_build_master_script_inline( _gen_emit_pg_disable_end(lines, cfg=cfg, templates=templates) lines.append("-- Cleanup shared address staging table") - lines.append(f"execute {templates.pg_cleanup_address_stage.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") 
lines.append("-- Release subset-run advisory lock") - lines.append(f"execute {templates.pg_release_advisory_lock.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_release_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") if cfg.pg_fastload: lines.append("-- Reset Postgres fast-load session settings") - lines.append(f"execute {templates.pg_fastload_end.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_end, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") return "\n".join(lines) @@ -743,45 +769,40 @@ def gen_build_master_script_vset( lines.append("") lines.append(f"connect {cfg.target_connection};") lines.append("-- Serialize subset runs on this target DB.") - lines.append(f"execute {templates.pg_acquire_advisory_lock.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_acquire_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") lines.append("-- Prepare shared address staging table before learning schema") - lines.append(f"execute {templates.pg_prepare_address_stage.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_prepare_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append(f"learn schema {cfg.target_schema};") lines.append("") - lines.append("truncate table public.colin_extract_version; " - "insert into public.colin_extract_version (extracted_at) values (current_timestamp); " + lines.append("truncate table colin_extract_version; " + "insert into colin_extract_version (extracted_at) values (current_timestamp); " ) lines.append("") if cfg.pg_fastload: lines.append("-- Postgres fast-load mode (session-level settings)") - lines.append(f"execute {templates.pg_fastload_begin.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_begin, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") - lines.append("-- Postgres 
helper: allow VARCHAR/BPCHAR -> BOOLEAN assignment (DbSchemaCLI boolean inserts)") - lines.append(f"execute {templates.pg_boolean_casts.path.as_posix()}") - lines.append("-- Fail-fast: verify varchar/bpchar -> boolean casts exist") - lines.append("select 't'::varchar::boolean;") - lines.append("select 'f'::bpchar::boolean;") - lines.append("") + if cfg.mode == cfg_GenerationMode.REFRESH: lines.append("-- Cleanup stale orphan child rows before entering the trigger-suppressed refresh window.") - lines.append(f"execute {templates.pg_cleanup_orphan_children.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") _gen_emit_pg_disable_begin(lines, cfg=cfg, templates=templates) _gen_emit_refresh_fk_note(lines, cfg=cfg) if cfg.mode == cfg_GenerationMode.REFRESH: lines.append("-- Cleanup stale orphan child rows before chunked refresh deletes.") - lines.append(f"execute {templates.pg_cleanup_orphan_children.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_orphan_children, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") if cfg.include_cars: lines.append("-- global cars* refresh (not corp-scoped; full dataset truncate + reload)") - lines.append(f"execute {templates.delete_cars.path.as_posix()}") - lines.append(f"execute {templates.transfer_cars.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_cars, out_dir=cfg.out_chunks_dir).as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_cars, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") def _vset_predicates(target_values: Sequence[str], oracle_values: Sequence[str]) -> None: @@ -810,13 +831,13 @@ def _vset_predicates(target_values: Sequence[str], oracle_values: Sequence[str]) for idx, ch in enumerate(delete_chunks, start=1): lines.append(f"-- delete chunk {idx:03d}/{total_del:03d} (target 
corps={len(ch)})") lines.append(f"vset corp_ids_in={','.join(sql_quote_literal(v) for v in ch)}") - lines.append(f"execute {templates.delete_chunk.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.delete_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") if effective_strategy == cfg_OracleInStrategy.OR_OF_IN_LISTS: lines.append("-- transfer subset (single pass via OR-of-IN-lists predicate)") _vset_predicates(corp_ids, oracle_ids_all) - lines.append(f"execute {templates.transfer_chunk.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") else: lines.append("-- transfer subset (chunked)") @@ -827,25 +848,25 @@ def _vset_predicates(target_values: Sequence[str], oracle_values: Sequence[str]) f"-- transfer chunk {idx:03d}/{total_tr:03d} (target corps={len(ch)}, oracle corp_num={len(oracle_ids)})" ) _vset_predicates(ch, oracle_ids) - lines.append(f"execute {templates.transfer_chunk.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.transfer_chunk, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") lines.append("-- purge BCOMPS-excluded corps (computed in Postgres after load)") - lines.append(f"execute {templates.pg_purge_bcomps_excluded.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_purge_bcomps_excluded, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") _gen_emit_pg_disable_end(lines, cfg=cfg, templates=templates) lines.append("-- Cleanup shared address staging table") - lines.append(f"execute {templates.pg_cleanup_address_stage.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_cleanup_address_stage, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") lines.append("-- Release subset-run advisory lock") - lines.append(f"execute {templates.pg_release_advisory_lock.path.as_posix()}") + 
lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_release_advisory_lock, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") if cfg.pg_fastload: lines.append("-- Reset Postgres fast-load session settings") - lines.append(f"execute {templates.pg_fastload_end.path.as_posix()}") + lines.append(f"execute {tmpl_resolve_execute_path(templates.pg_fastload_end, out_dir=cfg.out_chunks_dir).as_posix()}") lines.append("") return "\n".join(lines) @@ -972,6 +993,7 @@ def cfg_resolve_repo_root() -> Path: def cfg_build_config(args: argparse.Namespace) -> cfg_GenerationConfig: repo_root = cfg_resolve_repo_root() + print(repo_root) corp_file = Path(args.corp_file).expanduser().resolve() if not corp_file.exists(): @@ -990,12 +1012,15 @@ def cfg_build_config(args: argparse.Namespace) -> cfg_GenerationConfig: out_master = ( Path(args.out).expanduser().resolve() if args.out - else (repo_root / "data-tool" / "scripts" / "generated" / f"subset_{mode.value}.sql") + else (repo_root / "colin-extract-refresh" / "src" / "subset" / "generated" / f"subset_{mode.value}.sql") ) out_master.parent.mkdir(parents=True, exist_ok=True) # Chunk scripts dir is always derived from master output stem for determinism. out_chunks_dir = out_master.parent / f"{out_master.stem}_chunks" + + print(out_master, out_chunks_dir) + if render_mode == cfg_RenderMode.INLINE: out_chunks_dir.mkdir(parents=True, exist_ok=True) @@ -1033,7 +1058,7 @@ def _effective_oracle_strategy(cfg: cfg_GenerationConfig, total_ids: int) -> cfg def run(cfg: cfg_GenerationConfig) -> int: - templates = tmpl_default_bundle(cfg.repo_root) + templates = tmpl_default_bundle(cfg.repo_root, cfg.target_schema) if cfg.pg_debug_session_probes and cfg.render_mode != cfg_RenderMode.INLINE: raise SystemExit("--pg-debug-session-probes currently supports only --render-mode inline.") @@ -1041,7 +1066,6 @@ def run(cfg: cfg_GenerationConfig) -> int: # Load/validate templates we depend on (fail fast if template contract changes). 
delete_template_text = tmpl_load_text(templates.delete_chunk) transfer_template_text = tmpl_load_text(templates.transfer_chunk) - # Ensure the execute-only templates exist too (even though we don't render them). for spec in ( templates.pg_acquire_advisory_lock, @@ -1087,6 +1111,7 @@ def run(cfg: cfg_GenerationConfig) -> int: max_in_list=cfg.chunk_size, include_cp=cfg.include_cp, pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, ) transfer_files = combined_files @@ -1104,6 +1129,7 @@ def run(cfg: cfg_GenerationConfig) -> int: max_in_list=cfg.chunk_size, include_cp=cfg.include_cp, pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, ) oracle_ids_all = corp_to_oracle_ids(target_ids) @@ -1124,13 +1150,14 @@ def run(cfg: cfg_GenerationConfig) -> int: max_in_list=cfg.chunk_size, include_cp=cfg.include_cp, pg_debug_session_probes=cfg.pg_debug_session_probes, + schema=cfg.target_schema, ) - pg_purge_script_path = templates.pg_purge_bcomps_excluded.path + pg_purge_script_path = tmpl_resolve_execute_path(templates.pg_purge_bcomps_excluded, out_dir=cfg.out_chunks_dir) if cfg.pg_debug_session_probes: pg_purge_script_path = gen_write_pg_debug_copy( out_path=cfg.out_chunks_dir / "pg_purge_bcomps_excluded_debug.sql", - source_path=templates.pg_purge_bcomps_excluded.path, + source_path=pg_purge_script_path, probe_label="execute:pg_purge_bcomps_excluded", ) @@ -1179,6 +1206,7 @@ def run(cfg: cfg_GenerationConfig) -> int: print(" - Address loads use the predeclared helper table public.subset_address_stage and merge into public.address by addr_id.") print(" - BCOMPS purge keysets also use predeclared helper tables in the extract schema (subset_excluded_*).") print(" - subset runs should not overlap on the same target DB, and the runtime role must be able to truncate/read/write those helper tables.") + print(f"schema passed down as : {cfg.target_schema}") if cfg.pg_debug_session_probes: print(" - Postgres session probes: 
ENABLED (--pg-debug-session-probes)") print(" - master + nested execute files will print pg_backend_pid/current_user/session_replication_role.") diff --git a/data-tool/scripts/subset/subset_disable_triggers.sql b/data-tool/scripts/subset/subset_disable_triggers.sql index 6089f524c8..a56f3952f6 100644 --- a/data-tool/scripts/subset/subset_disable_triggers.sql +++ b/data-tool/scripts/subset/subset_disable_triggers.sql @@ -1,38 +1,61 @@ --- Disable triggers for corp-scoped tables (subset refresh/load). --- Intended to be executed from a master DbSchemaCLI script while connected to the target Postgres DB. +-- -- Disable triggers for corp-scoped tables (subset refresh/load). +-- -- Intended to be executed from a master DbSchemaCLI script while connected to the target Postgres DB. + +-- ALTER TABLE corporation DISABLE TRIGGER ALL; +-- ALTER TABLE corp_name DISABLE TRIGGER ALL; +-- ALTER TABLE corp_state DISABLE TRIGGER ALL; +-- ALTER TABLE event DISABLE TRIGGER ALL; +-- ALTER TABLE filing DISABLE TRIGGER ALL; +-- ALTER TABLE filing_user DISABLE TRIGGER ALL; +-- ALTER TABLE office DISABLE TRIGGER ALL; +-- ALTER TABLE corp_comments DISABLE TRIGGER ALL; +-- ALTER TABLE ledger_text DISABLE TRIGGER ALL; +-- ALTER TABLE corp_party DISABLE TRIGGER ALL; +-- ALTER TABLE corp_party_relationship DISABLE TRIGGER ALL; +-- ALTER TABLE offices_held DISABLE TRIGGER ALL; +-- ALTER TABLE completing_party DISABLE TRIGGER ALL; +-- ALTER TABLE submitting_party DISABLE TRIGGER ALL; +-- ALTER TABLE corp_flag DISABLE TRIGGER ALL; +-- ALTER TABLE cont_out DISABLE TRIGGER ALL; +-- ALTER TABLE conv_event DISABLE TRIGGER ALL; +-- ALTER TABLE conv_ledger DISABLE TRIGGER ALL; +-- ALTER TABLE corp_involved_amalgamating DISABLE TRIGGER ALL; +-- ALTER TABLE corp_involved_cont_in DISABLE TRIGGER ALL; +-- ALTER TABLE corp_restriction DISABLE TRIGGER ALL; +-- ALTER TABLE correction DISABLE TRIGGER ALL; +-- ALTER TABLE jurisdiction DISABLE TRIGGER ALL; +-- ALTER TABLE resolution DISABLE TRIGGER ALL; +-- 
ALTER TABLE share_series DISABLE TRIGGER ALL; +-- ALTER TABLE share_struct DISABLE TRIGGER ALL; +-- ALTER TABLE share_struct_cls DISABLE TRIGGER ALL; +-- ALTER TABLE notification DISABLE TRIGGER ALL; +-- ALTER TABLE notification_resend DISABLE TRIGGER ALL; +-- ALTER TABLE party_notification DISABLE TRIGGER ALL; +-- ALTER TABLE payment DISABLE TRIGGER ALL; +-- ALTER TABLE carsfile DISABLE TRIGGER ALL; +-- ALTER TABLE carsbox DISABLE TRIGGER ALL; +-- ALTER TABLE carsrept DISABLE TRIGGER ALL; +-- ALTER TABLE carindiv DISABLE TRIGGER ALL; + +ALTER TABLE TARGET_SCHEMA.notification + DROP CONSTRAINT fk_notification_filing ; +ALTER TABLE TARGET_SCHEMA.office + DROP CONSTRAINT fk_office_mailing_address ; +ALTER TABLE TARGET_SCHEMA.office +DROP CONSTRAINT fk_office_delivery_address ; +ALTER TABLE TARGET_SCHEMA.corp_party + DROP CONSTRAINT fk_corp_party_mailing_address ; +ALTER TABLE TARGET_SCHEMA.completing_party + DROP CONSTRAINT fk_completing_party_address ; +ALTER TABLE TARGET_SCHEMA.office + DROP CONSTRAINT fk_corp_party_delivery_address ; +ALTER TABLE TARGET_SCHEMA.notification + DROP CONSTRAINT fk_notification_address ; +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN arrangement_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.corporation ALTER COLUMN send_ar_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_series ALTER COLUMN max_share_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN max_share_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN court_appr_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN spec_rights_ind TYPE CHARACTER VARYING(255); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN par_value_ind TYPE CHARACTER VARYING(255); -ALTER TABLE corporation DISABLE TRIGGER ALL; -ALTER TABLE corp_name DISABLE TRIGGER ALL; -ALTER TABLE corp_state DISABLE TRIGGER ALL; -ALTER TABLE event DISABLE TRIGGER ALL; -ALTER TABLE 
filing DISABLE TRIGGER ALL; -ALTER TABLE filing_user DISABLE TRIGGER ALL; -ALTER TABLE office DISABLE TRIGGER ALL; -ALTER TABLE corp_comments DISABLE TRIGGER ALL; -ALTER TABLE ledger_text DISABLE TRIGGER ALL; -ALTER TABLE corp_party DISABLE TRIGGER ALL; -ALTER TABLE corp_party_relationship DISABLE TRIGGER ALL; -ALTER TABLE offices_held DISABLE TRIGGER ALL; -ALTER TABLE completing_party DISABLE TRIGGER ALL; -ALTER TABLE submitting_party DISABLE TRIGGER ALL; -ALTER TABLE corp_flag DISABLE TRIGGER ALL; -ALTER TABLE cont_out DISABLE TRIGGER ALL; -ALTER TABLE conv_event DISABLE TRIGGER ALL; -ALTER TABLE conv_ledger DISABLE TRIGGER ALL; -ALTER TABLE corp_involved_amalgamating DISABLE TRIGGER ALL; -ALTER TABLE corp_involved_cont_in DISABLE TRIGGER ALL; -ALTER TABLE corp_restriction DISABLE TRIGGER ALL; -ALTER TABLE correction DISABLE TRIGGER ALL; -ALTER TABLE jurisdiction DISABLE TRIGGER ALL; -ALTER TABLE resolution DISABLE TRIGGER ALL; -ALTER TABLE share_series DISABLE TRIGGER ALL; -ALTER TABLE share_struct DISABLE TRIGGER ALL; -ALTER TABLE share_struct_cls DISABLE TRIGGER ALL; -ALTER TABLE notification DISABLE TRIGGER ALL; -ALTER TABLE notification_resend DISABLE TRIGGER ALL; -ALTER TABLE party_notification DISABLE TRIGGER ALL; -ALTER TABLE payment DISABLE TRIGGER ALL; -ALTER TABLE carsfile DISABLE TRIGGER ALL; -ALTER TABLE carsbox DISABLE TRIGGER ALL; -ALTER TABLE carsrept DISABLE TRIGGER ALL; -ALTER TABLE carindiv DISABLE TRIGGER ALL; diff --git a/data-tool/scripts/subset/subset_enable_triggers.sql b/data-tool/scripts/subset/subset_enable_triggers.sql index 071691976d..2b68c14fb9 100644 --- a/data-tool/scripts/subset/subset_enable_triggers.sql +++ b/data-tool/scripts/subset/subset_enable_triggers.sql @@ -1,38 +1,57 @@ --- Enable triggers for corp-scoped tables (subset refresh/load). --- Intended to be executed from a master DbSchemaCLI script while connected to the target Postgres DB. +-- -- Enable triggers for corp-scoped tables (subset refresh/load). 
+-- -- Intended to be executed from a master DbSchemaCLI script while connected to the target Postgres DB. -ALTER TABLE corporation ENABLE TRIGGER ALL; -ALTER TABLE corp_name ENABLE TRIGGER ALL; -ALTER TABLE corp_state ENABLE TRIGGER ALL; -ALTER TABLE event ENABLE TRIGGER ALL; -ALTER TABLE filing ENABLE TRIGGER ALL; -ALTER TABLE filing_user ENABLE TRIGGER ALL; -ALTER TABLE office ENABLE TRIGGER ALL; -ALTER TABLE corp_comments ENABLE TRIGGER ALL; -ALTER TABLE ledger_text ENABLE TRIGGER ALL; -ALTER TABLE corp_party ENABLE TRIGGER ALL; -ALTER TABLE corp_party_relationship ENABLE TRIGGER ALL; -ALTER TABLE offices_held ENABLE TRIGGER ALL; -ALTER TABLE completing_party ENABLE TRIGGER ALL; -ALTER TABLE submitting_party ENABLE TRIGGER ALL; -ALTER TABLE corp_flag ENABLE TRIGGER ALL; -ALTER TABLE cont_out ENABLE TRIGGER ALL; -ALTER TABLE conv_event ENABLE TRIGGER ALL; -ALTER TABLE conv_ledger ENABLE TRIGGER ALL; -ALTER TABLE corp_involved_amalgamating ENABLE TRIGGER ALL; -ALTER TABLE corp_involved_cont_in ENABLE TRIGGER ALL; -ALTER TABLE corp_restriction ENABLE TRIGGER ALL; -ALTER TABLE correction ENABLE TRIGGER ALL; -ALTER TABLE jurisdiction ENABLE TRIGGER ALL; -ALTER TABLE resolution ENABLE TRIGGER ALL; -ALTER TABLE share_series ENABLE TRIGGER ALL; -ALTER TABLE share_struct ENABLE TRIGGER ALL; -ALTER TABLE share_struct_cls ENABLE TRIGGER ALL; -ALTER TABLE notification ENABLE TRIGGER ALL; -ALTER TABLE notification_resend ENABLE TRIGGER ALL; -ALTER TABLE party_notification ENABLE TRIGGER ALL; -ALTER TABLE payment ENABLE TRIGGER ALL; -ALTER TABLE carsfile ENABLE TRIGGER ALL; -ALTER TABLE carsbox ENABLE TRIGGER ALL; -ALTER TABLE carsrept ENABLE TRIGGER ALL; -ALTER TABLE carindiv ENABLE TRIGGER ALL; +-- ALTER TABLE corporation ENABLE TRIGGER ALL; +-- ALTER TABLE corp_name ENABLE TRIGGER ALL; +-- ALTER TABLE corp_state ENABLE TRIGGER ALL; +-- ALTER TABLE event ENABLE TRIGGER ALL; +-- ALTER TABLE filing ENABLE TRIGGER ALL; +-- ALTER TABLE filing_user ENABLE TRIGGER ALL; +-- ALTER 
TABLE office ENABLE TRIGGER ALL; +-- ALTER TABLE corp_comments ENABLE TRIGGER ALL; +-- ALTER TABLE ledger_text ENABLE TRIGGER ALL; +-- ALTER TABLE corp_party ENABLE TRIGGER ALL; +-- ALTER TABLE corp_party_relationship ENABLE TRIGGER ALL; +-- ALTER TABLE offices_held ENABLE TRIGGER ALL; +-- ALTER TABLE completing_party ENABLE TRIGGER ALL; +-- ALTER TABLE submitting_party ENABLE TRIGGER ALL; +-- ALTER TABLE corp_flag ENABLE TRIGGER ALL; +-- ALTER TABLE cont_out ENABLE TRIGGER ALL; +-- ALTER TABLE conv_event ENABLE TRIGGER ALL; +-- ALTER TABLE conv_ledger ENABLE TRIGGER ALL; +-- ALTER TABLE corp_involved_amalgamating ENABLE TRIGGER ALL; +-- ALTER TABLE corp_involved_cont_in ENABLE TRIGGER ALL; +-- ALTER TABLE corp_restriction ENABLE TRIGGER ALL; +-- ALTER TABLE correction ENABLE TRIGGER ALL; +-- ALTER TABLE jurisdiction ENABLE TRIGGER ALL; +-- ALTER TABLE resolution ENABLE TRIGGER ALL; +-- ALTER TABLE share_series ENABLE TRIGGER ALL; +-- ALTER TABLE share_struct ENABLE TRIGGER ALL; +-- ALTER TABLE share_struct_cls ENABLE TRIGGER ALL; +-- ALTER TABLE notification ENABLE TRIGGER ALL; +-- ALTER TABLE notification_resend ENABLE TRIGGER ALL; +-- ALTER TABLE party_notification ENABLE TRIGGER ALL; +-- ALTER TABLE payment ENABLE TRIGGER ALL; +-- ALTER TABLE carsfile ENABLE TRIGGER ALL; +-- ALTER TABLE carsbox ENABLE TRIGGER ALL; +-- ALTER TABLE carsrept ENABLE TRIGGER ALL; +-- ALTER TABLE carindiv ENABLE TRIGGER ALL; + +SET search_path TO TARGET_SCHEMA; + +ALTER TABLE TARGET_SCHEMA.notification ADD CONSTRAINT fk_notification_filing FOREIGN KEY (event_id) REFERENCES TARGET_SCHEMA.filing (event_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_office_mailing_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_office_delivery_address FOREIGN KEY (delivery_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.office ADD CONSTRAINT fk_corp_party_delivery_address 
FOREIGN KEY (delivery_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.corp_party ADD CONSTRAINT fk_corp_party_mailing_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.completing_party ADD CONSTRAINT fk_completing_party_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); +ALTER TABLE TARGET_SCHEMA.notification ADD CONSTRAINT fk_notification_address FOREIGN KEY (mailing_addr_id) REFERENCES TARGET_SCHEMA.address (addr_id); + + +ALTER TABLE TARGET_SCHEMA.corporation ALTER COLUMN send_ar_ind TYPE boolean USING (CASE send_ar_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN arrangement_ind TYPE boolean USING (CASE arrangement_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.filing ALTER COLUMN court_appr_ind TYPE boolean USING (CASE court_appr_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_series ALTER COLUMN max_share_ind TYPE boolean USING (CASE max_share_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN max_share_ind TYPE boolean USING (CASE max_share_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN spec_rights_ind TYPE boolean USING (CASE spec_rights_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); +ALTER TABLE TARGET_SCHEMA.share_struct_cls ALTER COLUMN par_value_ind TYPE boolean USING (CASE par_value_ind WHEN 'true' THEN true WHEN 'false' THEN false ELSE true END); diff --git a/data-tool/scripts/subset/subset_pg_boolean_casts.sql b/data-tool/scripts/subset/subset_pg_boolean_casts.sql index b97a72568d..9aa38ae083 100644 --- a/data-tool/scripts/subset/subset_pg_boolean_casts.sql +++ 
b/data-tool/scripts/subset/subset_pg_boolean_casts.sql @@ -23,33 +23,33 @@ -- DbSchemaCLI splits statements on semicolons and does not reliably handle semicolons inside dollar-quoted -- bodies. Keep dollar-quoted bodies free of internal semicolons and avoid DO $$ blocks. -CREATE OR REPLACE FUNCTION public.dbcli_varchar_to_boolean(val varchar) -RETURNS boolean -LANGUAGE sql -IMMUTABLE -STRICT -AS $$ - SELECT (val::text)::boolean -$$; +-- CREATE OR REPLACE FUNCTION public.dbcli_varchar_to_boolean(val varchar) +-- RETURNS boolean +-- LANGUAGE sql +-- IMMUTABLE +-- STRICT +-- AS $$ +-- SELECT (val::text)::boolean +-- $$; -CREATE OR REPLACE FUNCTION public.dbcli_bpchar_to_boolean(val bpchar) -RETURNS boolean -LANGUAGE sql -IMMUTABLE -STRICT -AS $$ - SELECT (val::text)::boolean -$$; +-- CREATE OR REPLACE FUNCTION public.dbcli_bpchar_to_boolean(val bpchar) +-- RETURNS boolean +-- LANGUAGE sql +-- IMMUTABLE +-- STRICT +-- AS $$ +-- SELECT (val::text)::boolean +-- $$; --- Recreate casts in an idempotent way (Postgres has no CREATE CAST IF NOT EXISTS). -DROP CAST IF EXISTS (varchar AS boolean); -CREATE CAST (varchar AS boolean) - WITH FUNCTION public.dbcli_varchar_to_boolean(varchar) - AS IMPLICIT -- DbSchemaCLI workaround: avoid keyword being last token -; +-- -- Recreate casts in an idempotent way (Postgres has no CREATE CAST IF NOT EXISTS). 
+-- DROP CAST IF EXISTS (varchar AS boolean); +-- CREATE CAST (varchar AS boolean) +-- WITH FUNCTION public.dbcli_varchar_to_boolean(varchar) +-- AS IMPLICIT -- DbSchemaCLI workaround: avoid keyword being last token +-- ; -DROP CAST IF EXISTS (bpchar AS boolean); -CREATE CAST (bpchar AS boolean) - WITH FUNCTION public.dbcli_bpchar_to_boolean(bpchar) - AS IMPLICIT -- DbSchemaCLI workaround: avoid keyword being last token -; +-- DROP CAST IF EXISTS (bpchar AS boolean); +-- CREATE CAST (bpchar AS boolean) +-- WITH FUNCTION public.dbcli_bpchar_to_boolean(bpchar) +-- AS IMPLICIT -- DbSchemaCLI workaround: avoid keyword being last token +-- ; diff --git a/data-tool/scripts/subset/subset_pg_cleanup_orphan_children.sql b/data-tool/scripts/subset/subset_pg_cleanup_orphan_children.sql index 345a6e55e1..2a6ba73a58 100644 --- a/data-tool/scripts/subset/subset_pg_cleanup_orphan_children.sql +++ b/data-tool/scripts/subset/subset_pg_cleanup_orphan_children.sql @@ -11,52 +11,52 @@ -- - corp-scoped rows deleted directly by corp_num are left to the regular chunk deletes -- Event-scoped children whose parent event row is missing. 
-DELETE FROM notification_resend t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.notification_resend t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM notification t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.notification t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM filing_user t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.filing_user t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM payment t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.payment t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM ledger_text t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.ledger_text t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM conv_ledger t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.conv_ledger t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM conv_event t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.conv_event t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM completing_party t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.completing_party t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM submitting_party t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.submitting_party t +WHERE 
NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM corp_involved_amalgamating t +DELETE FROM TARGET_SCHEMA.corp_involved_amalgamating t WHERE t.event_id IS NOT NULL - AND NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); + AND NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM corp_involved_cont_in t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.corp_involved_cont_in t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM correction t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.correction t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -DELETE FROM filing t -WHERE NOT EXISTS (SELECT 1 FROM event e WHERE e.event_id = t.event_id); +DELETE FROM TARGET_SCHEMA.filing t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.event e WHERE e.event_id = t.event_id); -- Corp-party children whose parent corp_party row is missing. 
-DELETE FROM party_notification t -WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.party_id); +DELETE FROM TARGET_SCHEMA.party_notification t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.corp_party cp WHERE cp.corp_party_id = t.party_id); -DELETE FROM offices_held t -WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.corp_party_id); +DELETE FROM TARGET_SCHEMA.offices_held t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.corp_party cp WHERE cp.corp_party_id = t.corp_party_id); -DELETE FROM corp_party_relationship t -WHERE NOT EXISTS (SELECT 1 FROM corp_party cp WHERE cp.corp_party_id = t.corp_party_id); +DELETE FROM TARGET_SCHEMA.corp_party_relationship t +WHERE NOT EXISTS (SELECT 1 FROM TARGET_SCHEMA.corp_party cp WHERE cp.corp_party_id = t.corp_party_id); diff --git a/data-tool/scripts/subset/subset_pg_prepare_address_stage.sql b/data-tool/scripts/subset/subset_pg_prepare_address_stage.sql index 12c00c30d3..5c8552e0d1 100644 --- a/data-tool/scripts/subset/subset_pg_prepare_address_stage.sql +++ b/data-tool/scripts/subset/subset_pg_prepare_address_stage.sql @@ -1,4 +1,4 @@ -- Prepare the shared address staging table used by subset_transfer_chunk.sql. -- This is a predeclared regular table (not TEMP) because DbSchemaCLI transfer work may use separate sessions. 
-TRUNCATE TABLE public.subset_address_stage; +TRUNCATE TABLE TARGET_SCHEMA.subset_address_stage; diff --git a/data-tool/scripts/subset/subset_pg_purge_bcomps_excluded.sql b/data-tool/scripts/subset/subset_pg_purge_bcomps_excluded.sql index 7c48f0e562..aa6a2ba038 100644 --- a/data-tool/scripts/subset/subset_pg_purge_bcomps_excluded.sql +++ b/data-tool/scripts/subset/subset_pg_purge_bcomps_excluded.sql @@ -15,11 +15,11 @@ -- 1) Build keysets -TRUNCATE TABLE public.subset_excluded_corp_parties; -TRUNCATE TABLE public.subset_excluded_events; -TRUNCATE TABLE public.subset_excluded_corps; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corp_parties; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_events; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corps; -INSERT INTO public.subset_excluded_corps (corp_num) +INSERT INTO TARGET_SCHEMA.subset_excluded_corps (corp_num) SELECT DISTINCT e.corp_num FROM event e JOIN filing f ON f.event_id = e.event_id @@ -28,154 +28,154 @@ WHERE e.corp_num IS NOT NULL AND u.user_id = 'BCOMPS' AND f.filing_type_cd IN ('BEINC', 'ICORP', 'ICORU', 'ICORC', 'CONTB', 'CONTI', 'CONTU', 'CONTC'); -INSERT INTO public.subset_excluded_events (event_id) +INSERT INTO TARGET_SCHEMA.subset_excluded_events (event_id) SELECT DISTINCT e.event_id FROM event e -JOIN public.subset_excluded_corps x ON x.corp_num = e.corp_num +JOIN TARGET_SCHEMA.subset_excluded_corps x ON x.corp_num = e.corp_num WHERE e.event_id IS NOT NULL; -INSERT INTO public.subset_excluded_corp_parties (corp_party_id) +INSERT INTO TARGET_SCHEMA.subset_excluded_corp_parties (corp_party_id) SELECT DISTINCT cp.corp_party_id FROM corp_party cp -JOIN public.subset_excluded_corps x ON x.corp_num = cp.corp_num +JOIN TARGET_SCHEMA.subset_excluded_corps x ON x.corp_num = cp.corp_num WHERE cp.corp_party_id IS NOT NULL; -- 2) Purge (delete child tables first) -- Event-scoped children DELETE FROM notification_resend t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE 
t.event_id = x.event_id; DELETE FROM notification t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM filing_user t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM payment t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM ledger_text t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM conv_ledger t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM conv_event t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM completing_party t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM submitting_party t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM corp_involved_cont_in t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM correction t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; DELETE FROM filing t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; -- corp_involved_amalgamating can reference corp_num via ted_corp_num/ting_corp_num as well as event_id. -- Delete any rows where either side is excluded (covers non-event-owned references too). 
DELETE FROM corp_involved_amalgamating t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.ted_corp_num = x.corp_num OR t.ting_corp_num = x.corp_num; -- Corp-party related DELETE FROM party_notification t -USING public.subset_excluded_corp_parties x +USING TARGET_SCHEMA.subset_excluded_corp_parties x WHERE t.party_id = x.corp_party_id; DELETE FROM offices_held t -USING public.subset_excluded_corp_parties x +USING TARGET_SCHEMA.subset_excluded_corp_parties x WHERE t.corp_party_id = x.corp_party_id; DELETE FROM corp_party_relationship t -USING public.subset_excluded_corp_parties x +USING TARGET_SCHEMA.subset_excluded_corp_parties x WHERE t.corp_party_id = x.corp_party_id; DELETE FROM corp_party t -USING public.subset_excluded_corp_parties x +USING TARGET_SCHEMA.subset_excluded_corp_parties x WHERE t.corp_party_id = x.corp_party_id; -- Corp-scoped tables DELETE FROM office t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM corp_name t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM corp_state t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM corp_comments t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM corp_flag t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM cont_out t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM corp_restriction t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM jurisdiction t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE 
t.corp_num = x.corp_num; DELETE FROM resolution t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; -- Share tables (delete deepest first) DELETE FROM share_series t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM share_struct_cls t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; DELETE FROM share_struct t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; -- Events last (many tables reference event_id) DELETE FROM event t -USING public.subset_excluded_events x +USING TARGET_SCHEMA.subset_excluded_events x WHERE t.event_id = x.event_id; -- Corporation last DELETE FROM corporation t -USING public.subset_excluded_corps x +USING TARGET_SCHEMA.subset_excluded_corps x WHERE t.corp_num = x.corp_num; -- 3) Cleanup helper tables -TRUNCATE TABLE public.subset_excluded_corp_parties; -TRUNCATE TABLE public.subset_excluded_events; -TRUNCATE TABLE public.subset_excluded_corps; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corp_parties; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_events; +TRUNCATE TABLE TARGET_SCHEMA.subset_excluded_corps; diff --git a/data-tool/scripts/subset/subset_transfer_cars.sql b/data-tool/scripts/subset/subset_transfer_cars.sql index f41876127a..0ea265f651 100644 --- a/data-tool/scripts/subset/subset_transfer_cars.sql +++ b/data-tool/scripts/subset/subset_transfer_cars.sql @@ -4,14 +4,14 @@ -- These tables are NOT corp-scoped. The full dataset is transferred without filtering. -- Volume is low enough that a full refresh is appropriate. 
-transfer public.carsfile from cprd using +transfer TARGET_SCHEMA.carsfile from cprd using select documtid, filedate, regiracf from carsfile; -transfer public.carsbox from cprd using +transfer TARGET_SCHEMA.carsbox from cprd using select documtid, accesnum, @@ -19,14 +19,14 @@ select boxrracf from carsbox; -transfer public.carsrept from cprd using +transfer TARGET_SCHEMA.carsrept from cprd using select documtid, docutype, compnumb from carsrept; -transfer public.carindiv from cprd using +transfer TARGET_SCHEMA.carindiv from cprd using select documtid, replace(surname, CHR(0), '') as surname, diff --git a/data-tool/scripts/subset/subset_transfer_chunk.sql b/data-tool/scripts/subset/subset_transfer_chunk.sql index 50b4e91757..0ba0a65f5c 100644 --- a/data-tool/scripts/subset/subset_transfer_chunk.sql +++ b/data-tool/scripts/subset/subset_transfer_chunk.sql @@ -33,7 +33,7 @@ -- vset oracle_corp_num_predicate=c.CORP_NUM in ('1111585','1226175'); -- corporation -transfer public.corporation from cprd using +transfer TARGET_SCHEMA.corporation from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -88,7 +88,7 @@ left join last_ar la on la.corp_num = c.corp_num; -- event -transfer public.event from cprd using +transfer TARGET_SCHEMA.event from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -121,7 +121,7 @@ where e.event_typ_cd not in ('BNUPD', 'ADDLEDGR'); -- corp_name -transfer public.corp_name from cprd using +transfer TARGET_SCHEMA.corp_name from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -152,7 +152,7 @@ join CORP_NAME cn on cn.corp_num = c.corp_num; -- corp_state -transfer public.corp_state from cprd using +transfer TARGET_SCHEMA.corp_state from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -184,7 +184,7 @@ join corp_op_state cos on cos.state_typ_cd = cs.state_typ_cd; -- filing -transfer 
public.filing from cprd using +transfer TARGET_SCHEMA.filing from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -231,7 +231,7 @@ join filing f on f.event_id = e.event_id; -- filing_user -transfer public.filing_user from cprd using +transfer TARGET_SCHEMA.filing_user from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -266,9 +266,9 @@ join filing_user u on u.event_id = e.event_id; -- address (shared/global table; stage then merge before loading dependents) -TRUNCATE TABLE public.subset_address_stage; +TRUNCATE TABLE TARGET_SCHEMA.subset_address_stage; -transfer public.subset_address_stage from cprd using +transfer TARGET_SCHEMA.subset_address_stage from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -346,7 +346,7 @@ from ( join address a on x.mailing_addr_id = a.addr_id ); -INSERT INTO public.address ( +INSERT INTO TARGET_SCHEMA.address ( addr_id, province, country_typ_cd, @@ -374,7 +374,7 @@ FROM ( addr_line_2, addr_line_3, city - FROM public.subset_address_stage + FROM TARGET_SCHEMA.subset_address_stage WHERE addr_id IS NOT NULL ORDER BY addr_id ) s @@ -387,11 +387,11 @@ SET province = EXCLUDED.province, addr_line_3 = EXCLUDED.addr_line_3, city = EXCLUDED.city; -TRUNCATE TABLE public.subset_address_stage; +TRUNCATE TABLE TARGET_SCHEMA.subset_address_stage; -- office -transfer public.office from cprd using +transfer TARGET_SCHEMA.office from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -423,7 +423,7 @@ join office o on o.corp_num = c.corp_num; -- corp_comments -transfer public.corp_comments from cprd using +transfer TARGET_SCHEMA.corp_comments from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -457,7 +457,7 @@ join corp_comments cc on cc.corp_num = c.corp_num; -- ledger_text -transfer public.ledger_text from cprd using +transfer 
TARGET_SCHEMA.ledger_text from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -488,7 +488,7 @@ join ledger_text l on l.event_id = e.event_id; -- corp_party -transfer public.corp_party from cprd using +transfer TARGET_SCHEMA.corp_party from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -531,7 +531,7 @@ join corp_party p on p.corp_num = c.corp_num; -- corp_party_relationship -transfer public.corp_party_relationship from cprd using +transfer TARGET_SCHEMA.corp_party_relationship from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -560,7 +560,7 @@ join CORP_PARTY_RELATIONSHIP cpr on cpr.corp_party_id = p.corp_party_id; -- offices_held -transfer public.offices_held from cprd using +transfer TARGET_SCHEMA.offices_held from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -589,7 +589,7 @@ join OFFICES_HELD oh on oh.corp_party_id = p.corp_party_id; -- completing_party -transfer public.completing_party from cprd using +transfer TARGET_SCHEMA.completing_party from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -622,7 +622,7 @@ join completing_party cp on cp.event_id = e.event_id; -- submitting_party -transfer public.submitting_party from cprd using +transfer TARGET_SCHEMA.submitting_party from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -663,7 +663,7 @@ join SUBMITTING_PARTY sp on sp.event_id = e.event_id; -- corp_flag -transfer public.corp_flag from cprd using +transfer TARGET_SCHEMA.corp_flag from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -693,7 +693,7 @@ join corp_flag cf on cf.corp_num = c.corp_num; -- cont_out -transfer public.cont_out from cprd using +transfer TARGET_SCHEMA.cont_out from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from 
corporation c @@ -726,7 +726,7 @@ join CONT_OUT co on co.corp_num = c.corp_num; -- conv_event -transfer public.conv_event from cprd using +transfer TARGET_SCHEMA.conv_event from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -765,7 +765,7 @@ join CONV_EVENT ce on ce.event_id = e.event_id; -- conv_ledger -transfer public.conv_ledger from cprd using +transfer TARGET_SCHEMA.conv_ledger from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -796,7 +796,7 @@ join CONV_LEDGER cl on cl.event_id = e.event_id; -- corp_involved - amalgamaTING_businesses -transfer public.corp_involved_amalgamating from cprd using +transfer TARGET_SCHEMA.corp_involved_amalgamating from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -864,7 +864,7 @@ where f.filing_typ_cd in ('AMALH', 'AMALV', 'AMALR', 'AMLHU', 'AMLVU', 'AMLRU', -- corp_involved - continue_in_historical_xpro -transfer public.corp_involved_cont_in from cprd using +transfer TARGET_SCHEMA.corp_involved_cont_in from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -895,7 +895,7 @@ where f.filing_typ_cd in ('CONTI', 'CONTU', 'CONTC') -- corp_restriction -transfer public.corp_restriction from cprd using +transfer TARGET_SCHEMA.corp_restriction from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -929,7 +929,7 @@ join CORP_RESTRICTION cr on cr.corp_num = c.corp_num; -- correction -transfer public.correction from cprd using +transfer TARGET_SCHEMA.correction from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -959,7 +959,7 @@ join CORRECTION corr on corr.event_id = e.event_id; -- continued_in_from_jurisdiction -transfer public.jurisdiction from cprd using +transfer TARGET_SCHEMA.jurisdiction from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ 
-994,7 +994,7 @@ join JURISDICTION j on j.corp_num = c.corp_num; -- resolution -transfer public.resolution from cprd using +transfer TARGET_SCHEMA.resolution from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1025,7 +1025,7 @@ join RESOLUTION r on r.corp_num = c.corp_num; -- share_struct -transfer public.share_struct from cprd using +transfer TARGET_SCHEMA.share_struct from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1054,7 +1054,7 @@ join SHARE_STRUCT ss on ss.corp_num = c.corp_num; -- share_struct_cls -transfer public.share_struct_cls from cprd using +transfer TARGET_SCHEMA.share_struct_cls from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1103,7 +1103,7 @@ join SHARE_STRUCT_CLS ssc on ssc.corp_num = c.corp_num; -- share_series -transfer public.share_series from cprd using +transfer TARGET_SCHEMA.share_series from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1145,7 +1145,7 @@ join SHARE_SERIES ss on ss.corp_num = c.corp_num; -- notification -transfer public.notification from cprd using +transfer TARGET_SCHEMA.notification from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1181,7 +1181,7 @@ join NOTIFICATION n on n.event_id = e.event_id; -- notification_resend -transfer public.notification_resend from cprd using +transfer TARGET_SCHEMA.notification_resend from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1217,7 +1217,7 @@ join NOTIFICATION_RESEND nr on nr.event_id = e.event_id; -- party_notification -transfer public.party_notification from cprd using +transfer TARGET_SCHEMA.party_notification from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c @@ -1254,7 +1254,7 @@ join PARTY_NOTIFICATION pn on pn.party_id = cp.corp_party_id; -- payment -transfer 
public.payment from cprd using +transfer TARGET_SCHEMA.payment from cprd using with corp_list as ( select /*+ materialize */ c.corp_num from corporation c