@ -0,0 +1,45 @@
# Import necessary libraries
from transformers import AutoModelForCausalLM, AutoTokenizer
from pydantic import BaseModel
# from swarms import ToolAgent
from swarms.utils.json_utils import base_model_schema_to_json
# Load the pre-trained model and tokenizer
model = AutoModelForCausalLM.from_pretrained(
tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-12b")
class Schema(BaseModel):
name: str
agent: int
is_student: bool
courses: list[str]
json_schema = str(base_model_schema_to_json(Schema))
# # Define the task to generate a person's information
# task = (
# "Generate a person's information based on the following schema:"
# )
# # Create an instance of the ToolAgent class
# agent = ToolAgent(
# name="dolly-function-agent",
# description="Ana gent to create a child data",
# model=model,
# tokenizer=tokenizer,
# json_schema=json_schema,
# )
# # Run the agent to generate the person's information
# generated_data =
# # Print the generated data
# print(f"Generated data: {generated_data}")
@ -1,93 +0,0 @@
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::types::IntoPyDict;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::thread;
fn rust_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(concurrent_exec, m)?)?;
/// This function wraps Python code in Rust concurrency for ultra high performance.
/// # Arguments
/// * `py_codes` - A vector of string slices that holds the Python codes to be executed.
/// * `timeout` - An optional duration to specify a timeout for the Python code execution.
/// * `num_threads` - The number of threads to use for executing the Python code.
/// * `error_handler` - A function to handle errors during Python code execution.
/// * `log_function` - A function to log the execution of the Python code.
/// * `result_handler` - A function to handle the results of the Python code execution.
/// # Example
/// ```
/// let py_codes = vec!["print('Hello, World!')", "print('Hello, Rust!')"];
/// let timeout = Some(Duration::from_secs(5));
/// let num_threads = 4;
/// let error_handler = |e| eprintln!("Error: {}", e);
/// let log_function = |s| println!("Log: {}", s);
/// let result_handler = |r| println!("Result: {:?}", r);
/// execute_python_codes(py_codes, timeout, num_threads, error_handler, log_function, result_handler);
/// ```
pub fn concurrent_exec<F, G, H>(
py_codes: Vec<&str>,
timeout: Option<Duration>,
num_threads: usize,
error_handler: F,
log_function: G,
result_handler: H,
) -> PyResult<Vec<PyResult<()>>>
F: Fn(&str),
G: Fn(&str),
H: Fn(&PyResult<()>),
let gil = Python::acquire_gil();
let py = gil.python();
let py_codes = Arc::new(Mutex::new(py_codes));
let results = Arc::new(Mutex::new(Vec::new()));
let pool = ThreadPool::new(num_threads);
pool.install(|| {
py_codes.par_iter().for_each(|code| {
let locals = [("__name__", "__main__")].into_py_dict(py);
let globals = [("__name__", "__main__")].into_py_dict(py);
log_function(&format!("Executing Python code: {}", code));
let result =, Some(globals), Some(locals));
match timeout {
Some(t) => {
let now = Instant::now();
let timeout_thread = thread::spawn(move || {
while now.elapsed() < t {
if let Ok(_) = result {
if now.elapsed() >= t {
error_handler(&format!("Python code execution timed out: {}", code));
None => {}
@ -1,71 +0,0 @@
use pyo3::prelude::*;
use rustacuda::prelude::*;
use rustacuda::memory::DeviceBox;
use std::error::Error;
use std::ffi::CString;
fn rust_cuda(_py: Python, m: &PyModule) -> PyResult<()> {
#[pyfn(m, "execute_on_device")]
fn execute_on_device(py: Python, device_id: u32, a: f32, b: f32) -> PyResult<f32> {
/// The result of executing the CUDA operation.
let result = py.allow_threads(|| {
execute_cuda(device_id, a, b)
match result {
Ok(res) => Ok(res),
Err(err) => Err(PyErr::new::<pyo3::exceptions::PyException, _>(format!("{}", err))),
fn execute_cuda(device_id: u32, a: f32, b: f32) -> Result<f32, Box<dyn Error>> {
let device = Device::get_device(device_id)?;
/// Creates a new CUDA context and pushes it onto the current thread's stack.
/// # Arguments
/// * `flags` - The flags to be used when creating the context.
/// * `device` - The device on which the context will be created.
/// # Returns
/// The newly created CUDA context.
/// # Errors
/// Returns an error if the context creation fails.
/// # Example
/// ```rust
/// use swarms::cuda_wrapper::Context;
/// let device = 0;
/// let context = Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?;
/// ```
pub fn create_and_push(flags: ContextFlags, device: i32) -> Result<Context, CudaError> {
// implementation goes here
let context = Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?;
let module_data = CString::new(include_str!("../resources/add.ptx"))?;
let module = Module::load_from_string(&module_data)?;
let stream = Stream::new(StreamFlags::NON_BLOCKING, None)?;
let mut x = DeviceBox::new(&a)?;
let mut y = DeviceBox::new(&b)?;
let mut result = DeviceBox::new(&0.0f32)?;
unsafe {
launch!(module.sum<<<1, 1, 0, stream>>>(
let mut result_host = 0.0f32;
result.copy_to(&mut result_host)?;
@ -1,37 +0,0 @@
use std::fs::File;
use std::io::prelude::*;
use std::time::Instant;
use std::io::{BufReader, io};
use ranyon::prelude::{IntoParallelRefIterator, ParallelIterator};
fn read_file(path: &str) -> Vec<String> {
/// Reads the contents of a file located at the specified path.
/// # Arguments
/// * `path` - The path to the file.
/// # Returns
/// A `Result` containing a vector of strings representing the lines of the file if the file was successfully read,
/// or an `io::Error` if there was an error reading the file.
/// # Example
/// ```
/// use std::io;
/// use std::fs::File;
/// use std::io::BufReader;
/// fn read_file(path: &str) -> io::Result<Vec<String>> {
/// let contents: io::Result<Vec<String>> = BufReader::new(File::open(path).expect("Could not open file"))
/// .lines()
/// .collect();
/// contents
/// }
/// ```
let contents: io::Result<Vec<String>> = BufReader::new(File::open(path).expect("Could not open file"))
return contents.expect("Could not read file");
@ -1,113 +0,0 @@
/// This module provides a multi-threading processor for executing Python modules and functions in parallel.
/// It utilizes the `rayon` crate for parallel processing and the `pyo3` crate for interacting with the Python interpreter.
/// The `multithreading_processor` function takes a vector of `PythonModule` structs and the number of threads to use.
/// Each `PythonModule` struct contains the name of the Python module, the name of the function to call, and any arguments to pass to the function.
/// The function imports the Python module, calls the specified function, and sends any errors encountered back to the main thread.
/// If an import error occurs, a `PythonError::ImportError` is returned.
/// If a function call error occurs, a `PythonError::FunctionError` is returned.
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use rayon::prelude::*;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use log::{info, error};
struct PythonModule<'a> {
name: &'a str,
function: &'a str,
enum PythonError {
fn my_module(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(process_python_modules, m)?)?;
/// The function returns `Ok(())` if all modules are processed successfully.
/// Note: This code assumes that the necessary dependencies (`pyo3`, `rayon`, `log`) are already imported and initialized.
/// # Arguments
/// * `modules` - A vector of `PythonModule` structs representing the Python modules and functions to execute.
/// * `num_threads` - The number of threads to use for parallel processing.
/// # Examples
/// ```
/// use pyo3::types::PyModule;
/// use pyo3::types::PyResult;
/// use pyo3::prelude::*;
/// struct PythonModule<'a> {
/// name: &'a str,
/// function: &'a str,
/// args: Vec<&'a str>,
/// }
/// #[pymodule]
/// fn multithreading_processor(modules: Vec<PythonModule>, num_threads: usize) -> Result<(), PythonError> {
/// // Function implementation
/// Ok(())
/// }
/// ```
/// # Errors
/// Returns a `PythonError` if an import error or a function call error occurs.
/// # Panics
/// This function does not panic.
/// # Safety
/// This function is safe to call, but it assumes that the necessary dependencies (`pyo3`, `rayon`, `log`) are already imported and initialized.
// Initialize Python interpreter
fn process_python_modules(modules: Vec<PythonModule>, num_threads: usize) -> Result<(), PythonError> {
let gil = Python::acquire_gil();
let py = gil.python();
// Set the global thread pool's configuration
// Create a channel to send errors from threads to the main thread
let (tx, rx) = channel();
let tx = Arc::new(Mutex::new(tx));
// Process each Python module in parallel
modules.par_iter().for_each(|module| {
let result = PyModule::import(py,
.map_err(|_| PythonError::ImportError(
.and_then(|m| m.call0(module.function)
.map_err(|_| PythonError::FunctionError(module.function.to_string())));
if let Err(e) = result {
let tx = tx.lock().unwrap();
// Check for errors
drop(tx); // Close the sender
for error in rx {
match error {
PythonError::ImportError(module) => error!("Failed to import module {}", module),
PythonError::FunctionError(function) => error!("Failed to call function {}", function),
@ -1,59 +0,0 @@
use pyo3::prelude::*;
use pyo3::types::PyList;
use rayon::prelude::*;
use std::fs;
use std::time::Instant;
// Define the new execute function
fn exec_concurrently(script_path: &str, threads: usize) -> PyResult<()> {
(0..threads).into_par_iter().for_each(|_| {
Python::with_gil(|py| {
let sys = py.import("sys").unwrap();
let path: &PyList = match sys.getattr("path") {
Ok(path) => match path.downcast() {
Ok(path) => path,
Err(e) => {
eprintln!("Failed to downcast path: {:?}", e);
Err(e) => {
eprintln!("Failed to get path attribute: {:?}", e);
if let Err(e) = path.append("lib/python3.11/site-packages") {
eprintln!("Failed to append path: {:?}", e);
let script = fs::read_to_string(script_path).unwrap();
||||||, None, None).unwrap();
fn main() -> PyResult<()> {
let args: Vec<String> = std::env::args().collect();
let threads = 20;
if args.len() < 2 {
eprintln!("Usage: {} <path_to_python_script>", args[0]);
let script_path = &args[1];
let start = Instant::now();
// Call the execute function
exec_concurrently(script_path, threads)?;
let duration = start.elapsed();
match fs::write("/tmp/elapsed.time", format!("booting time: {:?}", duration)) {
Ok(_) => println!("Successfully wrote elapsed time to /tmp/elapsed.time"),
Err(e) => eprintln!("Failed to write elapsed time: {:?}", e),
@ -0,0 +1,42 @@
import concurrent.futures
from typing import Any, Callable, Dict, List
from inspect import iscoroutinefunction
import asyncio
# Helper function to run an asynchronous function in a synchronous way
def run_async_function_in_sync(
func: Callable, *args, **kwargs
) -> Any:
loop = asyncio.new_event_loop()
coroutine = func(*args, **kwargs)
return loop.run_until_complete(coroutine)
# Main omni function for parallel execution
def omni_parallel_function_caller(
function_calls: List[Dict[str, Any]]
) -> List[Any]:
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_call = {}
for call in function_calls:
func = call["function"]
args = call.get("args", ())
kwargs = call.get("kwargs", {})
if iscoroutinefunction(func):
# Wrap and execute asynchronous function in a separate process
future = executor.submit(
run_async_function_in_sync, func, *args, **kwargs
# Directly execute synchronous function in a thread
future = executor.submit(func, *args, **kwargs)
future_to_call[future] = call
for future in concurrent.futures.as_completed(future_to_call):
return results
@ -0,0 +1,17 @@
import json
from pydantic import BaseModel
def base_model_schema_to_json(model: BaseModel):
Converts the JSON schema of a base model to a formatted JSON string.
model (BaseModel): The base model for which to generate the JSON schema.
str: The JSON schema of the base model as a formatted JSON string.
return json.dumps(model.model_json_schema(), indent=2)
Reference in new issue