4 changes: 3 additions & 1 deletion .github/workflows/master-test-workflow.yml
@@ -36,7 +36,9 @@ jobs:

     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
+        with:
+          submodules: recursive

       - name: Setup
         run: source /home/hicr/.bashrc && meson setup build -Dengines=mpi,cloudr -Db_coverage=true -DbuildTests=true -DbuildExamples=true -DcompileWarningsAsErrors=false
4 changes: 3 additions & 1 deletion .github/workflows/pr-development-workflow.yml
@@ -36,7 +36,9 @@ jobs:

     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
+        with:
+          submodules: recursive

       - name: Setup
         run: source /home/hicr/.bashrc && meson setup build -Db_coverage=true -Dengines=mpi,cloudr -DbuildTests=true -DbuildExamples=true -DcompileWarningsAsErrors=false
104 changes: 55 additions & 49 deletions examples/basic/basic.cpp
@@ -23,7 +23,7 @@ int main(int argc, char *argv[])
   // Creating HWloc topology object
   hwloc_topology_t hwlocTopologyObject;

-  // Reserving memory for hwloc
+  // Reserving memory for hwloc
   hwloc_topology_init(&hwlocTopologyObject);

   // Initializing host (CPU) topology manager
@@ -51,20 +51,20 @@ int main(int argc, char *argv[])
   HiCR::backend::pthreads::Core core(1);

   // Getting managers
-  auto instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
-  auto communicationManager = std::make_shared<HiCR::backend::mpi::CommunicationManager>();
-  auto memoryManager = std::make_shared<HiCR::backend::mpi::MemoryManager>();
-  auto workerComputeManager = std::make_shared<HiCR::backend::pthreads::ComputeManager>();
-  auto taskComputeManager = std::make_shared<HiCR::backend::boost::ComputeManager>();
+  auto instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
+  auto communicationManager = std::make_shared<HiCR::backend::mpi::CommunicationManager>();
+  auto memoryManager = std::make_shared<HiCR::backend::mpi::MemoryManager>();
+  auto workerComputeManager = std::make_shared<HiCR::backend::pthreads::ComputeManager>();
+  auto taskComputeManager = std::make_shared<HiCR::backend::boost::ComputeManager>();

   // Creating taskr object
   nlohmann::json taskrConfig;
-  taskrConfig["Task Worker Inactivity Time (Ms)"] = 100; // Suspend workers if a certain time of inactivity elapses
-  taskrConfig["Task Suspend Interval Time (Ms)"] = 100; // Workers suspend for this time before checking back
-  taskrConfig["Minimum Active Task Workers"] = 1; // Have at least one worker active at all times
-  taskrConfig["Service Worker Count"] = 1; // Have one dedicated service workers at all times to listen for incoming messages
-  taskrConfig["Make Task Workers Run Services"] = true; // Workers will check for meta messages in between executions
-  auto taskr = std::make_unique<taskr::Runtime>(taskComputeManager.get(), workerComputeManager.get(), computeResources, taskrConfig);
+  taskrConfig["Task Worker Inactivity Time (Ms)"] = 100; // Suspend workers if a certain time of inactivity elapses
+  taskrConfig["Task Suspend Interval Time (Ms)"] = 100; // Workers suspend for this time before checking back
+  taskrConfig["Minimum Active Task Workers"] = 1; // Have at least one worker active at all times
+  taskrConfig["Service Worker Count"] = 1; // Have one dedicated service workers at all times to listen for incoming messages
+  taskrConfig["Make Task Workers Run Services"] = true; // Workers will check for meta messages in between executions
+  auto taskr = std::make_unique<taskr::Runtime>(taskComputeManager.get(), workerComputeManager.get(), computeResources, taskrConfig);

   // Instantiate RPC Engine
   auto rpcEngine = std::make_shared<HiCR::frontend::RPCEngine>(*communicationManager, *instanceManager, *memoryManager, *workerComputeManager, bufferMemorySpace, computeResource);
@@ -95,7 +95,7 @@ int main(int argc, char *argv[])

     // Parsing request file contents to a JSON object
     std::ifstream hllmConfigFs(hllmConfigFilePath);
-    auto hllmConfigJs = nlohmann::json::parse(hllmConfigFs);
+    auto hllmConfigJs = nlohmann::json::parse(hllmConfigFs);

     // Parsing config file using hLLM
     deployment.deserialize(hllmConfigJs);
@@ -114,7 +114,7 @@ int main(int argc, char *argv[])
     auto instance = *instanceManager->getInstances().begin();

     // Assigning instance ids to the partitions
