In this post, we will see how to manage a MongoDB database with Python using pymongo.
In detail, we will perform the CRUD operations on a document called ‘User’ defined as follows:
{
"name": string,
"age": int,
"city": string
}
We start writing a Docker-compose file to create a docker container, with an instance of MongoDB:
version: '3'
services:
# MongoDB container definition
dockermongo:
# name docker image
image: mongo
# username and password Admin definition
environment:
- MONGO_INITDB_ROOT_USERNAME=admindb
- MONGO_INITDB_ROOT_PASSWORD=pass123
# volume definition
volumes:
- dbmongo:/data/db
ports:
- 27017:27017
volumes:
dbmongo:
driver: local
If we run it, the following will be the result:
Finally, with the command docker ps, we can check if the container is running:
Now, we will install pymongo library:
py -m pip install pymongo
We have installed all components and now, we can start to write the Python code.
First of all, we create a config file where we will insert the MongoDB parameters:
[CONFIG.INI]
[mongodb]
server = localhost
user = admindb
password = pass123
database = dbuser
port = 27017
Then, we define a Class called ‘User’ that represents the document in MongoDB:
[USER.PY]
class User:
def __init__(self, nameinput, ageinput, cityinput):
self.name = nameinput
self.age = ageinput
self.city = cityinput
Finally, we create a file called ‘MongoRepository’ that we will use to manage all CRUD operations:
[MONGOREPOSITORY.PY]
import configparser
from pymongo import MongoClient
from User import User
class MongoRepository:
connectionstring = None
dbname = None
client = None
db = None
collectionusers = None
# method used to get the connection string to MongoDB
# and to take the Db name
def _getconnection(self):
# definition of the object used to read the config file
configfile = configparser.ConfigParser()
configfile.read("config.ini")
mongodb = configfile["mongodb"]
server = mongodb["server"]
user = mongodb["user"]
password = mongodb["password"]
database = mongodb["database"]
port = mongodb["port"]
self.connectionstring = f"mongodb://{user}:{password}@{server}:{port}"
self.dbname = database
def __init__(self):
# get connection string
self._getconnection()
# definition of client, db and collection
self.client = MongoClient(self.connectionstring)
self.db = self.client[self.dbname]
# get the collection called 'users'
self.collectionusers = self.db['users']
INSERT NEW USERS:
def saveusers(self, lstuser):
self.collectionusers.insert_many(lstuser)
[MAIN.PY]
from MongoRepository import MongoRepository
import random
# creating instance of MongoRepository
repo = MongoRepository()
# Preparing 100 documents to insert
lstusers = [
{
"name": f"User{i}",
"age": random.randint(18, 60), # Random age between 18 and 60
# Random city
"city": random.choice(["London", "Rome", "Paris", "Monaco", "Amsterdam"])
}
for i in range(100)
]
try:
# save list of users
repo.saveusers(lstusers)
print("All data added in MongoDB")
except Exception as error:
print("Something went wrong")
print(error)
If we run the file main.py, the following will be the result:
def saveuser(self, user):
newuser = {"name": user.name, "age": user.age, "city": user.city}
self.collectionusers.insert_one(newuser)
[MAIN.PY]
from MongoRepository import MongoRepository
from User import User
import random
# creating instance of MongoRepository
repo = MongoRepository()
user = User("TestName", 44, "London")
user2 = User("TestName2", 56, "Rome")
user3 = User("TestName3", 23, "Amsterdam")
repo.saveuser(user)
repo.saveuser(user2)
repo.saveuser(user3)
repo.selectall()
If we run the file main.py, the following will be the result:
SELECT USERS:
def selectall(self):
for user in self.collectionusers.find().sort('name', ASCENDING):
print(
f"{user.get('name', 'N/A')} - {user.get('age', 'N/A')} - {user.get('city', 'N/A')}")
[MAIN.PY]
from MongoRepository import MongoRepository
import random
# creating instance of MongoRepository
repo = MongoRepository()
repo.selectall()
If we run the file main.py, the following will be the result:
def selectforagegreater50(self):
for user in self.collectionusers.find({"age": {"$gt": 50}}).sort('name', ASCENDING):
print(
f"{user.get('name', 'N/A')} - {user.get('age', 'N/A')} - {user.get('city', 'N/A')}")
[MAIN.PY]
from MongoRepository import MongoRepository
import random
# creating instance of MongoRepository
repo = MongoRepository()
repo.selectforagegreater50()
If we run the file main.py, the following will be the result:
DELETE USERS:
def deleteallusers(self):
self.collectionusers.delete_many({})
[MAIN.PY]
from MongoRepository import MongoRepository
import random
# creating instance of MongoRepository
repo = MongoRepository()
repo.deleteallusers()
repo.selectall()
If we run the file main.py, the following will be the result:
def deleteuserswithagelessvalueinput(self, age):
self.collectionusers.delete_many({"age": {"$lt": age}})
[MAIN.PY]
from MongoRepository import MongoRepository
from User import User
import random
# creating instance of MongoRepository
repo = MongoRepository()
repo.selectall()
print("\n\n")
repo.deleteuserswithagelessvalueinput(40)
repo.selectall()
If we run the file main.py, the following will be the result:
UPDATE USER:
def updateuser(self, user):
self.collectionusers.update_one({"name": user.name}, {
"$set": {"age": user.age}})
[MAIN.PY]
from MongoRepository import MongoRepository
from User import User
import random
# creating instance of MongoRepository
repo = MongoRepository()
repo.selectforagegreater50()
print("\n\n")
user = User("TestName2", 18, "Rome")
repo.updateuser(user)
repo.selectall()
If we run the file main.py, the following will be the result: