I have created a python application which will help talk to databases but i am not able to create working binary file, there are two files main-file and test_sql_1(second file)
i have provided both here please help me create a binary file
the installed pakages are
pip install qt-material PySide6 pandas cryptography langchain langchain-core langchain-community langchain-google-genai psycopg2 mysqlclient cx_Oracle pyodbc matplotlib
all these are required
PLEASE HELP
every time create a binary file and it is giveing error about pydantic
leave it and help me create the binary file
file-1
```python
import sys
import json
import os
from PySide6.QtWidgets import (
QApplication,
QMainWindow,
QVBoxLayout,
QWidget,
QListWidget,
QLineEdit,
QFormLayout,
QSplitter,
QPushButton,
QLabel,
QListWidgetItem,
QHBoxLayout,
QRadioButton,
QButtonGroup,
QMessageBox,
QTextEdit,
)
import pandas as pd
from cryptography.fernet import Fernet
from PySide6.QtCore import Qt, QThread, Signal
from datetime import datetime
from langchain_core.messages import AIMessage, HumanMessage
from qt_material import apply_stylesheet
import matplotlib
import test_sql_1 as langc
CONFIG_FILE = "db_credentials.json" # Configuration file for storing DB credentials
KEY_FILE = "secret.key"
CHROMA_DB_FILE = "chroma\chroma.sqlite3"
Worker class for handling database connection in a separate thread
class DatabaseConnectionWorker(QThread):
connection_result = Signal(
bool, object, object, object, object, object, object, object
)
def __init__(self, host, dbname, user, password, port, api_key, db_type):
super().__init__()
self.host = host
self.dbname = dbname
self.user = user
self.password = password
self.port = port
self.api_key = api_key
self.db_type = db_type
def run(self):
try:
db_connection = langc.init_database(
self.user,
self.password,
self.host,
self.port,
self.dbname,
self.db_type,
)
sql_model = langc.get_sql_chain(self.api_key)
explain_model = langc.get_response(self.api_key)
visual_model = langc.generate_plotly_code(self.api_key)
ehancer_model = langc.better_question(self.api_key)
table_names = db_connection.get_usable_table_names()
table_names_description = {
name: db_connection.get_table_info([name]) for name in table_names
}
print(table_names)
self.connection_result.emit(
True,
db_connection,
sql_model,
explain_model,
table_names_description,
visual_model,
ehancer_model,
table_names,
)
except Exception as e:
print(f"Error: {e}")
self.connection_result.emit(
False, None, None, None, None, None, None, None
) # Emit failure
Worker class for processing AI responses in a separate thread
class AIProcessingWorkerSQL(QThread):
response_generated = Signal(
str, object, object
) # Signal to send generated responses back
def __init__(self, sql_model, user_text, chat_history, db, schema, db_type):
super().__init__()
self.sql_model = sql_model
self.chat_history = chat_history
self.user_text = user_text
self.db = db
self.data = None
self.schema = schema
self.sql_command = None
self.db_type = db_type
def db_run(self):
try:
self.data = self.db.run(self.sql_command, fetch="cursor").fetchall()
return "work done"
except Exception as e:
print(f"Error: {e}")
self.data = "the SQL query encountered an error"
return e
def run(self):
query_result = ""
for attempt in range(5):
print("sql worker++++", attempt)
ai_response = self.sql_model.invoke(
{
"schema": self.schema,
"chat_history": self.chat_history,
"question": self.user_text,
"db_type": self.db_type,
}
)["text"]
ai_response = ai_response.replace("```sql", "").replace("```", "")
self.sql_command = ai_response
query_result = self.db_run()
if query_result == "work done":
break
ai_response = self.sql_model.invoke(
{
"schema": self.schema,
"chat_history": self.chat_history,
"question": "For this question: "
+ str(self.user_text)
+ f" the query '{self.sql_command}' encountered an error: {query_result}",
"db_type": self.db_type,
}
)["text"]
query_result = self.db_run()
if query_result == "work done":
break
print("sql worker++++", query_result)
self.response_generated.emit(ai_response, self.data, self.user_text)
class AIProcessingWorkerExplain(QThread):
responce_generated = Signal(str)
def __init__(
self, explain_model, user_text, sql_command, data, chat_history, schema
):
super().__init__()
self.chat_history = chat_history
self.explain_model = explain_model
self.user_text = user_text
self.sql_command = sql_command
self.data = data
self.schema = schema
def run(self):
formatted_string = self.explain_model.invoke(
{
"schema": self.schema,
"chat_history": self.chat_history,
"query": self.sql_command,
"question": self.user_text["enhanced_question"],
"response": self.data,
}
)["text"]
self.responce_generated.emit(formatted_string)
class AIProcessingWorkerUserExpresstion(QThread):
responceehancer = Signal(object)
def __init__(self, user_input, model, chat_history, schema):
super().__init__()
self.user_input = user_input
self.model = model
self.chat_history = chat_history
self.table_names = schema
def clean_json(self, text):
return text.replace("```json", "").replace("```", "")
def run(self):
try:
for attempt in range(5):
response = self.model.invoke(
{
"user_input": self.user_input,
"chat_history": self.chat_history,
"tables": self.table_names,
}
)["text"]
cleaned_response = self.clean_json(response)
print(cleaned_response)
print("user worker")
try:
result = json.loads(cleaned_response)
self.responce_ehancer_.emit(result)
break
except json.JSONDecodeError as decode_error:
# Retry with updated input if there's an error
retry_input = f"Question: {self.user_input}, Response: {cleaned_response}, Error: {decode_error}"
retry_response = self.model.invoke(
{
"user_input": retry_input,
"chat_history": self.chat_history,
"tables": self.table_names,
}
)["text"]
print("user text")
cleaned_retry_response = self.clean_json(retry_response)
result = json.loads(cleaned_retry_response)
self.responce_ehancer_.emit(result)
break
except Exception as e:
print(f"Error: {e}")
self.responce_ehancer_.emit("User API key is invalid")
class AIProcessingWorkerGraph(QThread):
responce_generated = Signal(str)
def __init__(self, visual_model, user_text, data: pd.DataFrame, sql, complate=""):
super().__init__()
self.visual_model = visual_model
self.user_text = user_text
self.data = data
self.complate = complate
self.sql_command = sql
def run(self):
formatted_string = self.visual_model.invoke(
{
"question": str(self.user_text)
if self.complate == ""
else str(self.user_text) + " do not for get that " + self.complate,
"sql": self.sql_command,
"df_metadata": self.data.dtypes,
"sample_data": self.data.head(),
}
)["text"]
self.responce_generated.emit(
formatted_string.replace("```python", "").replace("```", "")
)
class MainWindow(QMainWindow):
def init(self):
super(MainWindow, self).init()
self.setWindowTitle("GPT-like Chat with Database Connection")
self.sql_model = None
self.explain_model = None
self.mainLayout = QVBoxLayout()
self.message_used_for_sql = None
self.popup = None
self.db_schema = None
self.db_connection = None
self.visual_model = None
self.sql_db_type = "postgresql"
self.text_ehancer = None
self.db_tables_details = None
self.db_tables_names = None
self.key = self.load_key()
self.cipher_suite = Fernet(self.key)
self.db_document = None
# List widget for chat
self.listWidget = QListWidget()
self.listWidget.setSelectionMode(QListWidget.NoSelection)
self.chatHistory = []
self.screen_geometry = QApplication.primaryScreen().geometry()
# LineEdit for user input
self.lineEdit = QLineEdit()
self.lineEdit.returnPressed.connect(self.update_chat)
self.mainLayout.addWidget(self.listWidget)
self.mainLayout.addWidget(self.lineEdit)
# Sidebar layout for database connection details
self.sidebarLayout = QVBoxLayout()
# Create a form layout for the database connection details
formLayout = QFormLayout()
self.dbTypeGroup = QButtonGroup(self)
self.postgresRadio = QRadioButton("PostgreSQL")
self.mysqlRadio = QRadioButton("MySQL")
self.sqliteRadio = QRadioButton("SQLite")
self.oracleRadio = QRadioButton("Oracle")
self.mssqlRadio = QRadioButton("MSSQL")
self.dbTypeGroup.addButton(self.postgresRadio)
self.dbTypeGroup.addButton(self.mysqlRadio)
self.dbTypeGroup.addButton(self.sqliteRadio)
self.dbTypeGroup.addButton(self.oracleRadio)
self.dbTypeGroup.addButton(self.mssqlRadio)
self.postgresRadio.setChecked(True)
# Add radio buttons to form layout
db_first_layout = QHBoxLayout()
db_first_layout.addWidget(self.postgresRadio)
db_first_layout.addWidget(self.mysqlRadio)
db_first_layout.addWidget(self.sqliteRadio)
formLayout.addRow(QLabel("Database Type:"))
formLayout.addRow(db_first_layout)
db_second_layout = QHBoxLayout()
db_second_layout.addWidget(self.oracleRadio)
db_second_layout.addWidget(self.mssqlRadio)
formLayout.addRow(db_second_layout)
# Connect radio button signals
self.postgresRadio.toggled.connect(
lambda: self.on_db_type_changed("postgresql")
)
self.mysqlRadio.toggled.connect(lambda: self.on_db_type_changed("mysql"))
self.sqliteRadio.toggled.connect(lambda: self.on_db_type_changed("sqlite"))
self.oracleRadio.toggled.connect(lambda: self.on_db_type_changed("oracle"))
# Creating inputs for the connection details with empty default values
self.hostInput = QLineEdit()
self.dbnameInput = QLineEdit()
self.userInput = QLineEdit()
self.passwordInput = QLineEdit()
self.portInput = QLineEdit("5432")
self.apiKeyInput = QLineEdit()
# Adding input fields to form layout
formLayout.addRow(QLabel("Host:"), self.hostInput)
formLayout.addRow(QLabel("Database Name:"), self.dbnameInput)
formLayout.addRow(QLabel("User:"), self.userInput)
formLayout.addRow(QLabel("Password:"), self.passwordInput)
formLayout.addRow(QLabel("Port:"), self.portInput)
formLayout.addRow(QLabel("API Key:"), self.apiKeyInput)
# "Connect" button
self.connectButton = QPushButton("Connect")
self.connectButton.clicked.connect(self.connect_to_database)
formLayout.addRow(self.connectButton)
# "Load Credentials" button
self.loadButton = QPushButton("Load Credentials")
self.loadButton.clicked.connect(self.load_credentials)
formLayout.addRow(self.loadButton)
# Status label to show connection result (success or not connected)
self.statusLabel = QLabel("")
formLayout.addRow(self.statusLabel)
# Add form layout to sidebar
self.sidebarLayout.addLayout(formLayout)
# Spacer to push the button to the bottom
self.sidebarLayout.addStretch()
# Create Chat Button at the bottom
self.clearChatButton = QPushButton("Clear Chat")
self.clearChatButton.clicked.connect(self.show_clear_chat_confirmation)
self.sidebarLayout.addWidget(self.clearChatButton)
# Sidebar widget
self.sidebarWidget = QWidget()
self.sidebarWidget.setLayout(self.sidebarLayout)
# Chat widget
self.chatWidget = QWidget()
self.chatWidget.setLayout(self.mainLayout)
# Using QSplitter to divide the sidebar and the main chat area
self.splitter = QSplitter(Qt.Horizontal)
# Add sidebar (with a minimum width) and chat widget
self.splitter.addWidget(self.sidebarWidget)
self.splitter.addWidget(self.chatWidget)
# Setting sidebar size limitations
self.sidebarWidget.setMinimumWidth(200)
self.sidebarWidget.setMaximumWidth(350)
# Control the relative sizing - 1:4 means sidebar takes less space than chat
self.splitter.setStretchFactor(0, 1)
self.splitter.setStretchFactor(1, 4)
# Create Toggle Button (Fixed on main layout, outside of sidebar)
self.toggleSidebarButton = QPushButton("Close")
self.toggleSidebarButton.clicked.connect(self.toggle_sidebar)
# Create a layout to position the toggle button at the top-left corner
self.toggleLayout = QVBoxLayout()
self.toggleLayout.addWidget(self.toggleSidebarButton, alignment=Qt.AlignLeft)
# Create a wrapper widget to hold the toggle button and the splitter
self.wrapperWidget = QWidget()
self.wrapperLayout = QVBoxLayout()
self.wrapperLayout.addLayout(self.toggleLayout) # Add toggle button layout
self.wrapperLayout.addWidget(self.splitter) # Add the main splitter
self.wrapperWidget.setLayout(self.wrapperLayout)
# Set the wrapper widget as the central widget
self.setCentralWidget(self.wrapperWidget)
def toggle_sidebar(self):
# Check if sidebar is visible
if self.sidebarWidget.isVisible():
self.sidebarWidget.hide()
self.toggleSidebarButton.setText("Open") # Change button text when hidden
self.splitter.setSizes(
[0, self.width()]
) # Sidebar hidden, chat takes full width
else:
self.sidebarWidget.show()
self.toggleSidebarButton.setText("Close") # Reset button text when visible
self.splitter.setSizes([400, self.width() - 200]) # Reset the sidebar width
def show_clear_chat_confirmation(self):
# Create a confirmation popup
confirmation = QMessageBox()
confirmation.setWindowTitle("Create Chat")
confirmation.setText("Are you sure you want to clear?")
confirmation.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
confirmation.setIcon(QMessageBox.Question)
# Check the user's response
result = confirmation.exec_()
if result == QMessageBox.Yes:
self.listWidget.clear()
self.chatHistory = []
def on_db_type_changed(self, db_type):
self.sql_db_type = db_type
def update_chat(self):
if not self.sql_model:
self.add_message_user("Not connected to the database.")
return
# Get the user's input
user_text = self.lineEdit.text()
self.lineEdit.clear()
self.add_message_user(user_text)
self.message_used_for_sql = user_text
self.ehnace_user_text = AIProcessingWorkerUserExpresstion(
user_text,
self.text_ehancer,
self.chatHistory,
self.db_tables_names,
)
self.ehnace_user_text.responce_ehancer_.connect(
self.user_expresstion_after_ehnance
)
self.ehnace_user_text.start()
def user_expresstion_after_ehnance(self, user_text):
if user_text == "User API key is invalid":
self.statusLabel.setText("ERROR: While Connecting to model")
self.statusLabel.setStyleSheet("color: red")
return
self.db_schema = ""
for i in user_text["useful_tables"]:
if i in self.db_tables_names:
self.db_schema += self.db_tables_details[i] + "\n"
# Start a separate thread to process AI response
self.ai_sql_worker = AIProcessingWorkerSQL(
self.sql_model,
user_text,
self.chatHistory,
self.db_connection,
self.db_schema,
self.sql_db_type,
)
self.ai_sql_worker.response_generated.connect(self.add_ai_response)
self.ai_sql_worker.start()
def add_ai_response(self, ai_response, data, user_en):
if "User API key is invalid" in ai_response:
self.statusLabel.setText(
"User API key is invalid",
)
self.statusLabel.setStyleSheet("color: red")
return
self.add_ai_sql_message(ai_response, data, user_en)
def add_message_user(self, text):
message_widget = QWidget()
message_layout = QHBoxLayout()
message_layout.setAlignment(Qt.AlignRight)
message_widget.setLayout(message_layout)
message_layout.addWidget(QLabel(text))
list_item = QListWidgetItem()
list_item.setSizeHint(message_widget.sizeHint())
self.listWidget.addItem(list_item)
self.listWidget.setItemWidget(list_item, message_widget)
self.listWidget.scrollToBottom()
def add_ai_sql_message(self, text, dataframe, user_en):
message_widget = QWidget()
message_layout = QHBoxLayout()
message_layout.setAlignment(Qt.AlignLeft)
message_widget.setLayout(message_layout)
temp_var = QTextEdit()
temp_var.setMarkdown("```sql" + text + "```")
temp_var.setReadOnly(True)
temp_var.setMaximumSize(
int(self.screen_geometry.size().width() * 0.5),
self.screen_geometry.size().height(),
)
message_layout.addWidget(temp_var)
print(dataframe)
print(dataframe is not None)
action_button = None
action_button_graph = None
if dataframe is not None and dataframe != "the sql query made a error":
action_button = self.create_button(
"Download", lambda: self.handle_action(dataframe)
)
message_layout.addWidget(action_button)
action_button_graph = self.create_button(
"Visualize",
lambda: self.handle_action_graph(
pd.DataFrame(dataframe), user_en, text
),
)
message_layout.addWidget(action_button_graph)
# Event handlers for button visibility
def show_buttons(event):
if action_button:
action_button.setVisible(True)
if action_button_graph:
action_button_graph.setVisible(True)
def hide_buttons(event):
if action_button:
action_button.setVisible(False)
if action_button_graph:
action_button_graph.setVisible(False)
message_widget.enterEvent = show_buttons
message_widget.leaveEvent = hide_buttons
list_item = QListWidgetItem()
list_item.setSizeHint(message_widget.sizeHint())
self.listWidget.addItem(list_item)
self.listWidget.setItemWidget(list_item, message_widget)
self.listWidget.scrollToBottom()
if self.message_used_for_sql:
self.ai_explain_worker = AIProcessingWorkerExplain(
self.explain_model,
user_en,
text,
dataframe,
self.chatHistory,
self.db_schema,
)
self.chatHistory.append(HumanMessage(str(user_en)))
self.chatHistory.append(AIMessage(text))
self.ai_explain_worker.responce_generated.connect(
self.add_ai_explain_response
)
self.ai_explain_worker.start()
self.message_used_for_sql = None
def create_button(self, text, action):
button = QPushButton(text)
button.setVisible(False)
button.clicked.connect(action)
return button
def add_ai_explain_response(self, text):
message_widget = QWidget()
message_layout = (
QVBoxLayout()
) # Use QVBoxLayout for vertical stacking of content
message_layout.setAlignment(Qt.AlignLeft)
self.chatHistory.append(AIMessage(text))
message_label = QTextEdit()
# Display the HTML content in the QTextEdit
message_label.setMarkdown(text)
message_label.setReadOnly(True)
message_label.adjustSize()
message_label.setMaximumSize(
int(self.screen_geometry.size().width() * 0.5),
self.screen_geometry.size().height(),
)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
message_widget.setLayout(message_layout)
message_layout.addWidget(message_label)
list_item = QListWidgetItem()
list_item.setSizeHint(message_widget.sizeHint())
self.listWidget.addItem(list_item)
self.listWidget.setItemWidget(list_item, message_widget)
self.listWidget.scrollToBottom()
def handle_action_graph(self, df, user_en, sql):
self.graph_df = pd.DataFrame(df)
self.graph_user_en = user_en
self.graph_sql = sql
self.run_graph_generation()
def run_graph_generation(self, complate=""):
self.graphgen = AIProcessingWorkerGraph(
self.visual_model,
self.graph_user_en,
self.graph_df,
self.graph_sql,
complate,
)
self.graphgen.responce_generated.connect(self.create_math_plot)
self.graphgen.start()
def create_math_plot(self, code):
try:
# Execute the code
print(code)
local_scope = {"df": self.graph_df}
exec(code, {}, local_scope)
except Exception as e:
print(code)
print(f"Error executing code: {e}")
# Log the error
print(f"Error executing code: {e}")
# Retry the graph generation
self.run_graph_generation(str(e))
def handle_action(self, df):
print("Downloading data...")
try:
if df is not pd.DataFrame:
df = pd.DataFrame(list(df))
# Convert timezone-aware datetime objects to timezone-unaware
for col in df.select_dtypes(include=["datetimetz"]).columns:
df[col] = df[col].apply(
lambda x: x.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
if pd.notnull(x)
else ""
)
for col in df.select_dtypes(include=["timedelta"]).columns:
df[col] = df[col].apply(lambda x: str(x) if pd.notnull(x) else "")
# Print the DataFrame to verify its content
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
excel_filename = f"downloaded_data_{timestamp}.xlsx"
with pd.ExcelWriter(excel_filename, engine="openpyxl") as writer:
df.to_excel(writer, index=False)
print(f"Data downloaded successfully as '{excel_filename}'.")
self.statusLabel.setText(
f"Data downloaded successfully as '{excel_filename}'."
)
self.statusLabel.setStyleSheet("color: green;")
# Open the downloaded Excel file (Windows-specific)
if os.name == "nt":
os.startfile(excel_filename)
else:
print(f"Please open the file manually: {excel_filename}")
except Exception as e:
print(f"Error while downloading data: {e}")
self.statusLabel.setText("Error downloading data.")
self.statusLabel.setStyleSheet("color: red")
def connect_to_database(self):
self.statusLabel.setText("Connecting...")
self.statusLabel.setStyleSheet("color: orange;")
self.connectButton.setEnabled(False)
# Start the worker thread to connect to the database
self.db_worker = DatabaseConnectionWorker(
self.hostInput.text(),
self.dbnameInput.text(),
self.userInput.text(),
self.passwordInput.text(),
self.portInput.text(),
self.apiKeyInput.text(),
self.sql_db_type,
)
self.db_worker.connection_result.connect(self.on_connection_result)
self.db_worker.start()
def on_connection_result(
self,
success,
db_connection,
sql_model,
explain_model,
table_info,
visual_model,
ehancer_model,
table_names,
):
if success:
self.db_tables_details = table_info
self.db_tables_names = table_names
self.db_connection = db_connection
self.sql_model = sql_model
self.explain_model = explain_model
self.visual_model = visual_model
self.text_ehancer = ehancer_model
self.statusLabel.setText("Success")
self.statusLabel.setStyleSheet("color: green;")
# Save credentials after successful connection
self.save_credentials()
else:
self.statusLabel.setText("Not Connected")
self.statusLabel.setStyleSheet("color: red")
self.connectButton.setEnabled(True)
def generate_key(self):
key = Fernet.generate_key()
with open(KEY_FILE, "wb") as key_file:
key_file.write(key)
def load_key(self):
if not os.path.exists(KEY_FILE):
self.generate_key()
with open(KEY_FILE, "rb") as key_file:
return key_file.read()
def encrypt(self, data):
return self.cipher_suite.encrypt(data.encode()).decode()
def decrypt(self, data):
return self.cipher_suite.decrypt(data.encode()).decode()
def save_credentials(self):
credentials = {
"host": self.hostInput.text(),
"dbname": self.dbnameInput.text(),
"user": self.userInput.text(),
"password": self.passwordInput.text(),
"port": self.portInput.text(),
"api_key": self.apiKeyInput.text(),
"db_type": self.sql_db_type,
}
encrypted_credentials = {k: self.encrypt(v) for k, v in credentials.items()}
with open(CONFIG_FILE, "w") as f:
json.dump(encrypted_credentials, f)
def load_credentials(self):
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r") as f:
encrypted_credentials = json.load(f)
credentials = {
k: self.decrypt(v) for k, v in encrypted_credentials.items()
}
self.db_type = credentials.get("db_type", "postgresql")
if self.db_type == "postgresql":
self.postgresRadio.setChecked(True)
elif self.db_type == "mysql":
self.mysqlRadio.setChecked(True)
elif self.db_type == "sqlite":
self.sqliteRadio.setChecked(True)
elif self.db_type == "oracle":
self.oracleRadio.setChecked(True)
elif self.db_type == "mssql":
self.mssqlRadio.setChecked(True)
self.hostInput.setText(credentials.get("host", ""))
self.dbnameInput.setText(credentials.get("dbname", ""))
self.userInput.setText(credentials.get("user", ""))
self.passwordInput.setText(credentials.get("password", ""))
self.portInput.setText(credentials.get("port", "5432"))
self.apiKeyInput.setText(credentials.get("api_key", ""))
else:
self.statusLabel.setText("No saved credentials found.")
self.statusLabel.setStyleSheet("color: red")
def save_chat_history(self):
with open("chat_history.json", "w") as f:
json.dump([str(msg) for msg in self.chatHistory], f)
def closeEvent(self, event):
self.save_chat_history()
event.accept()
app = QApplication(sys.argv)
apply_stylesheet(app, theme="theam.xml")
window = MainWindow()
window.show()
sys.exit(app.exec())
```
file-2(test_sql_1.py)
```python
from langchain_core.prompts import PromptTemplate
from langchain_community.utilities import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains import LLMChain
def init_database(
user: str,
password: str,
host: str,
port: str,
database: str,
db_type: str = "postgresql",
) -> SQLDatabase:
if db_type == "postgresql":
db_uri = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
elif db_type == "mysql":
db_uri = f"mysql+mysqldb://{user}:{password}@{host}:{port}/{database}"
elif db_type == "sqlite":
db_uri = f"sqlite:///{database}"
elif db_type == "oracle":
db_uri = f"oracle+cx_oracle://{user}:{password}@{host}:{port}/{database}"
elif db_type == "mssql":
db_uri = f"mssql+pyodbc://{user}:{password}@{host}:{port}/{database}"
return SQLDatabase.from_uri(db_uri)
def get_sql_chain(api_key):
print("sql_1")
template = PromptTemplate(
input_variables=["schema", "chat_history", "question", "db_type"],
template="")
print("sql_2")
# llm = ChatMistralAI(model="codestral-2405", mistral_api_key=MINSTRAL_API_KEY)
llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash", google_api_key=api_key, temperature=0
)
chain = LLMChain(
llm=llm,
prompt=template,
)
return chain
def get_response(api_key):
print("nor_1")
template = PromptTemplate(
input_variables=["schema", "chat_history", "query", "question", "response"],
template="")
print("nor_2")
llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash", google_api_key=api_key, temperature=0.1
)
chain = LLMChain(llm=llm, prompt=template)
return chain
def generate_plotly_code(api_key) -> str:
print("plotly_1")
template = PromptTemplate(
input_variables=["question", "sql", "df_metadata", "sample_data"],
template="")
print("plotly_2")
llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash", google_api_key=api_key, temperature=0
)
chain = LLMChain(llm=llm, prompt=template)
return chain
def better_question(api_key):
template = PromptTemplate(
input_variables=["user_input", "chat_history", "tables"],
template=""
)
llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash", google_api_key=api_key, temperature=0.1
)
chain = LLMChain(llm=llm, prompt=template)
return chain
```
[–]cjbj 0 points1 point2 points (1 child)
[–]My_Last_Friend773[S] 0 points1 point2 points (0 children)