-    for (auto p : deployment.getPartitions())
+    for (auto p : deployment.getPartitions())
     {
       // Getting instance Id that will run this partition (only one)
       const auto partitionInstanceId = instance->getId();
@@ -129,7 +129,7 @@ int main(int argc, char *argv[])
         p->addReplica(replica);
       }
     }
-
+
     // printf("%s\n", deployment.serialize().dump(2).c_str());
   }

@@ -138,7 +138,7 @@ int main(int argc, char *argv[])

  // Before deploying, we need to indicate what communication and memory managers to assign to each of the edges
  // This allows for flexibility to choose in which devices to place the payload and coordination buffers
-  for (const auto& edge : hllm.getDeployment().getEdges())
+  for (const auto &edge : hllm.getDeployment().getEdges())
   {
     edge->setPayloadCommunicationManager(communicationManager.get());
     edge->setPayloadMemoryManager(memoryManager.get());
@@ -154,19 +154,18 @@ int main(int argc, char *argv[])

   // Setting managers for partition-wise control messaging
   hllm.getDeployment().getControlBuffer().communicationManager = communicationManager.get();
-  hllm.getDeployment().getControlBuffer().memoryManager = memoryManager.get();
-  hllm.getDeployment().getControlBuffer().memorySpace = bufferMemorySpace;
+  hllm.getDeployment().getControlBuffer().memoryManager = memoryManager.get();
+  hllm.getDeployment().getControlBuffer().memorySpace = bufferMemorySpace;

   // Declaring local value outside the functions for them to persist, but particular to each replica
   std::vector<float> cathetusSquaredSummedOutput;
   cathetusSquaredSummedOutput.resize(_REPLICAS_PER_PARTITION);

   // Declaring the hLLM tasks for the application
-  hllm.registerFunction("Listen Request", [&](hLLM::Task *task)
-  {
+  hllm.registerFunction("Listen Request", [&](hLLM::Task *task) {
     // Getting raw request
     const auto &requestMemSlot = task->getInput("Catheti");
-    const auto request = std::string((const char *)requestMemSlot->getPointer());
+    const auto request = std::string((const char *)requestMemSlot->getPointer());

     // Getting catheti values
     float cathetusAoutput;
@@ -182,11 +181,10 @@ int main(int argc, char *argv[])
     memoryManager->deregisterLocalMemorySlot(cathetusBMemorySlot);
   });

-  hllm.registerFunction("Square Cathetus A", [&](hLLM::Task *task)
-  {
+  hllm.registerFunction("Square Cathetus A", [&](hLLM::Task *task) {
     // Getting input
-    const auto &cathetusMemSlot = task->getInput("Cathetus A");
-    const float* cathetusA = (float *)cathetusMemSlot->getPointer();
+    const auto &cathetusMemSlot = task->getInput("Cathetus A");
+    const float *cathetusA = (float *)cathetusMemSlot->getPointer();

     // Squaring cathetus
     float cathetusASquaredOutput = (*cathetusA) * (*cathetusA);
@@ -197,11 +195,10 @@ int main(int argc, char *argv[])
     memoryManager->deregisterLocalMemorySlot(cathetusASquaredMemorySlot);
   });

-  hllm.registerFunction("Square Cathetus B", [&](hLLM::Task *task)
-  {
+  hllm.registerFunction("Square Cathetus B", [&](hLLM::Task *task) {
     // Getting input
-    const auto &cathetusMemSlot = task->getInput("Cathetus B");
-    const float* cathetusB = (float *)cathetusMemSlot->getPointer();
+    const auto &cathetusMemSlot = task->getInput("Cathetus B");
+    const float *cathetusB = (float *)cathetusMemSlot->getPointer();

     // Squaring cathetus
     float cathetusBSquaredOutput = (*cathetusB) * (*cathetusB);
@@ -212,13 +209,12 @@ int main(int argc, char *argv[])
     memoryManager->deregisterLocalMemorySlot(cathetusBSquaredMemorySlot);
   });

-  hllm.registerFunction("Sum Catheti Squares", [&](hLLM::Task *task)
-  {
+  hllm.registerFunction("Sum Catheti Squares", [&](hLLM::Task *task) {
     // Getting inputs
-    const auto &cathetusASquaredMemSlot = task->getInput("Cathetus A Squared");
-    const float* cathetusAsquared = (float *)cathetusASquaredMemSlot->getPointer();
-    const auto &cathetusBSquaredMemSlot = task->getInput("Cathetus B Squared");
-    const float* cathetusBsquared = (float *)cathetusBSquaredMemSlot->getPointer();
+    const auto &cathetusASquaredMemSlot = task->getInput("Cathetus A Squared");
+    const float *cathetusAsquared = (float *)cathetusASquaredMemSlot->getPointer();
+    const auto &cathetusBSquaredMemSlot = task->getInput("Cathetus B Squared");
+    const float *cathetusBsquared = (float *)cathetusBSquaredMemSlot->getPointer();

     // Getting my replica id
     const auto replicaId = task->getReplicaIdx();
@@ -227,8 +223,7 @@ int main(int argc, char *argv[])
     cathetusSquaredSummedOutput[replicaId] = (*cathetusAsquared) + (*cathetusBsquared);
   });

-  hllm.registerFunction("Square Root Sum", [&](hLLM::Task *task)
-  {
+  hllm.registerFunction("Square Root Sum", [&](hLLM::Task *task) {
     // Getting my replica id
     const auto replicaId = task->getReplicaIdx();

@@ -246,12 +241,12 @@ int main(int argc, char *argv[])
   if (isRoot)
   {
     // RNG for wait time between prompts
-    std::default_random_engine promptTimeRandomEngine;
-    std::uniform_real_distribution<float> promptTimeRandomDistribution(0.0, 1.0);
+    std::default_random_engine promptTimeRandomEngine;
+    std::uniform_real_distribution<float> promptTimeRandomDistribution(0.0, 1.0);

     // RNG for catheti values
-    std::default_random_engine cathetiRandomEngine;
-    std::uniform_real_distribution<float> cathetiRandomDistribution(0.1, 10.0);
+    std::default_random_engine cathetiRandomEngine;
+    std::uniform_real_distribution<float> cathetiRandomDistribution(0.1, 10.0);

     // Counter for the finished threads
     std::atomic<size_t> finishedPromptThreads = 0;
@@ -260,11 +255,10 @@ int main(int argc, char *argv[])
     const auto tolerance = 0.0001;

     for (size_t i = 0; i < _PROMPT_THREAD_COUNT; i++)
-      promptThreads.push_back(std::make_unique<std::thread>([&, i]()
-      {
+      promptThreads.push_back(std::make_unique<std::thread>([&, i]() {
        // Wait until the hLLM has deployed
        while (hllm.isDeployed() == false);

        // Now create session
        auto session = hllm.createSession();
        printf("[User %04lu] Created Session: %lu\n", i, session->getSessionId());
@@ -287,19 +281,30 @@ int main(int argc, char *argv[])
        // printf("[User] Sent prompt (%lu/%lu): '%s'\n", promptId.first, promptId.second, prompt->getPrompt().c_str());

        // Wait until the prompt receives a response
-        while(prompt->hasResponse() == false);
+        while (prompt->hasResponse() == false);

        // Getting response
-        const float response = *(float*)prompt->getResponse().data();
+        const float response = *(float *)prompt->getResponse().data();

        // Calculating
        const float error = std::abs(sqrtf(cathetusA * cathetusA + cathetusB * cathetusB) - response);

        // Printing response
-        printf("[User %04lu] Got response: %f for prompt %lu/%lu: '%s'. |Error|: %f (< tolerance: %f)\n", i, response, promptId.first, promptId.second, prompt->getPrompt().c_str(), error, tolerance);
+        printf("[User %04lu] Got response: %f for prompt %lu/%lu: '%s'. |Error|: %f (< tolerance: %f)\n",
+               i,
+               response,
+               promptId.first,
+               promptId.second,
+               prompt->getPrompt().c_str(),
+               error,
+               tolerance);

        // Verifying result
-        if (error > tolerance) { fprintf(stderr, "Response error is higher than tolerance, aborting...\n"); exit(-1); }
+        if (error > tolerance)
+        {
+          fprintf(stderr, "Response error is higher than tolerance, aborting...\n");
+          exit(-1);
+        }

        // Waiting a random amount of time before sending the next prompt
        usleep(100000.0 * promptTimeRandomDistribution(promptTimeRandomEngine));
@@ -318,7 +323,8 @@ int main(int argc, char *argv[])
   hllm.deploy(deployment);

   // // Waiting for prompt thread to finish
-  if (isRoot) for (auto& thread : promptThreads) thread->join();
+  if (isRoot)
+    for (auto &thread : promptThreads) thread->join();

   // Finalize Instance Manager
   instanceManager->finalize();
2 changes: 1 addition & 1 deletion examples/basic/meson.build
@@ -8,7 +8,7 @@ if get_option('buildTests')
     args: [
       '-np', '1',
       '--oversubscribe', e.full_path(),
-      meson.current_source_dir() + '/config.json',
+      meson.current_source_dir() + '/policy.json',
     ],
     timeout: 60,
     suite: testSuite,