diff --git a/libensemble/gen_classes/aposmm.py b/libensemble/gen_classes/aposmm.py index 543de53c5..8a55efa12 100644 --- a/libensemble/gen_classes/aposmm.py +++ b/libensemble/gen_classes/aposmm.py @@ -7,11 +7,11 @@ from gest_api.vocs import VOCS from numpy import typing as npt -from libensemble.generators import PersistentGenInterfacer -from libensemble.message_numbers import EVAL_GEN_TAG, PERSIS_STOP +from libensemble.generators import LibensembleGenerator +from libensemble.utils.misc import unmap_numpy_array -class APOSMM(PersistentGenInterfacer): +class APOSMM(LibensembleGenerator): """ APOSMM coordinates multiple local optimization runs, dramatically reducing time for discovering multiple minima on parallel systems. @@ -188,15 +188,32 @@ def __init__( **kwargs, ) -> None: - from libensemble.gen_funcs.persistent_aposmm import aposmm + from libensemble.gen_funcs.aposmm_localopt_support import LocalOptInterfacer + from libensemble.gen_funcs.persistent_aposmm import ( + add_k_sample_points_to_local_H, + add_to_local_H, + decide_where_to_start_localopt, + initialize_APOSMM, + initialize_children, + initialize_dists_and_inds, + update_history_dist, + update_history_optimal, + ) + + # Store references to the functions we'll call later + self._add_k_sample_points = add_k_sample_points_to_local_H + self._add_to_local_H = add_to_local_H + self._decide_where_to_start = decide_where_to_start_localopt + self._initialize_dists_and_inds = initialize_dists_and_inds + self._update_history_dist = update_history_dist + self._update_history_optimal = update_history_optimal + self._LocalOptInterfacer = LocalOptInterfacer self.vocs = vocs gen_specs = {} gen_specs["user"] = {} persis_info = {} - libE_info = {} - gen_specs["gen_f"] = aposmm n = len(list(vocs.variables.keys())) if not rk_const: @@ -222,7 +239,10 @@ def __init__( if val is not None: gen_specs["user"][k] = val - super().__init__(vocs, History, persis_info, gen_specs, libE_info, **kwargs) + super().__init__(vocs, History, persis_info, gen_specs, {}, **kwargs) + + # APOSMM manages sim_id internally — don't remap to _id + self.variables_mapping.pop("sim_id", None) # Set bounds using the correct x mapping x_mapping = self.variables_mapping["x"] @@ -268,115 +288,265 @@ def __init__( if "components" in kwargs or "components" in gen_specs.get("user", {}): gen_specs["persis_in"].append("fvec") - # SH - Need to know if this is gen_on_manager or not. - self.persis_info["nworkers"] = gen_specs["user"].get("max_active_runs") + # Initialize APOSMM internal state directly (no subprocess) + user_specs = gen_specs["user"] + libE_info = {"comm": []} # no comm needed in direct mode + self._n, self._n_s, self._rk_const, self._ld, self._mu, self._nu, _, self.local_H = initialize_APOSMM( + History, user_specs, libE_info + ) + ( + self._local_opters, + self._sim_id_to_child_inds, + self._run_order, + self._run_pts, + self._total_runs, + self._ended_runs, + self._fields_to_pass, + ) = initialize_children(user_specs) + + self._user_specs = user_specs + self._max_active_runs = max_active_runs + + # Build reverse mapping: VOCS field name -> (internal_name, index) + self._reverse_mapping = {} + for internal_name, vocs_names in self.variables_mapping.items(): + for i, vocs_name in enumerate(vocs_names): + self._reverse_mapping[vocs_name] = (internal_name, i, len(vocs_names)) + self.all_local_minima = [] - self._suggest_idx = 0 - self._last_suggest = None - self._ingest_buf = None - self._n_buffd_results = 0 self._told_initial_sample = False self._first_called_method = None - self._last_call = None - self._last_num_points = 0 + self._pending_results = None + self._first_pass = True + self._n_r = 0 # number of results received in last ingest + self._initial_sample_generated = False + self._initial_suggest_idx = 0 # tracks how many initial sample points have been handed out + + def _map_to_internal(self, results): + """Map VOCS-named structured array to internal APOSMM field names (x, x_on_cube, f, sim_id).""" + if results is None or len(results) == 0: + return results + # If already has internal names, return as-is + if "x" in results.dtype.names and "f" in results.dtype.names: + return results + + n_rows = len(results) + # Build dtype for internal array + internal_fields = [] + added = set() + for vocs_name in results.dtype.names: + if vocs_name in self._reverse_mapping: + internal_name, _, size = self._reverse_mapping[vocs_name] + if internal_name not in added: + if size > 1: + internal_fields.append((internal_name, float, size)) + else: + internal_fields.append((internal_name, float)) + added.add(internal_name) + elif vocs_name == "_id": + if "sim_id" not in added: + internal_fields.append(("sim_id", int)) + added.add("sim_id") + elif vocs_name == "sim_id": + if "sim_id" not in added: + internal_fields.append(("sim_id", int)) + added.add("sim_id") + + out = np.zeros(n_rows, dtype=internal_fields) + has_sim_id = "sim_id" in results.dtype.names + for vocs_name in results.dtype.names: + if vocs_name in self._reverse_mapping: + internal_name, idx, size = self._reverse_mapping[vocs_name] + if size > 1: + out[internal_name][:, idx] = results[vocs_name] + else: + out[internal_name] = results[vocs_name] + elif vocs_name == "sim_id": + out["sim_id"] = results["sim_id"] + elif vocs_name == "_id" and not has_sim_id: + out["sim_id"] = results["_id"] + + return out def _slot_in_data(self, results): - """Slot in libE_calc_in and trial data into corresponding array fields. *Initial sample only!!*""" - for name in results.dtype.names: - if name == "_id": - self._ingest_buf["sim_id"][self._n_buffd_results : self._n_buffd_results + len(results)] = results[ - "_id" - ] - else: - self._ingest_buf[name][self._n_buffd_results : self._n_buffd_results + len(results)] = results[name] - - def _enough_initial_sample(self): - return ( - self._n_buffd_results >= int(self.gen_specs["user"]["initial_sample_size"]) - ) or self._told_initial_sample - - def _ready_to_suggest_genf(self): - """ - We're presumably ready to be suggested IF: - - When we're working on the initial sample: - - We have no _last_suggest cached - - all points given out have returned AND we've been suggested *at least* as many points as we cached - - When we're done with the initial sample: - - we've been suggested *at least* as many points as we cached - - we've just ingested some results - """ - if not self._told_initial_sample and self._last_suggest is not None: - cond = all([i in self._ingest_buf["sim_id"] for i in self._last_suggest["sim_id"]]) - else: - cond = True - return self._last_suggest is None or (cond and (self._suggest_idx >= len(self._last_suggest))) + """Slot ingested results into local_H during initial sample phase.""" + n_s_before = self._n_s + n_new = len(results) + old_len = len(self.local_H) + needed = n_s_before + n_new + if needed > old_len: + self.local_H.resize(needed, refcheck=False) + self._initialize_dists_and_inds(self.local_H, needed - old_len) + + for i, row in enumerate(results): + idx = n_s_before + i + self.local_H["sim_id"][idx] = idx + for name in results.dtype.names: + if name == "sim_id": + continue + if name in self.local_H.dtype.names: + self.local_H[name][idx] = row[name] + self.local_H["sim_ended"][idx] = True + self._n_s += n_new + self._update_history_dist(self.local_H, self._n) def suggest_numpy(self, num_points: int = 0) -> npt.NDArray: """Request the next set of points to evaluate, as a NumPy array.""" + out_fields = [i[0] for i in self.gen_specs["out"]] if self._first_called_method is None: self._first_called_method = "suggest" - self.gen_specs["user"]["generate_sample_points"] = True - if self._ready_to_suggest_genf(): - self._suggest_idx = 0 - if self._last_call == "suggest" and num_points == 0 and self._last_num_points == 0: - self.finalize() - raise RuntimeError("Cannot suggest points since APOSMM is currently expecting to receive a sample") - self._last_suggest = super().suggest_numpy(num_points) + # Initial sample phase: generate random points once, return in batches + if not self._told_initial_sample: + if not self._initial_sample_generated: + total = self._user_specs["initial_sample_size"] + self._add_k_sample_points( + total, self._user_specs, self.persis_info, + self._n, [], self.local_H, self._sim_id_to_child_inds, + ) + self._initial_sample_generated = True + self._initial_suggest_idx = 0 + + k = num_points if num_points > 0 else (self._user_specs["initial_sample_size"] - self._initial_suggest_idx) + start = self._initial_suggest_idx + end = min(start + k, self._user_specs["initial_sample_size"]) + result = self.local_H[start:end][out_fields].copy() + self._initial_suggest_idx = end + return unmap_numpy_array(result, self.variables_mapping) + + # Main optimization phase + new_opt_inds = [] + new_inds = [] + + # Process any pending ingested results through local optimizers + if self._pending_results is not None: + from libensemble.gen_funcs.aposmm_localopt_support import ConvergedMsg + + calc_in = self._pending_results + self._pending_results = None + + # Update local_H with received results + for row in calc_in: + sim_id = int(row["sim_id"]) + self.local_H[sim_id]["sim_ended"] = True + for name in calc_in.dtype.names: + if name in self.local_H.dtype.names: + self.local_H[name][sim_id] = row[name] + self._n_s = int(np.sum(~self.local_H["local_pt"][:len(self.local_H)])) + self._update_history_dist(self.local_H, self._n) + + for row in calc_in: + sim_id = int(row["sim_id"]) + if self._sim_id_to_child_inds.get(sim_id): + for child_idx in self._sim_id_to_child_inds[sim_id]: + if child_idx not in self._local_opters: + continue + x_new = self._local_opters[child_idx].iterate(row[self._fields_to_pass]) + if isinstance(x_new, ConvergedMsg): + x_opt = x_new.x + opt_flag = x_new.opt_flag + opt_ind = self._update_history_optimal( + x_opt, opt_flag, self.local_H, self._run_order[child_idx], + ) + new_opt_inds.append(opt_ind) + self._local_opters.pop(child_idx) + self._ended_runs.append(child_idx) + else: + self._add_to_local_H(self.local_H, x_new, self._user_specs, local_flag=1, on_cube=True) + new_inds.append(len(self.local_H) - 1) + self._run_order[child_idx].append(self.local_H[-1]["sim_id"]) + self._run_pts[child_idx].append(x_new) + sid = self.local_H[-1]["sim_id"] + if sid in self._sim_id_to_child_inds: + self._sim_id_to_child_inds[sid] += (child_idx,) + else: + self._sim_id_to_child_inds[sid] = (child_idx,) + + # Decide where to start new local optimization runs + starting_inds = self._decide_where_to_start( + self.local_H, self._n, self._n_s, self._rk_const, self._ld, self._mu, self._nu, + ) + + for ind in starting_inds: + if len([p for p in self._local_opters.values() if p.is_running]) < self._max_active_runs: + self.local_H["started_run"][ind] = 1 + local_opter = self._LocalOptInterfacer( + self._user_specs, + self.local_H[ind]["x_on_cube"], + self.local_H[ind]["f"] if "f" in self._fields_to_pass else self.local_H[ind]["fvec"], + self.local_H[ind]["grad"] if "grad" in self._fields_to_pass else None, + ) + self._local_opters[self._total_runs] = local_opter + x_new = local_opter.iterate(self.local_H[ind][self._fields_to_pass]) + self._add_to_local_H(self.local_H, x_new, self._user_specs, local_flag=1, on_cube=True) + new_inds.append(len(self.local_H) - 1) + self._run_order[self._total_runs] = [ind, self.local_H[-1]["sim_id"]] + self._run_pts[self._total_runs] = [self.local_H["x_on_cube"], x_new] + sid = self.local_H[-1]["sim_id"] + if sid in self._sim_id_to_child_inds: + self._sim_id_to_child_inds[sid] += (self._total_runs,) + else: + self._sim_id_to_child_inds[sid] = (self._total_runs,) + self._total_runs += 1 + + # Fill remaining slots with sample points + if self._first_pass: + num_samples = self._max_active_runs - 1 - len(new_inds) + self._first_pass = False + else: + num_samples = self._n_r - len(new_inds) - if self._last_suggest["local_min"].any(): # filter out local minima rows - min_idxs = self._last_suggest["local_min"] - self.all_local_minima.append(self._last_suggest[min_idxs]) - self._last_suggest = self._last_suggest[~min_idxs] + if num_samples > 0: + self._add_k_sample_points( + num_samples, self._user_specs, self.persis_info, + self._n, [], self.local_H, self._sim_id_to_child_inds, + ) + new_inds = new_inds + list(range(len(self.local_H) - num_samples, len(self.local_H))) - if num_points > 0: # we've been suggested for a selection of the last suggest - results = np.copy(self._last_suggest[self._suggest_idx : self._suggest_idx + num_points]) - self._suggest_idx += num_points + all_inds = new_inds + new_opt_inds + if len(all_inds) == 0: + return np.zeros(0, dtype=[(name, self.local_H.dtype[name]) for name in out_fields]) - else: - results = np.copy(self._last_suggest) - self._last_suggest = None + result = self.local_H[all_inds][out_fields].copy() - self._last_call = "suggest" - self._last_num_points = num_points - return results + # Track local minima for suggest_updates() + if result["local_min"].any(): + min_idxs = result["local_min"] + self.all_local_minima.append(result[min_idxs].copy()) - def ingest_numpy(self, results: npt.NDArray, tag: int = EVAL_GEN_TAG) -> None: + return unmap_numpy_array(result, self.variables_mapping) - if self._first_called_method is None: - self._first_called_method = "ingest" - self.gen_specs["user"]["generate_sample_points"] = False + def ingest_numpy(self, results: npt.NDArray, tag: int = 0) -> None: + """Send the results of evaluations to the generator.""" - if (results is None and tag == PERSIS_STOP) or self._told_initial_sample: - super().ingest_numpy(results, tag) - self._last_call = "ingest" + if results is None: return - # Initial sample buffering here: - - if self._n_buffd_results == 0: - # Create a dtype that includes sim_id but excludes _id - descr = [d for d in results.dtype.descr if d[0] != "_id"] - if "sim_id" not in [d[0] for d in descr]: - descr.append(("sim_id", int)) - self._ingest_buf = np.zeros(self.gen_specs["user"]["initial_sample_size"], dtype=descr) + if self._first_called_method is None: + self._first_called_method = "ingest" - if not self._enough_initial_sample(): - self._slot_in_data(np.copy(results)) - self._n_buffd_results += len(results) + results = self._map_to_internal(results) - if self._enough_initial_sample(): - if "sim_id" in results.dtype.names and not self._told_initial_sample: - self._ingest_buf["sim_id"] = range(len(self._ingest_buf)) - super().ingest_numpy(self._ingest_buf, tag) - self._told_initial_sample = True - self._n_buffd_results = 0 + if not self._told_initial_sample: + # Initial sample phase: slot data into local_H + self._slot_in_data(results) + if self._n_s >= self._user_specs["initial_sample_size"]: + self._told_initial_sample = True + return - self._last_call = "ingest" + # Main phase: buffer results for processing in next suggest call + self._n_r = len(results) + self._pending_results = results.copy() def suggest_updates(self) -> List[npt.NDArray]: """Request a list of NumPy arrays containing entries that have been identified as minima.""" minima = copy.deepcopy(self.all_local_minima) self.all_local_minima = [] return minima + + def finalize(self) -> None: + """Stop all local optimizer processes.""" + for _, p in self._local_opters.items(): + p.destroy() + self._local_opters.clear() diff --git a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py index ec7615fdb..0dfdf0c61 100644 --- a/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py +++ b/libensemble/tests/regression_tests/test_asktell_aposmm_nlopt.py @@ -120,12 +120,14 @@ def six_hump_camel_func(x): # Perform the run if workflow.is_manager and run == 0: - print("[Manager]:", H[np.where(H["local_min"])]["x"]) + lm = H[H["local_min"]] + x_min = np.column_stack([lm["core"], lm["edge"]]) + print("[Manager]:", x_min) print("[Manager]: Time taken =", time() - start_time, flush=True) tol = 1e-5 for m in minima: # The minima are known on this test problem. # We use their values to test APOSMM has identified all minima - print(np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)), flush=True) - assert np.min(np.sum((H[H["local_min"]]["x"] - m) ** 2, 1)) < tol + print(np.min(np.sum((x_min - m) ** 2, 1)), flush=True) + assert np.min(np.sum((x_min - m) ** 2, 1)) < tol