Python – Management of a SQLite database

By | 02/08/2023

In this post, we will see how to manage a SQLite database with Python.
But first of all, what is SQLite?
“SQLite is a lightweight, file-based relational database management system that is widely used due to its simplicity, portability, and ease of integration into various programming languages.
It provides a self-contained, serverless, and zero-configuration architecture, making it an excellent choice for small to medium-sized applications and projects.
SQLite differs from traditional client-server databases in that the entire database is contained within a single file, making it easy to distribute, copy, or backup. It stores data in a structured manner, allowing us to define tables, columns, and relationships similar to other relational databases.”

For more information, we can go to the official website: www.sqlite.org.

One of the significant advantages of SQLite is its seamless integration with programming languages like Python. The Python Standard Library includes the sqlite3 module, which provides a convenient and intuitive way to interact with SQLite databases.
For this post, we are going to create a class called “CoreSQLite” that we will use to manage all the CRUD operations for a table called “TabUser” created in a DB called “UserDb”.

CHECKING DB:

[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """Create and validate the SQLite database file that holds ``TabUser``.

    The database file is ``<dbName>.db`` in the current working directory.
    Instantiating the class runs :meth:`checkDB`.
    """

    # Connection used while the schema is checked; checkDB() closes it
    # again before returning.
    connection = None

    def __init__(self, dbName):
        """Store the database name and make sure the database is usable.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    def checkDB(self):
        """Create the database file if missing and ensure ``TabUser`` exists.

        Prints whether the file was created or was already present.

        Raises:
            sqlite3.Error: If the file cannot be opened as a SQLite database
                or the table cannot be created.
        """
        db_path = os.path.join(os.getcwd(), self.dbFileName)
        existed = os.path.exists(db_path)
        # A plain path is enough: sqlite3.connect() creates a missing file,
        # and unlike a hand-built 'file:...?mode=rwc' URI it is not broken
        # by '?', '#' or '%' characters in the directory name.
        self.connection = sqlite3.connect(db_path)
        try:
            # Run this even for an existing file so that a file without
            # the table (e.g. an empty one) still ends up usable.
            self._createTabUser()
        finally:
            self.connection.close()
        if existed:
            print(f"SQLite database file '{self.dbFileName}' already exists.")
        else:
            print(
                f"SQLite database file '{self.dbFileName}' created successfully.")

    def _createTabUser(self):
        """Create the ``TabUser`` table on ``self.connection`` if absent."""
        cursor = self.connection.cursor()
        try:
            # Columns: Id (auto-incremented key), Username, Password
            cursor.execute(
                'CREATE TABLE IF NOT EXISTS TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()


[MAIN.PY]
from CoreSQLite import CoreSQLite
 
# Instantiating CoreSQLite runs checkDB(), which creates UserDb.db (with the
# TabUser table) in the current working directory when the file is missing.
objDb = CoreSQLite('UserDb')

If we run the main.py file, the following will be the result:


Using a tool like DBeaver, we can check if the table TabUser has been created in the database:



INSERTING A NEW USER:

[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """SQLite helper for the ``TabUser`` table (excerpt: insert operation).

    The database file is ``<dbName>.db`` in the current working directory.
    """

    # Connection and cursor of the operation in progress; every operation
    # opens them on entry and closes them before returning.
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database name and run the database check.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    # Bodies elided in this excerpt.
    def checkDB(self): ...

    def _createTabUser(self): ...

    def _openCursor(self):
        """Open a connection to the database file and a cursor on it."""
        # The file is looked up in the current working directory
        dbFile = os.path.join(os.getcwd(), self.dbFileName)
        self.connection = sqlite3.connect(dbFile)
        self.cursor = self.connection.cursor()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert one row into ``TabUser`` and print a confirmation.

        Args:
            userName: Value stored in the ``username`` column.
            password: Value stored, as given, in the ``password`` column.

        Raises:
            sqlite3.Error: If the insert fails; the connection is closed
                before the error propagates.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
        finally:
            # Release the handles even when the statement raises
            self.cursor.close()
            self.connection.close()
        # NOTE: echoes the password in clear text; fine for this demo only.
        print(f"User {userName} {password} created successfully.")


[MAIN.PY]
from CoreSQLite import CoreSQLite
 
objDb = CoreSQLite('UserDb')

# creating 20 users (range() excludes its stop value, hence 21)
for i in range(1, 21):
    objDb.create_user(f"User{i}", f"Pass123_{i}")

If we run the main.py file, the following will be the result:



SELECTING ALL USERS:

[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """SQLite helper for the ``TabUser`` table (excerpt: read operation)."""

    # Connection and cursor of the operation in progress; every operation
    # opens them on entry and closes them before returning.
    connection = None
    cursor = None

    # Bodies elided in this excerpt.
    def __init__(self, dbName): ...

    def checkDB(self): ...

    def _createTabUser(self): ...

    def _openCursor(self): ...

    # Create (Insert) operation
    def create_user(self, userName, password): ...

    # Read operation
    def get_users(self):
        """Print every row of ``TabUser`` and return the rows.

        Returns:
            A list of ``(Id, Username, Password)`` tuples; empty when the
            table has no rows.

        Raises:
            sqlite3.Error: If the query fails; the connection is closed
                before the error propagates.
        """
        self._openCursor()
        try:
            # Name the columns so the positional access below does not
            # depend on the table's physical column order.
            self.cursor.execute('SELECT Id, Username, Password FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            # Release the handles even when the query raises
            self.cursor.close()
            self.connection.close()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")
        return rows


[MAIN.PY]
from CoreSQLite import CoreSQLite
 
objDb = CoreSQLite('UserDb')

# Print every row currently stored in TabUser
objDb.get_users()

If we run the main.py file, the following will be the result:



UPDATING A USER:

[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """SQLite helper for the ``TabUser`` table (excerpt: update operation)."""

    # Connection and cursor of the operation in progress; every operation
    # opens them on entry and closes them before returning.
    connection = None
    cursor = None

    # Bodies elided in this excerpt.
    def __init__(self, dbName): ...

    def checkDB(self): ...

    def _createTabUser(self): ...

    def _openCursor(self): ...

    # Create (Insert) operation
    def create_user(self, userName, password): ...

    # Read operation
    def get_users(self): ...

    # Update operation
    def update_user(self, id, username, password):
        """Replace the username and password of the row with the given id.

        Prints a confirmation, or a "not found" notice when no row has
        that id.

        Args:
            id: Value of the ``Id`` column identifying the row.
            username: New ``username`` value.
            password: New ``password`` value.

        Raises:
            sqlite3.Error: If the update fails; the connection is closed
                before the error propagates.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            # rowcount must be read before the cursor is closed
            updated = self.cursor.rowcount
            self.connection.commit()
        finally:
            # Release the handles even when the statement raises
            self.cursor.close()
            self.connection.close()
        if updated:
            print("User updated successfully.")
        else:
            # An UPDATE matching no row is not an error in SQL; report it
            # instead of claiming success.
            print(f"No user with id {id} found.")


[MAIN.PY]
from CoreSQLite import CoreSQLite
 
objDb = CoreSQLite('UserDb')

# Create a user
objDb.create_user('DamianoUser', 'Pass123')

# Select users (state before the update)
objDb.get_users()

# Update the user whose Id is 1
objDb.update_user(1, 'UpdateDamianoUser', 'UpdatePass123')

# Select users (state after the update)
objDb.get_users()

If we run the main.py file, the following will be the result:



DELETING A USER:

[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """SQLite helper for the ``TabUser`` table (excerpt: delete operation)."""

    # Connection and cursor of the operation in progress; every operation
    # opens them on entry and closes them before returning.
    connection = None
    cursor = None

    # Bodies elided in this excerpt.
    def __init__(self, dbName): ...

    def checkDB(self): ...

    def _openCursor(self): ...

    # Create (Insert) operation
    def create_user(self, userName, password): ...

    # Read operation
    def get_users(self): ...

    # Update operation
    def update_user(self, id, username, password): ...

    # Delete operation
    def delete_user(self, id):
        """Delete the row with the given id from ``TabUser``.

        Prints a confirmation, or a "not found" notice when no row has
        that id.

        Args:
            id: Value of the ``Id`` column identifying the row.

        Raises:
            sqlite3.Error: If the delete fails; the connection is closed
                before the error propagates.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            # rowcount must be read before the cursor is closed
            deleted = self.cursor.rowcount
            self.connection.commit()
        finally:
            # Release the handles even when the statement raises
            self.cursor.close()
            self.connection.close()
        if deleted:
            print("User deleted successfully.")
        else:
            # A DELETE matching no row is not an error in SQL; report it
            # instead of claiming success.
            print(f"No user with id {id} found.")


[MAIN.PY]
from CoreSQLite import CoreSQLite
 
objDb = CoreSQLite('UserDb')

# Create a user
objDb.create_user('DamianoUser', 'Pass123')

# Select users (state before the delete)
objDb.get_users()

# Delete the user whose Id is 1
objDb.delete_user(1)

# Select users (state after the delete)
objDb.get_users()

If we run the main.py file, the following will be the result:



[CORESQLITE.PY]
import sqlite3
import os
 
 
class CoreSQLite:
    """Minimal CRUD helper for the ``TabUser`` table of a SQLite database.

    The database file is ``<dbName>.db`` in the current working directory.
    Instantiating the class runs :meth:`checkDB`, which creates the file and
    the table when needed. Every operation opens its own connection and
    closes it before returning, even when the statement raises.

    NOTE: passwords are stored and printed exactly as given (plain text);
    that is acceptable for this demo only.
    """

    # Connection and cursor of the operation in progress.
    connection = None
    cursor = None

    def __init__(self, dbName):
        """Store the database name and make sure the database is usable.

        Args:
            dbName: Database name without the ``.db`` extension.
        """
        self.dbName = dbName
        self.dbFileName = dbName + '.db'
        self.checkDB()

    def checkDB(self):
        """Create the database file if missing and ensure ``TabUser`` exists.

        Prints whether the file was created or was already present.

        Raises:
            sqlite3.Error: If the file cannot be opened as a SQLite database
                or the table cannot be created.
        """
        db_path = os.path.join(os.getcwd(), self.dbFileName)
        existed = os.path.exists(db_path)
        # A plain path is enough: sqlite3.connect() creates a missing file,
        # and unlike a hand-built 'file:...?mode=rwc' URI it is not broken
        # by '?', '#' or '%' characters in the directory name.
        self.connection = sqlite3.connect(db_path)
        try:
            # Run this even for an existing file so that a file without
            # the table (e.g. an empty one) still ends up usable.
            self._createTabUser()
        finally:
            self.connection.close()
        if existed:
            print(f"SQLite database file '{self.dbFileName}' already exists.")
        else:
            print(
                f"SQLite database file '{self.dbFileName}' created successfully.")

    def _createTabUser(self):
        """Create the ``TabUser`` table on ``self.connection`` if absent."""
        cursor = self.connection.cursor()
        try:
            # Columns: Id (auto-incremented key), Username, Password
            cursor.execute(
                'CREATE TABLE IF NOT EXISTS TabUser (Id INTEGER PRIMARY KEY AUTOINCREMENT, Username TEXT(250),Password TEXT(20))')
            self.connection.commit()
        finally:
            cursor.close()

    def _openCursor(self):
        """Open a connection to the database file and a cursor on it."""
        # The file is looked up in the current working directory
        dbFile = os.path.join(os.getcwd(), self.dbFileName)
        self.connection = sqlite3.connect(dbFile)
        self.cursor = self.connection.cursor()

    # Create (Insert) operation
    def create_user(self, userName, password):
        """Insert one row into ``TabUser`` and print a confirmation.

        Args:
            userName: Value stored in the ``username`` column.
            password: Value stored, as given, in the ``password`` column.

        Raises:
            sqlite3.Error: If the insert fails.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'INSERT INTO TabUser (username, password) VALUES (?, ?)', (userName, password))
            self.connection.commit()
        finally:
            self.cursor.close()
            self.connection.close()
        print(f"User {userName} {password} created successfully.")

    # Read operation
    def get_users(self):
        """Print every row of ``TabUser`` and return the rows.

        Returns:
            A list of ``(Id, Username, Password)`` tuples; empty when the
            table has no rows.

        Raises:
            sqlite3.Error: If the query fails.
        """
        self._openCursor()
        try:
            # Name the columns so the positional access below does not
            # depend on the table's physical column order.
            self.cursor.execute('SELECT Id, Username, Password FROM TabUser')
            rows = self.cursor.fetchall()
        finally:
            self.cursor.close()
            self.connection.close()
        for row in rows:
            print(f"ID: {row[0]}, UserName: {row[1]}, Password: {row[2]}")
        return rows

    # Update operation
    def update_user(self, id, username, password):
        """Replace the username and password of the row with the given id.

        Prints a confirmation, or a "not found" notice when no row has
        that id.

        Args:
            id: Value of the ``Id`` column identifying the row.
            username: New ``username`` value.
            password: New ``password`` value.

        Raises:
            sqlite3.Error: If the update fails.
        """
        self._openCursor()
        try:
            self.cursor.execute(
                'UPDATE TabUser SET username = ?, password = ? WHERE id = ?', (username, password, id))
            # rowcount must be read before the cursor is closed
            updated = self.cursor.rowcount
            self.connection.commit()
        finally:
            self.cursor.close()
            self.connection.close()
        if updated:
            print("User updated successfully.")
        else:
            # Matching no row is not an SQL error; do not claim success.
            print(f"No user with id {id} found.")

    # Delete operation
    def delete_user(self, id):
        """Delete the row with the given id from ``TabUser``.

        Prints a confirmation, or a "not found" notice when no row has
        that id.

        Args:
            id: Value of the ``Id`` column identifying the row.

        Raises:
            sqlite3.Error: If the delete fails.
        """
        self._openCursor()
        try:
            self.cursor.execute('DELETE FROM TabUser WHERE id = ?', (id,))
            # rowcount must be read before the cursor is closed
            deleted = self.cursor.rowcount
            self.connection.commit()
        finally:
            self.cursor.close()
            self.connection.close()
        if deleted:
            print("User deleted successfully.")
        else:
            # Matching no row is not an SQL error; do not claim success.
            print(f"No user with id {id} found.")



Leave a Reply

Your email address will not be published. Required fields are marked *