67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
|
from loguru import logger
|
||
|
from mysql.connector import connect, Error, OperationalError
|
||
|
from mysql.connector import errors as db_errors
|
||
|
|
||
|
from custom_decorators import singleton
|
||
|
|
||
|
|
||
|
@singleton
|
||
|
class Database:
|
||
|
def __init__(self, config):
|
||
|
self.connection = None
|
||
|
self.config = config
|
||
|
self.connect()
|
||
|
|
||
|
def connect(self):
|
||
|
"""Establish a new database connection."""
|
||
|
try:
|
||
|
self.connection = connect(**self.config)
|
||
|
if self.connection.is_connected():
|
||
|
logger.info("Connected to MySQL database")
|
||
|
except Error as e:
|
||
|
logger.info(f"Error while connecting to MySQL: {e}")
|
||
|
self.connection = None
|
||
|
|
||
|
def get_connection(self):
|
||
|
"""Get the database connection, with reconnection logic."""
|
||
|
if self.connection is None or not self.connection.is_connected():
|
||
|
logger.info("Reconnecting to the database...")
|
||
|
self.connect()
|
||
|
return self.connection
|
||
|
|
||
|
def close_connection(self):
|
||
|
if self.connection and self.connection.is_connected():
|
||
|
self.connection.close()
|
||
|
logger.info("MySQL connection is closed")
|
||
|
|
||
|
def execute_query(self, query, params=None):
|
||
|
"""Execute a query with optional parameters, supports transactions."""
|
||
|
cursor = None
|
||
|
try:
|
||
|
connection = self.get_connection()
|
||
|
cursor = connection.cursor()
|
||
|
cursor.execute(query, params)
|
||
|
return cursor
|
||
|
except OperationalError as e:
|
||
|
logger.info(f"Operational error: {e}. Attempting to reconnect...")
|
||
|
self.connect()
|
||
|
cursor = self.get_connection().cursor()
|
||
|
cursor.execute(query, params)
|
||
|
return cursor
|
||
|
except db_errors.Error as e:
|
||
|
logger.info(f"Database error: {e}")
|
||
|
raise
|
||
|
finally:
|
||
|
if cursor:
|
||
|
cursor.close()
|
||
|
|
||
|
def commit(self):
|
||
|
"""Commit the current transaction."""
|
||
|
if self.connection:
|
||
|
self.connection.commit()
|
||
|
|
||
|
def rollback(self):
|
||
|
"""Rollback the current transaction."""
|
||
|
if self.connection:
|
||
|
self.connection.rollback()
|