|
|
@ -2153,124 +2153,124 @@ router_toolkit = VectorStoreRouterToolkit(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
############################################### ===========================> Whisperx speech to text
|
|
|
|
############################################### ===========================> Whisperx speech to text
|
|
|
|
import os
|
|
|
|
# import os
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
# from pydantic import BaseModel, Field
|
|
|
|
from pydub import AudioSegment
|
|
|
|
# from pydub import AudioSegment
|
|
|
|
from pytube import YouTube
|
|
|
|
# from pytube import YouTube
|
|
|
|
import whisperx
|
|
|
|
# import whisperx
|
|
|
|
from langchain.tools import tool
|
|
|
|
# from langchain.tools import tool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Hugging Face access token, read from the HF_API_KEY environment variable.
# The subscript lookup raises KeyError at import time if the variable is unset.
hf_api_key = os.environ["HF_API_KEY"]
|
|
|
|
|
|
|
|
# Argument schema for the YouTube transcription tool: one string field holding
# the URL of the video. (Documented with a comment rather than a docstring so
# the generated pydantic schema stays exactly as before.)
class YouTubeVideoInput(BaseModel):
    video_url: str = Field(description="YouTube Video URL to transcribe")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_youtube_video(video_url, audio_format='mp3'):
    """Download the audio track of a YouTube video and convert it.

    The first audio-only stream that pytube reports for ``video_url`` is saved
    under the relative path ``video.mp4``, re-encoded with pydub to
    ``video.<audio_format>``, and the intermediate download is then removed.

    Args:
        video_url: URL of the YouTube video.
        audio_format: Target format (and file extension) handed to pydub's
            ``export``. Defaults to ``'mp3'``.

    Returns:
        The path of the converted audio file, i.e. ``f'video.{audio_format}'``.

    Raises:
        ValueError: If the video exposes no audio-only stream.
    """
    source_file = 'video.mp4'
    audio_file = f'video.{audio_format}'

    # Download video
    yt = YouTube(video_url)
    yt_stream = yt.streams.filter(only_audio=True).first()
    if yt_stream is None:
        # `.first()` yields None for an empty query; fail with a clear message
        # instead of an AttributeError on `.download`.
        raise ValueError(f"No audio-only stream found for {video_url!r}")

    try:
        yt_stream.download(filename=source_file)

        # Convert video to audio
        video = AudioSegment.from_file(source_file, format="mp4")
        video.export(audio_file, format=audio_format)
    finally:
        # Clean up the intermediate download even when download/conversion
        # fails, but never delete the output itself: for audio_format='mp4'
        # both names are 'video.mp4'.
        if source_file != audio_file and os.path.exists(source_file):
            os.remove(source_file)

    return audio_file
|
|
|
|
# def download_youtube_video(video_url, audio_format='mp3'):
|
|
|
|
|
|
|
|
# audio_file = f'video.{audio_format}'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# # Download video
|
|
|
|
|
|
|
|
# yt = YouTube(video_url)
|
|
|
|
|
|
|
|
# yt_stream = yt.streams.filter(only_audio=True).first()
|
|
|
|
|
|
|
|
# yt_stream.download(filename='video.mp4')
|
|
|
|
|
|
|
|
|
|
|
|
@tool("transcribe_youtube_video", args_schema=YouTubeVideoInput, return_direct=True)
def transcribe_youtube_video(video_url: str) -> str:
    """Transcribes a YouTube video."""
    # NOTE: the docstring above is intentionally left verbatim -- the @tool
    # decorator uses it as the tool description.
    #
    # Pipeline: download + convert the audio, transcribe it with WhisperX,
    # align the segments, then join the segment texts into one string.
    # Always returns a string: the transcription, or an error message when the
    # aligned result lacks the expected keys.
    audio_file = download_youtube_video(video_url)

    device = "cuda"
    batch_size = 16
    compute_type = "float16"

    # 1. Transcribe with original Whisper (batched)
    model = whisperx.load_model("large-v2", device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_file)
    result = model.transcribe(audio, batch_size=batch_size)

    # 2. Align Whisper output
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

    # 3. Speaker diarization used to run here, but its output was never merged
    # into `result` or returned, so the extra model load and inference pass
    # only added cost and another failure point. It is omitted; if speaker
    # labels are wanted, run the diarization pipeline and merge its output via
    # whisperx.assign_word_speakers before building the text.

    try:
        segments = result["segments"]
        transcription = " ".join(segment['text'] for segment in segments)
    except KeyError:
        # Previously this branch only printed and fell through, returning None
        # from a `-> str` tool. Keep the console message and return it too.
        message = "The key 'segments' is not found in the result."
        print(message)
        return message

    return transcription
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# # 3. Assign speaker labels
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device)
|
|
|
|
|
|
|
|
# diarize_segments = diarize_model(audio_file)
|
|
|
|
|
|
|
|
|
|
|
|
################################################### BASE WHISPER TOOL
|
|
|
|
# try:
|
|
|
|
from typing import Optional, Type
|
|
|
|
# segments = result["segments"]
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
# transcription = " ".join(segment['text'] for segment in segments)
|
|
|
|
from langchain.tools import BaseTool
|
|
|
|
# return transcription
|
|
|
|
from langchain.callbacks.manager import (
|
|
|
|
# except KeyError:
|
|
|
|
AsyncCallbackManagerForToolRun,
|
|
|
|
# print("The key 'segments' is not found in the result.")
|
|
|
|
CallbackManagerForToolRun,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
import whisperx
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Argument schema for TranscribeAudioTool: one string field holding the path
# of the audio file to transcribe. (Comment instead of a docstring so the
# generated pydantic schema is untouched.)
class AudioInput(BaseModel):
    audio_file: str = Field(description="Path to audio file")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# LangChain tool that transcribes a local audio file with WhisperX and returns
# the text of all aligned segments joined by spaces. Synchronous only.
class TranscribeAudioTool(BaseTool):
    # Explicit annotations keep these field overrides valid on pydantic-v2
    # based BaseTool versions and are harmless on older ones.
    name: str = "transcribe_audio"
    description: str = "Transcribes an audio file using WhisperX"
    args_schema: Type[AudioInput] = AudioInput

    def _run(
        self,
        audio_file: str,
        device: str = "cuda",
        batch_size: int = 16,
        compute_type: str = "float16",
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool.

        Args:
            audio_file: Path of the audio file to transcribe.
            device: Device string passed to the WhisperX model loaders.
            batch_size: Batch size passed to ``model.transcribe``.
            compute_type: Compute type passed to ``whisperx.load_model``.
            run_manager: Optional callback manager; not used by this method.

        Returns:
            The segment texts joined by single spaces, or an error message
            string if the aligned result lacks the expected keys.
        """
        # 1. Transcribe (batched)
        model = whisperx.load_model("large-v2", device, compute_type=compute_type)
        audio = whisperx.load_audio(audio_file)
        result = model.transcribe(audio, batch_size=batch_size)

        # 2. Align the transcription output
        model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
        result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

        # Speaker diarization used to run here, but its output was never
        # merged into `result` or returned, so it only added an extra model
        # load and inference pass. It is omitted; merge diarization output
        # with whisperx.assign_word_speakers if speaker labels are needed.

        try:
            segments = result["segments"]
            transcription = " ".join(segment['text'] for segment in segments)
        except KeyError:
            # Previously this branch only printed and implicitly returned
            # None despite the `-> str` contract.
            message = "The key 'segments' is not found in the result."
            print(message)
            return message

        return transcription

    async def _arun(
        self,
        audio_file: str,
        device: str = "cuda",
        batch_size: int = 16,
        compute_type: str = "float16",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Use the tool asynchronously.

        Raises:
            NotImplementedError: Always; only the synchronous path exists.
        """
        raise NotImplementedError("transcribe_audio does not support async")
|
|
|
|
# segments = result["segments"]
|
|
|
|
|
|
|
|
# transcription = " ".join(segment['text'] for segment in segments)
|
|
|
|
|
|
|
|
# return transcription
|
|
|
|
|
|
|
|
# except KeyError:
|
|
|
|
|
|
|
|
# print("The key 'segments' is not found in the result.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# async def _arun(
|
|
|
|
|
|
|
|
# self,
|
|
|
|
|
|
|
|
# audio_file: str,
|
|
|
|
|
|
|
|
# device: str = "cuda",
|
|
|
|
|
|
|
|
# batch_size: int = 16,
|
|
|
|
|
|
|
|
# compute_type: str = "float16",
|
|
|
|
|
|
|
|
# run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
|
|
|
|
|
|
|
|
# ) -> str:
|
|
|
|
|
|
|
|
# """Use the tool asynchronously."""
|
|
|
|
|
|
|
|
# raise NotImplementedError("transcribe_audio does not support async")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|