@ -1,30 +1,22 @@
import asyncio
import base64
import logging
import os
import time
import concurrent . futures
import re
from dataclasses import dataclass
from typing import List , Optional , Union
from typing import List , Optional , Tuple
import openai
import requests
from cachetools import TTLCache
from dotenv import load_dotenv
from openai import OpenAI
from ratelimit import limits , sleep_and_retry
from termcolor import colored
# ENV
load_dotenv ( )
def logging_config ( ) :
""" Configures logging """
logging . basicConfig (
level = logging . INFO ,
format = " %(asctime)s - %(name)s - %(levelname)s - %(message)s " ,
)
logger = logging . getLogger ( __name__ )
return logger
@dataclass
class GPT4VisionResponse :
""" A response structure for GPT-4 """
@ -56,7 +48,7 @@ class GPT4Vision:
- - - - - - - -
process_img ( self , img_path : str ) - > str :
Processes the image to be used for the API request
__call__ ( self , img : Union [ str , List [ str ] ] , tasks : List [ str ] ) - > GPT4VisionResponse :
run ( self , img : Union [ str , List [ str ] ] , tasks : List [ str ] ) - > GPT4VisionResponse :
Makes a call to the GPT - 4 Vision API and returns the image url
Example :
@ -66,23 +58,24 @@ class GPT4Vision:
>> > answer = gpt4vision ( img , tasks )
>> > print ( answer )
"""
max_retries : int = 3
model : str = " gpt-4-vision-preview "
backoff_factor : float = 2.0
timeout_seconds : int = 10
api_key: Optional [ str ] = None
openai_ api_key: Optional [ str ] = None
# 'Low' or 'High' for respesctively fast or high quality, but high more token usage
quality : str = " low "
# Max tokens to use for the API request, the maximum might be 3,000 but we don't know
max_tokens : int = 200
client = OpenAI (
api_key = api_key ,
max_retries = max_retries ,
)
logger = logging_config ( )
client = OpenAI ( api_key = openai_api_key , )
dashboard : bool = True
call_limit : int = 1
period_seconds : int = 60
# Cache for storing API Responses
cache = TTLCache ( maxsize = 100 , ttl = 600 ) # Cache for 10 minutes
class Config :
""" Config class for the GPT4Vision model """
@ -94,204 +87,172 @@ class GPT4Vision:
with open ( img , " rb " ) as image_file :
return base64 . b64encode ( image_file . read ( ) ) . decode ( " utf-8 " )
def __call__ (
self ,
img : Union [ str , List [ str ] ] ,
tasks : List [ str ] ,
) - > GPT4VisionResponse :
@sleep_and_retry
@limits ( calls = call_limit ,
period = period_seconds ) # Rate limit of 10 calls per minute
def run ( self , task : str , img : str ) :
"""
Calls the GPT - 4 Vision API and returns the image url
Parameters :
- - - - - - - - - - -
img : Union [ str , List [ str ] ]
The image to be used for the API request
tasks : List [ str ]
The tasks to be used for the API request
Returns :
- - - - - - - -
answer : GPT4VisionResponse
The response from the API request
Example :
- - - - - - - -
>> > gpt4vision = GPT4Vision ( )
>> > img = " https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png "
>> > tasks = [ " A painting of a dog " ]
>> > answer = gpt4vision ( img , tasks )
>> > print ( answer )
Run the GPT - 4 Vision model
Task : str
The task to run
Img : str
The image to run the task on
"""
headers = {
" Content-Type " : " application/json " ,
" Authorization " : f " Bearer { self . api_key } " ,
}
# Image content
image_content = [ {
" type " : " imavge_url " ,
" image_url " : img
} if img . startswith ( " http " ) else {
" type " : " image " ,
" data " : img
} for img in img ]
messages = [ {
" role " :
" user " ,
" content " :
image_content + [ {
" type " : " text " ,
" text " : q
} for q in tasks ] ,
} ]
payload = {
" model " : " gpt-4-vision-preview " ,
" messages " : messages ,
" max_tokens " : self . max_tokens ,
" detail " : self . quality ,
}
for attempt in range ( self . max_retries ) :
try :
response = requests . post (
" https://api.openai.com/v1/chat/completions " ,
headers = headers ,
json = payload ,
timeout = self . timeout_seconds ,
)
response . raise_for_status ( )
answer = response . json (
) [ " choices " ] [ 0 ] [ " message " ] [ " content " ] [ " text " ]
return GPT4VisionResponse ( answer = answer )
except requests . exceptions . HTTPError as error :
self . logger . error (
f " HTTP error: { error . response . status_code } , { error . response . text } "
)
if error . response . status_code in [ 429 , 500 , 503 ] :
# Exponential backoff = 429(too many requesys)
# And 503 = (Service unavailable) errors
time . sleep ( self . backoff_factor * * attempt )
else :
break
except requests . exceptions . RequestException as error :
self . logger . error ( f " Request error: { error } " )
time . sleep ( self . backoff_factor * * attempt )
except Exception as error :
self . logger . error (
f " Unexpected Error: { error } try optimizing your api key and try "
" again " )
raise error from None
raise TimeoutError ( " API Request timed out after multiple retries " )
def run ( self , task : str , img : str ) - > str :
"""
Runs the GPT - 4 Vision API
Parameters :
- - - - - - - - - - -
task : str
The task to be used for the API request
img : str
The image to be used for the API request
Returns :
- - - - - - - -
out : str
The response from the API request
Example :
- - - - - - - -
>> > gpt4vision = GPT4Vision ( )
>> > task = " A painting of a dog "
>> > img = " https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png "
>> > answer = gpt4vision . run ( task , img )
>> > print ( answer )
"""
if self . dashboard :
self . print_dashboard ( )
try :
response = self . client . chat . completions . create (
model = self . model ,
model = " gpt-4-vision-preview " ,
messages = [ {
" role " :
" user " ,
" content " : [
{
" type " : " text " ,
" text " : f " { task } "
" text " : task
} ,
{
" type " : " image_url " ,
" image_url " : f " { img } " ,
" image_url " : {
" url " : str ( img ) ,
} ,
} ,
] ,
} ] ,
max_tokens = self . max_tokens ,
)
out = response . choices [ 0 ] . text
out = print ( response . choices [ 0 ] )
# out = self.clean_output(out)
return out
except Exception as error :
print (
colored (
( f " Error when calling GPT4Vision, Error: { error } Try optimizing "
" your key, and try again " ) ,
" red " ,
) )
except openai . OpenAIError as e :
# logger.error(f"OpenAI API error: {e}")
return f " OpenAI API error: Could not process the image. { e } "
except Exception as e :
return f " Unexpected error occurred while processing the image. { e } "
def clean_output ( self , output : str ) :
# Regex pattern to find the Choice object representation in the output
pattern = r " Choice \ (.*? \ (content= \" (.*?) \" .*? \ ) \ ) "
match = re . search ( pattern , output , re . DOTALL )
if match :
# Extract the content from the matched pattern
content = match . group ( 1 )
# Replace escaped quotes to get the clean content
content = content . replace ( r " \" " , ' " ' )
print ( content )
else :
print ( " No content found in the output. " )
async def arun ( self , task : str , img : str ) - > str :
async def arun ( self , task : str , img : str ) :
"""
Asynchronous run method for GPT - 4 Vision
Arun is an async version of run
Parameters :
- - - - - - - - - - -
task : str
The task to be used for the API request
img : str
The image to be used for the API request
Returns :
- - - - - - - -
out : str
The response from the API request
Task : str
The task to run
Img : str
The image to run the task on
Example :
- - - - - - - -
>> > gpt4vision = GPT4Vision ( )
>> > task = " A painting of a dog "
>> > img = " https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png "
>> > answer = await gpt4vision . arun ( task , img )
>> > print ( answer )
"""
try :
response = await self . client . chat . completions . create (
model = self . model ,
model = " gpt-4-vision-preview " ,
messages = [ {
" role " :
" user " ,
" content " : [
{
" type " : " text " ,
" text " : f " { task } "
" text " : task
} ,
{
" type " : " image_url " ,
" image_url " : f " { img } " ,
" image_url " : {
" url " : img ,
} ,
} ,
] ,
} ] ,
max_tokens = self . max_tokens ,
)
out = response . choices [ 0 ] . text
return out
except Exception as error :
print (
return print ( response . choices [ 0 ] )
except openai . OpenAIError as e :
# logger.error(f"OpenAI API error: {e}")
return f " OpenAI API error: Could not process the image. { e } "
except Exception as e :
return f " Unexpected error occurred while processing the image. { e } "
def run_batch ( self , tasks_images : List [ Tuple [ str , str ] ] ) - > List [ str ] :
""" Process a batch of tasks and images """
with concurrent . futures . ThreadPoolExecutor ( ) as executor :
futures = [
executor . submit ( self . run , task , img )
for task , img in tasks_images
]
results = [ future . result ( ) for future in futures ]
return results
async def run_batch_async ( self ,
tasks_images : List [ Tuple [ str , str ] ] ) - > List [ str ] :
""" Process a batch of tasks and images asynchronously """
loop = asyncio . get_event_loop ( )
futures = [
loop . run_in_executor ( None , self . run , task , img )
for task , img in tasks_images
]
return await asyncio . gather ( * futures )
async def run_batch_async_with_retries (
self , tasks_images : List [ Tuple [ str , str ] ] ) - > List [ str ] :
""" Process a batch of tasks and images asynchronously with retries """
loop = asyncio . get_event_loop ( )
futures = [
loop . run_in_executor ( None , self . run_with_retries , task , img )
for task , img in tasks_images
]
return await asyncio . gather ( * futures )
def print_dashboard ( self ) :
dashboard = print (
colored (
( f " Error when calling GPT4Vision, Error: { error } Try optimizing "
" your key, and try again " ) ,
" red " ,
f """
GPT4Vision Dashboard
- - - - - - - - - - - - - - - - - - -
Max Retries : { self . max_retries }
Model : { self . model }
Backoff Factor : { self . backoff_factor }
Timeout Seconds : { self . timeout_seconds }
Image Quality : { self . quality }
Max Tokens : { self . max_tokens }
""" ,
" green " ,
) )
return dashboard
def health_check ( self ) :
""" Health check for the GPT4Vision model """
try :
response = requests . get ( " https://api.openai.com/v1/engines " )
return response . status_code == 200
except requests . RequestException as error :
print ( f " Health check failed: { error } " )
return False
def sanitize_input ( self , text : str ) - > str :
"""
Sanitize input to prevent injection attacks .
Parameters :
text : str - The input text to be sanitized .
Returns :
The sanitized text .
"""
# Example of simple sanitization, this should be expanded based on the context and usage
sanitized_text = re . sub ( r " [^ \ w \ s] " , " " , text )
return sanitized_text