In this post, we will see how to manage a SQLite database with Python.
But first of all, what is SQLite?
“SQLite is a lightweight, file-based relational database management system that is widely used due to its simplicity, portability, and ease of integration into various programming languages.
It provides a self-contained, serverless, and zero-configuration architecture, making it an excellent choice for small to medium-sized applications and projects.
SQLite differs from traditional client-server databases in that the entire database is contained within a single file, making it easy to distribute, copy, or backup. It stores data in a structured manner, allowing us to define tables, columns, and relationships similar to other relational databases.”
For more information, we can go to the official website: www.sqlite.org.
One of the significant advantages of SQLite is its seamless integration with programming languages like Python. The Python Standard Library includes the sqlite3 module, which provides a convenient and intuitive way to interact with SQLite databases.
For this post, we are going to create a class called “CoreSQLite” that we will use to manage all the CRUD operations for a table called “TabUser”, created in a DB called “UserDb”.
CHECKING DB:
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table.

    The database file is named ``<dbName>.db`` and lives in the working
    directory that is current when the object is constructed.
    """

    # Most recently opened connection; it is closed again before the method
    # that opened it returns.
    connection = None

    def __init__(self, dbName):
        """Store the database location and create the database if missing.

        Args:
            dbName: Database name without extension; ``.db`` is appended.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        # Resolve the full path once so every method targets the same file.
        self.dbPath = os.path.join(os.getcwd(), self.dbFileName)
        self.checkDB()

    def checkDB(self):
        """Create the database file and the TabUser table if the file is absent.

        Prints a message telling whether the file was created or already
        existed.

        Raises:
            sqlite3.Error: If the database or the table cannot be created.
        """
        if os.path.exists(self.dbPath):
            print(f"SQLite database file '{self.dbFileName}' already exists.")
            return

        # A plain path is used instead of a hand-built 'file:...?mode=rwc'
        # URI: connect() already opens read-write and creates the file, and a
        # plain path is not misparsed when it contains '?', '#' or '%'.
        self.connection = sqlite3.connect(self.dbPath)
        try:
            self._createTabUser()
        finally:
            # Release the connection even if the table creation failed.
            self.connection.close()
        print(
            f"SQLite database file '{self.dbFileName}' created successfully.")

    def _createTabUser(self):
        """Create the TabUser table (Id, Username, Password) on ``self.connection``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(
                'CREATE TABLE TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()
[MAIN.PY]
# Constructing CoreSQLite runs checkDB(), which creates UserDb.db and the
# TabUser table in the current working directory when the file is missing.
from CoreSQLite import CoreSQLite
objDb = CoreSQLite('UserDb')
If we run the main.py file, the following will be the result:
Using a tool like DBeaver, we can check if the table TabUser has been created in the database:
INSERTING A NEW USER:
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table."""

    # Connection and cursor of the most recent operation; both are closed
    # again before the operation returns.
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database location and create the database if missing.

        Args:
            dbName: Database name without extension; ``.db`` is appended.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        # Resolve the full path once, so a later change of working directory
        # cannot make the CRUD methods open a different file.
        self.dbPath = os.path.join(os.getcwd(), self.dbFileName)
        self.checkDB()

    def checkDB(self): ...  # body shown in the previous listing

    def _createTabUser(self): ...  # body shown in the previous listing

    def _openCursor(self):
        """Open a new connection to the database file and a cursor on it."""
        self.connection = sqlite3.connect(self.dbPath)
        self.cursor = self.connection.cursor()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert a new row into TabUser.

        Args:
            userName: Value for the Username column.
            password: Value for the Password column.

        Returns:
            The Id of the inserted row.

        Raises:
            sqlite3.Error: If the insert fails; the connection is still closed.
        """
        # NOTE: the password is stored and echoed exactly as given (plain
        # text); acceptable for a demo, hash it in a real application.
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
            new_id = self.cursor.lastrowid
        finally:
            # Always release the cursor and the connection, even on error.
            self.cursor.close()
            self.connection.close()
        print(f"User {userName} {password} created successfully.")
        return new_id
[MAIN.PY]
from CoreSQLite import CoreSQLite
objDb = CoreSQLite('UserDb')
# creating 20 users (User1 .. User20); the upper bound of range() is exclusive
for i in range(1, 21):
    objDb.create_user(f"User{i}", f"Pass123_{i}")
If we run the main.py file, the following will be the result:
SELECTING ALL USERS:
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table."""

    # Connection and cursor of the most recent operation; both are closed
    # again before the operation returns.
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body shown in the earlier listings

    def checkDB(self): ...  # body shown in the earlier listings

    def _createTabUser(self): ...  # body shown in the earlier listings

    def _openCursor(self): ...  # body shown in the earlier listings

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body shown in the earlier listings

    # Read operation
    def get_users(self):
        """Print every row of TabUser and return the rows.

        Returns:
            A list of ``(Id, Username, Password)`` tuples, one per user.

        Raises:
            sqlite3.Error: If the query fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('SELECT * FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            # Always release the cursor and the connection, even on error.
            self.cursor.close()
            self.connection.close()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")
        return rows
[MAIN.PY]
# Open (or create) UserDb.db, then print every row stored in TabUser.
from CoreSQLite import CoreSQLite
objDb = CoreSQLite('UserDb')
objDb.get_users()
If we run the main.py file, the following will be the result:
UPDATING A USER:
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table."""

    # Connection and cursor of the most recent operation; both are closed
    # again before the operation returns.
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body shown in the earlier listings

    def checkDB(self): ...  # body shown in the earlier listings

    def _createTabUser(self): ...  # body shown in the earlier listings

    def _openCursor(self): ...  # body shown in the earlier listings

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body shown in the earlier listings

    # Read operation
    def get_users(self): ...  # body shown in the earlier listings

    # Update operation
    def update_user(self, id, username, password):
        """Replace the username and password of the row with the given id.

        Args:
            id: Value of the Id column identifying the user.
            username: New value for the Username column.
            password: New value for the Password column.

        Returns:
            True if a row was updated, False if no row has that id.

        Raises:
            sqlite3.Error: If the update fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            self.connection.commit()
            # rowcount is 0 when the WHERE clause matched nothing.
            updated = self.cursor.rowcount > 0
        finally:
            # Always release the cursor and the connection, even on error.
            self.cursor.close()
            self.connection.close()
        if updated:
            print("User updated successfully.")
        else:
            print(f"No user with id {id} found; nothing updated.")
        return updated
[MAIN.PY]
from CoreSQLite import CoreSQLite
objDb = CoreSQLite('UserDb')
# Create a user
objDb.create_user('DamianoUser', 'Pass123')
# Select users
objDb.get_users()
# Update the user whose Id is 1.
# NOTE: Id 1 is the first row ever inserted into TabUser; if the database
# already holds users from the previous steps, that row is not 'DamianoUser'.
objDb.update_user(1, 'UpdateDamianoUser', 'UpdatePass123')
# Select users again to see the change
objDb.get_users()
If we run the main.py file, the following will be the result:
DELETING A USER:
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table."""

    # Connection and cursor of the most recent operation; both are closed
    # again before the operation returns.
    connection = None
    cursor = None

    def __init__(self, dbName): ...  # body shown in the earlier listings

    def checkDB(self): ...  # body shown in the earlier listings

    def _createTabUser(self): ...  # body shown in the earlier listings

    def _openCursor(self): ...  # body shown in the earlier listings

    # Create (Insert) operation
    def create_user(self, userName, password): ...  # body shown in the earlier listings

    # Read operation
    def get_users(self): ...  # body shown in the earlier listings

    # Update operation
    def update_user(self, id, username, password): ...  # body shown in the earlier listings

    # Delete operation
    def delete_user(self, id):
        """Delete the row of TabUser with the given id.

        Args:
            id: Value of the Id column identifying the user.

        Returns:
            True if a row was deleted, False if no row has that id.

        Raises:
            sqlite3.Error: If the delete fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            self.connection.commit()
            # rowcount is 0 when the WHERE clause matched nothing.
            deleted = self.cursor.rowcount > 0
        finally:
            # Always release the cursor and the connection, even on error.
            self.cursor.close()
            self.connection.close()
        if deleted:
            print("User deleted successfully.")
        else:
            print(f"No user with id {id} found; nothing deleted.")
        return deleted
[MAIN.PY]
from CoreSQLite import CoreSQLite
objDb = CoreSQLite('UserDb')
# Create a user
objDb.create_user('DamianoUser', 'Pass123')
# Select users
objDb.get_users()
# Delete the user whose Id is 1.
# NOTE: Id 1 is the first row ever inserted into TabUser; if the database
# already holds users from the previous steps, that row is not 'DamianoUser'.
objDb.delete_user(1)
# Select users again to see the change
objDb.get_users()
If we run the main.py file, the following will be the result:
[CORESQLITE.PY]
import sqlite3
import os
class CoreSQLite:
    """Create and manage the SQLite database that holds the ``TabUser`` table.

    The database file is named ``<dbName>.db`` and lives in the working
    directory that is current when the object is constructed. Every CRUD
    method opens its own connection and closes it before returning.
    """

    # Connection and cursor of the most recent operation; both are closed
    # again before the operation returns.
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database location and create the database if missing.

        Args:
            dbName: Database name without extension; ``.db`` is appended.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        # Resolve the full path once, so a later change of working directory
        # cannot make the CRUD methods open a different file.
        self.dbPath = os.path.join(os.getcwd(), self.dbFileName)
        self.checkDB()

    def checkDB(self):
        """Create the database file and the TabUser table if the file is absent.

        Prints a message telling whether the file was created or already
        existed.

        Raises:
            sqlite3.Error: If the database or the table cannot be created.
        """
        if os.path.exists(self.dbPath):
            print(f"SQLite database file '{self.dbFileName}' already exists.")
            return

        # A plain path is used instead of a hand-built 'file:...?mode=rwc'
        # URI: connect() already opens read-write and creates the file, and a
        # plain path is not misparsed when it contains '?', '#' or '%'.
        self.connection = sqlite3.connect(self.dbPath)
        try:
            self._createTabUser()
        finally:
            # Release the connection even if the table creation failed.
            self.connection.close()
        print(
            f"SQLite database file '{self.dbFileName}' created successfully.")

    def _createTabUser(self):
        """Create the TabUser table (Id, Username, Password) on ``self.connection``."""
        cursor = self.connection.cursor()
        try:
            cursor.execute(
                'CREATE TABLE TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()

    def _openCursor(self):
        """Open a new connection to the database file and a cursor on it."""
        self.connection = sqlite3.connect(self.dbPath)
        self.cursor = self.connection.cursor()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert a new row into TabUser.

        Args:
            userName: Value for the Username column.
            password: Value for the Password column.

        Returns:
            The Id of the inserted row.

        Raises:
            sqlite3.Error: If the insert fails; the connection is still closed.
        """
        # NOTE: the password is stored and echoed exactly as given (plain
        # text); acceptable for a demo, hash it in a real application.
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
            new_id = self.cursor.lastrowid
        finally:
            # Always release the cursor and the connection, even on error.
            self.cursor.close()
            self.connection.close()
        print(f"User {userName} {password} created successfully.")
        return new_id

    # Read operation
    def get_users(self):
        """Print every row of TabUser and return the rows.

        Returns:
            A list of ``(Id, Username, Password)`` tuples, one per user.

        Raises:
            sqlite3.Error: If the query fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('SELECT * FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            self.cursor.close()
            self.connection.close()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")
        return rows

    # Update operation
    def update_user(self, id, username, password):
        """Replace the username and password of the row with the given id.

        Args:
            id: Value of the Id column identifying the user.
            username: New value for the Username column.
            password: New value for the Password column.

        Returns:
            True if a row was updated, False if no row has that id.

        Raises:
            sqlite3.Error: If the update fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            self.connection.commit()
            # rowcount is 0 when the WHERE clause matched nothing.
            updated = self.cursor.rowcount > 0
        finally:
            self.cursor.close()
            self.connection.close()
        if updated:
            print("User updated successfully.")
        else:
            print(f"No user with id {id} found; nothing updated.")
        return updated

    # Delete operation
    def delete_user(self, id):
        """Delete the row of TabUser with the given id.

        Args:
            id: Value of the Id column identifying the user.

        Returns:
            True if a row was deleted, False if no row has that id.

        Raises:
            sqlite3.Error: If the delete fails; the connection is still closed.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            self.connection.commit()
            # rowcount is 0 when the WHERE clause matched nothing.
            deleted = self.cursor.rowcount > 0
        finally:
            self.cursor.close()
            self.connection.close()
        if deleted:
            print("User deleted successfully.")
        else:
            print(f"No user with id {id} found; nothing deleted.")
        return deleted