diff --git a/initialization.py b/initialization.py new file mode 100644 index 0000000..6116d6f --- /dev/null +++ b/initialization.py @@ -0,0 +1,27 @@ +import os +from dataset import examples +from transformers import AutoTokenizer, AutoModelForSequenceClassification + +def can_launch_backend(): + # Проверяем размеры train_texts и train_labels + count_text = len(examples.train_texts) + count_lb = len(examples.train_labels) + + if count_text != count_lb: + print(f"Размерности данных не совпадают: {count_text} текстов и {count_lb} меток.") + return False + + print(f"Размерности совпадают: {count_text} текстов и {count_lb} меток.") + + # Проверяем существование моделей и токенизаторов + model_name = "DeepPavlov/rubert-base-cased" + try: + tokenizer = AutoTokenizer.from_pretrained(model_name) + model = AutoModelForSequenceClassification.from_pretrained(model_name) + except Exception as e: + print(f"Ошибка при загрузке модели или токенизатора: {e}") + return False + + print("Модель и токенизатор успешно загружены.") + return True + diff --git a/main.py b/main.py index 97a3460..2c85043 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,255 @@ +import uvicorn +import os +import asyncio +import threading +from queue import Queue +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Optional +import torch +from transformers import AutoTokenizer, AutoModelForSequenceClassification from dataset import examples +from datasets import Dataset +import numpy as np +from evaluate import load +from transformers import Trainer, TrainingArguments -count_text = len(examples.train_texts) -count_lb = len(examples.train_labels) -print(f"{count_text} / {count_lb}") +app = FastAPI() -print(examples.train_labels) +model: Optional[AutoModelForSequenceClassification] = None +tokenizer: Optional[AutoTokenizer] = None +classes = examples.classes +# Флаги состояния +training_in_progress = False +training_complete = False + +# Очереди для обработки запросов +queue1 = Queue() +queue2 = Queue() +current_queue = queue1 +processing_queue = queue2 + +# Lock для переключения очередей +queue_lock = threading.Lock() + +MODEL_DIR = "./trained_model" +MODEL_NAME = "DeepPavlov/rubert-base-cased" + +# Pydantic модель для запроса предсказания +class PredictionRequest(BaseModel): + text: str + + +def prepare_datasets(train_texts, train_labels, val_texts, val_labels, tokenizer): + """Создаёт токенизированные датасеты для обучения и валидации.""" + train_dataset = Dataset.from_dict({"text": train_texts, "label": train_labels}) + val_dataset = Dataset.from_dict({"text": val_texts, "label": val_labels}) + + def tokenize_function(examples): + return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=128) + + train_dataset = train_dataset.map(tokenize_function, batched=True) + val_dataset = val_dataset.map(tokenize_function, batched=True) + + train_dataset = train_dataset.remove_columns(["text"]) + val_dataset = val_dataset.remove_columns(["text"]) + + train_dataset = train_dataset.with_format("torch") + val_dataset = val_dataset.with_format("torch") + + return train_dataset, val_dataset + + +def train_model(train_dataset, val_dataset, classes, model_name, output_dir="./results", num_epochs=3): + """Обучает модель и сохраняет её в указанной директории.""" + global training_in_progress, training_complete, model, tokenizer + + training_in_progress = True + training_complete = False + + try: + tokenizer = AutoTokenizer.from_pretrained(model_name) + model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(classes)) + + training_args = TrainingArguments( + output_dir=output_dir, + eval_strategy="epoch", + save_strategy="epoch", + per_device_train_batch_size=8, + per_device_eval_batch_size=8, + num_train_epochs=num_epochs, + weight_decay=0.01, + logging_dir="./logs", + logging_steps=10, + load_best_model_at_end=True, + metric_for_best_model="accuracy" + ) + + accuracy_metric = load("accuracy") + + def compute_metrics(eval_pred): + logits, labels = eval_pred + predictions = np.argmax(logits, axis=-1) + return accuracy_metric.compute(predictions=predictions, references=labels) + + trainer = Trainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=val_dataset, + tokenizer=tokenizer, + compute_metrics=compute_metrics + ) + + trainer.train() + trainer.save_model(output_dir) + print(f"Модель сохранена в директории {output_dir}") + + # Загрузка сохранённой модели + model = AutoModelForSequenceClassification.from_pretrained(output_dir) + tokenizer = AutoTokenizer.from_pretrained(model_name) + + training_complete = True + except Exception as e: + print(f"Ошибка при обучении модели: {e}") + finally: + training_in_progress = False + +def can_launch_backend(): + """Проверяет, чтобы размерности данных совпадали и модель существует.""" + count_text = len(examples.train_texts) + count_lb = len(examples.train_labels) + + if count_text != count_lb: + print(f"Размерности данных не совпадают: {count_text} текстов и {count_lb} меток.") + return False + + if not os.path.exists(MODEL_DIR): + print(f"Директория модели {MODEL_DIR} не существует.") + return False + + return True + +def load_model(): + """Загружает сохранённую модель и токенизатор.""" + global model, tokenizer + model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR) + tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) + print("Модель и токенизатор загружены.") + +def prediction_worker(): + while True: + request = processing_queue.get() + if request is None: + break # Завершение работы + text, response_future = request + try: + inputs = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128) + outputs = model(**inputs) + predictions = torch.argmax(outputs.logits, dim=1).item() + predicted_class = classes[predictions] + response_future.set_result(predicted_class) + except Exception as e: + response_future.set_exception(e) + +# Функция переключения очередей +def flip_buffers(): + global current_queue, processing_queue + with queue_lock: + current_queue, processing_queue = processing_queue, current_queue + +async def process_queue(): + while True: + flip_buffers() + while not processing_queue.empty(): + request = processing_queue.get() + queue1.put(request) if current_queue == queue1 else queue2.put(request) + await asyncio.sleep(0.1) # Интервал между переключениями + +# Запуск воркера предсказаний +prediction_thread = threading.Thread(target=prediction_worker, daemon=True) +prediction_thread.start() + +# Запуск фоновой задачи для обработки очереди +@app.on_event("startup") +async def startup_event(): + global training_in_progress, training_complete, model, tokenizer + + # Проверка размерностей данных + count_text = len(examples.train_texts) + count_lb = len(examples.train_labels) + print(f"Размерности данных: {count_text} текстов и {count_lb} меток.") + + if count_text != count_lb: + print("Размерности данных не совпадают. Backend не может быть запущен.") + return + + # Проверка наличия модели + if not os.path.exists(MODEL_DIR): + print("Модель не найдена. Начинается обучение модели.") + tokenizer_temp = AutoTokenizer.from_pretrained(MODEL_NAME) + train_dataset, val_dataset = prepare_datasets( + examples.train_texts, + examples.train_labels, + examples.val_texts, + examples.val_labels, + tokenizer_temp + ) + + training_thread = threading.Thread( + target=train_model, + args=(train_dataset, val_dataset, classes, MODEL_NAME, MODEL_DIR, 3), + daemon=True + ) + training_thread.start() + else: + print("Модель найдена. Загружается модель.") + load_model() + + # Запуск фоновой задачи для обработки очереди + asyncio.create_task(process_queue()) + +@app.post("/predict") +async def predict_endpoint(request: PredictionRequest): + if training_in_progress: + raise HTTPException(status_code=503, detail="Модель обучается. Пожалуйста, попробуйте позже.") + if not training_complete and not model: + raise HTTPException(status_code=503, detail="Модель не загружена. Пожалуйста, попробуйте позже.") + + loop = asyncio.get_event_loop() + response_future = asyncio.Future() + + # Добавление запроса в очередь + with queue_lock: + current_queue.put((request.text, response_future)) + + try: + predicted_class = await asyncio.wait_for(response_future, timeout=10.0) + return {"predicted_class": predicted_class} + except asyncio.TimeoutError: + raise HTTPException(status_code=504, detail="Время ожидания предсказания истекло.") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/status") +async def status(): + if training_in_progress: + return {"status": "Обучение продолжается"} + elif training_complete: + return {"status": "Модель готова"} + elif os.path.exists(MODEL_DIR): + return {"status": "Модель загружена"} + else: + return {"status": "Модель отсутствует"} + +# Завершение работы +@app.on_event("shutdown") +def shutdown_event(): + # Остановить воркер предсказаний + processing_queue.put(None) + prediction_thread.join() + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/model_utils.py b/model_utils.py new file mode 100644 index 0000000..7307fcf --- /dev/null +++ b/model_utils.py @@ -0,0 +1,81 @@ +import os +import numpy as np +import torch +from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments +from datasets import Dataset +from evaluate import load + +def prepare_datasets(train_texts, train_labels, val_texts, val_labels, tokenizer): + """Создаёт токенизированные датасеты для обучения и валидации.""" + train_dataset = Dataset.from_dict({"text": train_texts, "label": train_labels}) + val_dataset = Dataset.from_dict({"text": val_texts, "label": val_labels}) + + def tokenize_function(examples): + return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=128) + + train_dataset = train_dataset.map(tokenize_function, batched=True) + val_dataset = val_dataset.map(tokenize_function, batched=True) + + train_dataset = train_dataset.remove_columns(["text"]) + val_dataset = val_dataset.remove_columns(["text"]) + + train_dataset = train_dataset.with_format("torch") + val_dataset = val_dataset.with_format("torch") + + return train_dataset, val_dataset + +def train_model(train_dataset, val_dataset, classes, model_name, output_dir="./results", num_epochs=3): + """Обучает модель и сохраняет её в указанной директории.""" + model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(classes)) + tokenizer = AutoTokenizer.from_pretrained(model_name) + + training_args = TrainingArguments( + output_dir=output_dir, + eval_strategy="epoch", + save_strategy="epoch", + per_device_train_batch_size=8, + per_device_eval_batch_size=8, + num_train_epochs=num_epochs, + weight_decay=0.01, + logging_dir="./logs", + logging_steps=10, + load_best_model_at_end=True, + metric_for_best_model="accuracy" + ) + + accuracy_metric = load("accuracy") + + def compute_metrics(eval_pred): + logits, labels = eval_pred + predictions = np.argmax(logits, axis=-1) + return accuracy_metric.compute(predictions=predictions, references=labels) + + trainer = Trainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=val_dataset, + tokenizer=tokenizer, + compute_metrics=compute_metrics + ) + + trainer.train() + trainer.save_model(output_dir) + print(f"Модель сохранена в директории {output_dir}") + + return model, tokenizer + +def load_model_and_tokenizer(model_dir, model_name): + """Загружает сохранённую модель и токенизатор.""" + if not os.path.exists(model_dir): + raise ValueError(f"Директория {model_dir} не существует.") + model = AutoModelForSequenceClassification.from_pretrained(model_dir) + tokenizer = AutoTokenizer.from_pretrained(model_name) + return model, tokenizer + +def predict(model, tokenizer, text, classes): + """Делает предсказание для заданного текста.""" + inputs = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128) + outputs = model(**inputs) + predictions = torch.argmax(outputs.logits, dim=1).item() + return classes[predictions]