commit
6ae8e6fd72
@ -0,0 +1,45 @@
|
|||||||
|
# Import necessary libraries
|
||||||
|
from transformers import AutoModelForCausalLM, AutoTokenizer
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
# from swarms import ToolAgent
|
||||||
|
from swarms.utils.json_utils import base_model_schema_to_json
|
||||||
|
|
||||||
|
# Load the pre-trained model and tokenizer
|
||||||
|
model = AutoModelForCausalLM.from_pretrained(
|
||||||
|
"databricks/dolly-v2-12b",
|
||||||
|
load_in_4bit=True,
|
||||||
|
device_map="auto",
|
||||||
|
)
|
||||||
|
tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-12b")
|
||||||
|
|
||||||
|
|
||||||
|
class Schema(BaseModel):
|
||||||
|
name: str
|
||||||
|
agent: int
|
||||||
|
is_student: bool
|
||||||
|
courses: list[str]
|
||||||
|
|
||||||
|
|
||||||
|
json_schema = str(base_model_schema_to_json(Schema))
|
||||||
|
print(json_schema)
|
||||||
|
|
||||||
|
# # Define the task to generate a person's information
|
||||||
|
# task = (
|
||||||
|
# "Generate a person's information based on the following schema:"
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # Create an instance of the ToolAgent class
|
||||||
|
# agent = ToolAgent(
|
||||||
|
# name="dolly-function-agent",
|
||||||
|
# description="Ana gent to create a child data",
|
||||||
|
# model=model,
|
||||||
|
# tokenizer=tokenizer,
|
||||||
|
# json_schema=json_schema,
|
||||||
|
# )
|
||||||
|
|
||||||
|
# # Run the agent to generate the person's information
|
||||||
|
# generated_data = agent.run(task)
|
||||||
|
|
||||||
|
# # Print the generated data
|
||||||
|
# print(f"Generated data: {generated_data}")
|
@ -1,93 +0,0 @@
|
|||||||
use pyo3::prelude::*;
|
|
||||||
use pyo3::wrap_pyfunction;
|
|
||||||
use pyo3::types::IntoPyDict;
|
|
||||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
|
|
||||||
#[pymodule]
|
|
||||||
fn rust_module(py: Python, m: &PyModule) -> PyResult<()> {
|
|
||||||
m.add_function(wrap_pyfunction!(concurrent_exec, m)?)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This function wraps Python code in Rust concurrency for ultra high performance.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `py_codes` - A vector of string slices that holds the Python codes to be executed.
|
|
||||||
/// * `timeout` - An optional duration to specify a timeout for the Python code execution.
|
|
||||||
/// * `num_threads` - The number of threads to use for executing the Python code.
|
|
||||||
/// * `error_handler` - A function to handle errors during Python code execution.
|
|
||||||
/// * `log_function` - A function to log the execution of the Python code.
|
|
||||||
/// * `result_handler` - A function to handle the results of the Python code execution.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// let py_codes = vec!["print('Hello, World!')", "print('Hello, Rust!')"];
|
|
||||||
/// let timeout = Some(Duration::from_secs(5));
|
|
||||||
/// let num_threads = 4;
|
|
||||||
/// let error_handler = |e| eprintln!("Error: {}", e);
|
|
||||||
/// let log_function = |s| println!("Log: {}", s);
|
|
||||||
/// let result_handler = |r| println!("Result: {:?}", r);
|
|
||||||
/// execute_python_codes(py_codes, timeout, num_threads, error_handler, log_function, result_handler);
|
|
||||||
/// ```
|
|
||||||
|
|
||||||
#[pyfunction]
|
|
||||||
pub fn concurrent_exec<F, G, H>(
|
|
||||||
py_codes: Vec<&str>,
|
|
||||||
timeout: Option<Duration>,
|
|
||||||
num_threads: usize,
|
|
||||||
error_handler: F,
|
|
||||||
log_function: G,
|
|
||||||
result_handler: H,
|
|
||||||
) -> PyResult<Vec<PyResult<()>>>
|
|
||||||
where
|
|
||||||
F: Fn(&str),
|
|
||||||
G: Fn(&str),
|
|
||||||
H: Fn(&PyResult<()>),
|
|
||||||
{
|
|
||||||
let gil = Python::acquire_gil();
|
|
||||||
let py = gil.python();
|
|
||||||
let py_codes = Arc::new(Mutex::new(py_codes));
|
|
||||||
let results = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let pool = ThreadPool::new(num_threads);
|
|
||||||
|
|
||||||
pool.install(|| {
|
|
||||||
py_codes.par_iter().for_each(|code| {
|
|
||||||
let locals = [("__name__", "__main__")].into_py_dict(py);
|
|
||||||
let globals = [("__name__", "__main__")].into_py_dict(py);
|
|
||||||
|
|
||||||
log_function(&format!("Executing Python code: {}", code));
|
|
||||||
let result = py.run(code, Some(globals), Some(locals));
|
|
||||||
|
|
||||||
match timeout {
|
|
||||||
Some(t) => {
|
|
||||||
let now = Instant::now();
|
|
||||||
let timeout_thread = thread::spawn(move || {
|
|
||||||
while now.elapsed() < t {
|
|
||||||
if let Ok(_) = result {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if now.elapsed() >= t {
|
|
||||||
error_handler(&format!("Python code execution timed out: {}", code));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
timeout_thread.join().unwrap();
|
|
||||||
}
|
|
||||||
None => {}
|
|
||||||
}
|
|
||||||
|
|
||||||
results.lock().unwrap().push(result.clone(result));
|
|
||||||
result_handler(&result);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
pool.join();
|
|
||||||
Ok(results.lock().unwrap().clone())
|
|
||||||
}
|
|
@ -1,71 +0,0 @@
|
|||||||
use pyo3::prelude::*;
|
|
||||||
use rustacuda::prelude::*;
|
|
||||||
use rustacuda::memory::DeviceBox;
|
|
||||||
use std::error::Error;
|
|
||||||
use std::ffi::CString;
|
|
||||||
|
|
||||||
#[pymodule]
|
|
||||||
fn rust_cuda(_py: Python, m: &PyModule) -> PyResult<()> {
|
|
||||||
#[pyfn(m, "execute_on_device")]
|
|
||||||
fn execute_on_device(py: Python, device_id: u32, a: f32, b: f32) -> PyResult<f32> {
|
|
||||||
/// The result of executing the CUDA operation.
|
|
||||||
let result = py.allow_threads(|| {
|
|
||||||
execute_cuda(device_id, a, b)
|
|
||||||
});
|
|
||||||
match result {
|
|
||||||
Ok(res) => Ok(res),
|
|
||||||
Err(err) => Err(PyErr::new::<pyo3::exceptions::PyException, _>(format!("{}", err))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn execute_cuda(device_id: u32, a: f32, b: f32) -> Result<f32, Box<dyn Error>> {
|
|
||||||
rustacuda::init(CudaFlags::empty())?;
|
|
||||||
let device = Device::get_device(device_id)?;
|
|
||||||
/// Creates a new CUDA context and pushes it onto the current thread's stack.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `flags` - The flags to be used when creating the context.
|
|
||||||
/// * `device` - The device on which the context will be created.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
///
|
|
||||||
/// The newly created CUDA context.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
///
|
|
||||||
/// Returns an error if the context creation fails.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
///
|
|
||||||
/// ```rust
|
|
||||||
/// use swarms::cuda_wrapper::Context;
|
|
||||||
///
|
|
||||||
/// let device = 0;
|
|
||||||
/// let context = Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?;
|
|
||||||
/// ```
|
|
||||||
pub fn create_and_push(flags: ContextFlags, device: i32) -> Result<Context, CudaError> {
|
|
||||||
// implementation goes here
|
|
||||||
}
|
|
||||||
let context = Context::create_and_push(ContextFlags::MAP_HOST | ContextFlags::SCHED_AUTO, device)?;
|
|
||||||
let module_data = CString::new(include_str!("../resources/add.ptx"))?;
|
|
||||||
let module = Module::load_from_string(&module_data)?;
|
|
||||||
let stream = Stream::new(StreamFlags::NON_BLOCKING, None)?;
|
|
||||||
let mut x = DeviceBox::new(&a)?;
|
|
||||||
let mut y = DeviceBox::new(&b)?;
|
|
||||||
let mut result = DeviceBox::new(&0.0f32)?;
|
|
||||||
unsafe {
|
|
||||||
launch!(module.sum<<<1, 1, 0, stream>>>(
|
|
||||||
x.as_device_ptr(),
|
|
||||||
y.as_device_ptr(),
|
|
||||||
result.as_device_ptr(),
|
|
||||||
1
|
|
||||||
))?;
|
|
||||||
}
|
|
||||||
stream.synchronize()?;
|
|
||||||
let mut result_host = 0.0f32;
|
|
||||||
result.copy_to(&mut result_host)?;
|
|
||||||
Ok(result_host)
|
|
||||||
}
|
|
@ -1,37 +0,0 @@
|
|||||||
use std::fs::File;
|
|
||||||
use std::io::prelude::*;
|
|
||||||
use std::time::Instant;
|
|
||||||
use std::io::{BufReader, io};
|
|
||||||
use ranyon::prelude::{IntoParallelRefIterator, ParallelIterator};
|
|
||||||
|
|
||||||
fn read_file(path: &str) -> Vec<String> {
|
|
||||||
/// Reads the contents of a file located at the specified path.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `path` - The path to the file.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
///
|
|
||||||
/// A `Result` containing a vector of strings representing the lines of the file if the file was successfully read,
|
|
||||||
/// or an `io::Error` if there was an error reading the file.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use std::io;
|
|
||||||
/// use std::fs::File;
|
|
||||||
/// use std::io::BufReader;
|
|
||||||
///
|
|
||||||
/// fn read_file(path: &str) -> io::Result<Vec<String>> {
|
|
||||||
/// let contents: io::Result<Vec<String>> = BufReader::new(File::open(path).expect("Could not open file"))
|
|
||||||
/// .lines()
|
|
||||||
/// .collect();
|
|
||||||
/// contents
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
let contents: io::Result<Vec<String>> = BufReader::new(File::open(path).expect("Could not open file"))
|
|
||||||
.lines()
|
|
||||||
.collect();
|
|
||||||
return contents.expect("Could not read file");
|
|
||||||
}
|
|
@ -1,113 +0,0 @@
|
|||||||
/// This module provides a multi-threading processor for executing Python modules and functions in parallel.
|
|
||||||
/// It utilizes the `rayon` crate for parallel processing and the `pyo3` crate for interacting with the Python interpreter.
|
|
||||||
/// The `multithreading_processor` function takes a vector of `PythonModule` structs and the number of threads to use.
|
|
||||||
/// Each `PythonModule` struct contains the name of the Python module, the name of the function to call, and any arguments to pass to the function.
|
|
||||||
/// The function imports the Python module, calls the specified function, and sends any errors encountered back to the main thread.
|
|
||||||
/// If an import error occurs, a `PythonError::ImportError` is returned.
|
|
||||||
/// If a function call error occurs, a `PythonError::FunctionError` is returned.
|
|
||||||
|
|
||||||
use pyo3::prelude::*;
|
|
||||||
use pyo3::wrap_pyfunction;
|
|
||||||
use rayon::prelude::*;
|
|
||||||
use std::sync::mpsc::{channel, Sender};
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use log::{info, error};
|
|
||||||
|
|
||||||
struct PythonModule<'a> {
|
|
||||||
name: &'a str,
|
|
||||||
function: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum PythonError {
|
|
||||||
ImportError(String),
|
|
||||||
FunctionError(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[pyfunction]
|
|
||||||
fn my_module(py: Python, m: &PyModule) -> PyResult<()> {
|
|
||||||
m.add_function(wrap_pyfunction!(process_python_modules, m)?)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// The function returns `Ok(())` if all modules are processed successfully.
|
|
||||||
/// Note: This code assumes that the necessary dependencies (`pyo3`, `rayon`, `log`) are already imported and initialized.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `modules` - A vector of `PythonModule` structs representing the Python modules and functions to execute.
|
|
||||||
/// * `num_threads` - The number of threads to use for parallel processing.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use pyo3::types::PyModule;
|
|
||||||
/// use pyo3::types::PyResult;
|
|
||||||
/// use pyo3::prelude::*;
|
|
||||||
///
|
|
||||||
/// struct PythonModule<'a> {
|
|
||||||
/// name: &'a str,
|
|
||||||
/// function: &'a str,
|
|
||||||
/// args: Vec<&'a str>,
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// #[pymodule]
|
|
||||||
/// fn multithreading_processor(modules: Vec<PythonModule>, num_threads: usize) -> Result<(), PythonError> {
|
|
||||||
/// // Function implementation
|
|
||||||
/// Ok(())
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
///
|
|
||||||
/// Returns a `PythonError` if an import error or a function call error occurs.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// This function does not panic.
|
|
||||||
///
|
|
||||||
/// # Safety
|
|
||||||
///
|
|
||||||
/// This function is safe to call, but it assumes that the necessary dependencies (`pyo3`, `rayon`, `log`) are already imported and initialized.
|
|
||||||
// Initialize Python interpreter
|
|
||||||
#[pyfunction]
|
|
||||||
fn process_python_modules(modules: Vec<PythonModule>, num_threads: usize) -> Result<(), PythonError> {
|
|
||||||
|
|
||||||
let gil = Python::acquire_gil();
|
|
||||||
let py = gil.python();
|
|
||||||
|
|
||||||
// Set the global thread pool's configuration
|
|
||||||
rayon::ThreadPoolBuilder::new()
|
|
||||||
.num_threads(num_threads)
|
|
||||||
.build_global()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Create a channel to send errors from threads to the main thread
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
let tx = Arc::new(Mutex::new(tx));
|
|
||||||
|
|
||||||
// Process each Python module in parallel
|
|
||||||
modules.par_iter().for_each(|module| {
|
|
||||||
let result = PyModule::import(py, module.name)
|
|
||||||
.map_err(|_| PythonError::ImportError(module.name.to_string()))
|
|
||||||
.and_then(|m| m.call0(module.function)
|
|
||||||
.map_err(|_| PythonError::FunctionError(module.function.to_string())));
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
|
||||||
let tx = tx.lock().unwrap();
|
|
||||||
tx.send(e).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Check for errors
|
|
||||||
drop(tx); // Close the sender
|
|
||||||
for error in rx {
|
|
||||||
match error {
|
|
||||||
PythonError::ImportError(module) => error!("Failed to import module {}", module),
|
|
||||||
PythonError::FunctionError(function) => error!("Failed to call function {}", function),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,59 +0,0 @@
|
|||||||
use pyo3::prelude::*;
|
|
||||||
use pyo3::types::PyList;
|
|
||||||
use rayon::prelude::*;
|
|
||||||
use std::fs;
|
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
// Define the new execute function
|
|
||||||
fn exec_concurrently(script_path: &str, threads: usize) -> PyResult<()> {
|
|
||||||
(0..threads).into_par_iter().for_each(|_| {
|
|
||||||
Python::with_gil(|py| {
|
|
||||||
let sys = py.import("sys").unwrap();
|
|
||||||
let path: &PyList = match sys.getattr("path") {
|
|
||||||
Ok(path) => match path.downcast() {
|
|
||||||
Ok(path) => path,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to downcast path: {:?}", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to get path attribute: {:?}", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = path.append("lib/python3.11/site-packages") {
|
|
||||||
eprintln!("Failed to append path: {:?}", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
let script = fs::read_to_string(script_path).unwrap();
|
|
||||||
py.run(&script, None, None).unwrap();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() -> PyResult<()> {
|
|
||||||
let args: Vec<String> = std::env::args().collect();
|
|
||||||
let threads = 20;
|
|
||||||
|
|
||||||
if args.len() < 2 {
|
|
||||||
eprintln!("Usage: {} <path_to_python_script>", args[0]);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
let script_path = &args[1];
|
|
||||||
|
|
||||||
let start = Instant::now();
|
|
||||||
|
|
||||||
// Call the execute function
|
|
||||||
exec_concurrently(script_path, threads)?;
|
|
||||||
|
|
||||||
let duration = start.elapsed();
|
|
||||||
match fs::write("/tmp/elapsed.time", format!("booting time: {:?}", duration)) {
|
|
||||||
Ok(_) => println!("Successfully wrote elapsed time to /tmp/elapsed.time"),
|
|
||||||
Err(e) => eprintln!("Failed to write elapsed time: {:?}", e),
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -0,0 +1,42 @@
|
|||||||
|
import concurrent.futures
|
||||||
|
from typing import Any, Callable, Dict, List
|
||||||
|
from inspect import iscoroutinefunction
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
|
# Helper function to run an asynchronous function in a synchronous way
|
||||||
|
def run_async_function_in_sync(
|
||||||
|
func: Callable, *args, **kwargs
|
||||||
|
) -> Any:
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
coroutine = func(*args, **kwargs)
|
||||||
|
return loop.run_until_complete(coroutine)
|
||||||
|
|
||||||
|
|
||||||
|
# Main omni function for parallel execution
|
||||||
|
def omni_parallel_function_caller(
|
||||||
|
function_calls: List[Dict[str, Any]]
|
||||||
|
) -> List[Any]:
|
||||||
|
results = []
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
future_to_call = {}
|
||||||
|
for call in function_calls:
|
||||||
|
func = call["function"]
|
||||||
|
args = call.get("args", ())
|
||||||
|
kwargs = call.get("kwargs", {})
|
||||||
|
|
||||||
|
if iscoroutinefunction(func):
|
||||||
|
# Wrap and execute asynchronous function in a separate process
|
||||||
|
future = executor.submit(
|
||||||
|
run_async_function_in_sync, func, *args, **kwargs
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Directly execute synchronous function in a thread
|
||||||
|
future = executor.submit(func, *args, **kwargs)
|
||||||
|
|
||||||
|
future_to_call[future] = call
|
||||||
|
|
||||||
|
for future in concurrent.futures.as_completed(future_to_call):
|
||||||
|
results.append(future.result())
|
||||||
|
return results
|
@ -0,0 +1,17 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
def base_model_schema_to_json(model: BaseModel):
|
||||||
|
"""
|
||||||
|
Converts the JSON schema of a base model to a formatted JSON string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model (BaseModel): The base model for which to generate the JSON schema.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The JSON schema of the base model as a formatted JSON string.
|
||||||
|
"""
|
||||||
|
return json.dumps(model.model_json_schema(), indent=2)
|
Loading…
Reference in new issue