2024-11-13 08:13:06 +00:00
|
|
|
from loguru import logger
|
2024-11-19 12:07:35 +00:00
|
|
|
from mysql.connector import connect as connect_mysql, Error, OperationalError
|
2024-11-13 08:13:06 +00:00
|
|
|
from mysql.connector import errors as db_errors
|
|
|
|
|
|
|
|
from custom_decorators import singleton
|
|
|
|
|
|
|
|
|
|
|
|
@singleton
class Database:
    """Thin wrapper around a single ``mysql.connector`` connection.

    The instance keeps the connection keyword arguments so the connection
    can be re-established on demand by :meth:`get_connection` and
    :meth:`execute_query`.

    NOTE(review): the class is wrapped by ``singleton`` from
    ``custom_decorators``; presumably that yields one shared instance --
    confirm against the decorator's implementation.
    """

    def __init__(self, config):
        """Store *config* and immediately attempt to connect.

        Args:
            config: Mapping of keyword arguments forwarded unchanged to
                ``mysql.connector.connect``.
        """
        self.connection = None
        self.config = config
        self.connect()

    def connect(self):
        """Establish a new database connection.

        A connector ``Error`` is logged and swallowed: ``self.connection``
        is then ``None`` and nothing is raised, so callers must be ready
        for a missing connection.

        Raises:
            AttributeError: Re-raised with " in MYSQL configuration"
                appended to its message (presumably how the connector
                reports unsupported configuration keys -- confirm).
        """
        try:
            self.connection = connect_mysql(**self.config)
            if self.connection.is_connected():
                logger.info("Connected to MySQL database")
        except AttributeError as e:
            # Guard the index: an AttributeError without args would
            # otherwise turn into an unrelated IndexError here.
            if e.args:
                e.args = (f'{e.args[0]} in MYSQL configuration',)
            raise
        except Error as e:
            logger.error(f"Error while connecting to MySQL: {e}")
            self.connection = None

    def get_connection(self):
        """Get the database connection, with reconnection logic.

        Returns:
            The current connection object, or ``None`` when the connection
            is missing or dropped and the reconnection attempt failed.
        """
        if self.connection is None or not self.connection.is_connected():
            logger.info("Reconnecting to the database...")
            self.connect()
        return self.connection

    def close_connection(self):
        """Close the connection if one exists and is still connected."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            logger.info("MySQL connection is closed")

    @staticmethod
    def _execute_on_new_cursor(connection, query, params):
        """Open a cursor on *connection*, run *query* and return the cursor.

        The cursor is closed before any exception from ``execute`` is
        propagated, so a failed statement does not leak it.

        Raises:
            OperationalError: If *connection* is ``None``.
        """
        if connection is None:
            # Surface a database error rather than the AttributeError that
            # ``None.cursor()`` would produce.
            raise OperationalError("MySQL connection is not available")
        cursor = connection.cursor()
        try:
            cursor.execute(query, params)
        except Exception:
            try:
                cursor.close()
            except Error:
                # Best-effort cleanup: the original failure is the one the
                # caller needs to see, so a close error is dropped.
                pass
            raise
        return cursor

    def execute_query(self, query, params=None):
        """Execute a query with optional parameters, supports transactions.

        Nothing is committed here; use :meth:`commit` or :meth:`rollback`.
        On an ``OperationalError`` the connection is re-established and the
        statement is retried exactly once.

        Args:
            query: SQL statement passed to ``cursor.execute``.
            params: Optional parameters passed to ``cursor.execute``.

        Returns:
            The cursor the statement was executed on. The caller is
            responsible for reading its results and closing it.

        Raises:
            mysql.connector.Error: If the statement fails, if the retry
                fails, or if no connection could be established
                (``OperationalError``).
        """
        try:
            return self._execute_on_new_cursor(
                self.get_connection(), query, params
            )
        except OperationalError as e:
            logger.warning(f"Operational error: {e}. Attempting to reconnect...")
            self.connect()
            # Errors raised by this second attempt propagate to the caller.
            return self._execute_on_new_cursor(
                self.get_connection(), query, params
            )
        except db_errors.Error as e:
            logger.error(f"Database error: {e}")
            raise

    def commit(self):
        """Commit the current transaction."""
        if self.connection:
            self.connection.commit()

    def rollback(self):
        """Rollback the current transaction."""
        if self.connection:
            self.connection.rollback()
|