In this post, we will see how to implement CRUD operations for an ‘User’ object in Redis, defines as follows:
{
"name": string,
"age": int,
"city": string
}
Before to start, I want to remember that REDIS (Remote Dictionary Server), is an open-source, in-memory data structure store, used as a database, cache, and message broker. It supports various data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, geospatial indexes, and streams.
The common use cases for Redis are:
Caching: the most common use case, reducing the load on databases and improving response times.
Session Storage: storing user session data for web applications.
Real-Time Analytics: suitable for scenarios needing real-time analysis of data, like in gaming, advertising, etc.
For all information, the redis web site is: https://redis.io/
First of all, we define a docker-compose file to create a docker container with an instance of Redis:
version: '3.8' # Specifies the Docker Compose version
services:
redis:
image: redis:latest # Use the latest Redis image
ports:
- "6379:6379" # Map the default Redis port to the same port on the host
volumes:
- redis_data:/data # Persist data using a volume
volumes:
redis_data: # Defines the volume to persist Redis data
Then, using the command docker-compose up -d, we will create the container:
Finally, with the command docker ps, we can verify that everything is working correctly:
Before starting to write code, we have to install Redis Python Client, using the command:
pip install redis
Now, we can start to develop code!
First, we create a config file to store Redis parameters:
[CONFIG.INI]
[redis]
server = localhost
port = 6379
db = 0
Then, we define the RedisRepository file used to manage CRUD operations:
[REDISREPOSITORY.PY]
import configparser
class RedisRepository:
redisclient = None
server = None
port = None
database = None
# method used to get the parameters to connect to Redis
def _get_redis_parameters(self):
# definition of the object used to read the config file
configfile = configparser.ConfigParser()
configfile.read("config.ini")
rediscache = configfile["redis"]
server = rediscache["server"]
port = rediscache["port"]
database = rediscache["db"]
def __init__(self):
# get Redis paramters
self._get_redis_parameters()
Finally, we will add all methods for the CRUD operations!
INSERT USER:
import configparser
import redis
import uuid
class RedisRepository:
redisclient = None
server = None
port = None
database = None
# method used to get the parameters to connect to Redis
def _get_redis_parameters(self):
# definition of the object used to read the config file
configfile = configparser.ConfigParser()
configfile.read("config.ini")
rediscache = configfile["redis"]
self.server = rediscache["server"]
self.port = rediscache["port"]
self.database = rediscache["db"]
def __init__(self):
# get Redis paramters
self._get_redis_parameters()
def insert_user(self, name, age, city):
"""
Creates a new user in Redis with a GUID as the user ID.
:param name: Name of the user.
:param age: Age of the user.
:param city: City of the user.
:return: The GUID of the created user.
"""
user_id = str(uuid.uuid4()) # Generate a GUID for the user ID
user_key = f"user:{user_id}"
user_data = {
"name": name,
"age": age,
"city": city
}
try:
# Using a context manager to handle the Redis connection
# This ensures that the connection is properly closed after
# the block of code is executed, even if an exception occurs:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Hset is a command in Redis that is used to set the value of one or more
# fields in a hash. It's a very versatile and commonly used command in Redis,
# especially when dealing with objects or entities that have multiple attributes
client.hset(user_key, mapping=user_data)
# Add the user key to the set of users
client.sadd("users", user_key)
return user_id
except Exception as error:
print(f"Error during the insert operation, because of: {error}")
return 0
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
for item in range(1, 10):
newId = redisrepo.insert_user(f'Name_{item}', f'City_{item}', 40+item)
if newId == "0":
print("The user hasn't been created!")
else:
print(f"The user has been created with the Id - {newId}")
If we run the file main.py, the following will be the result:
SELECT USER:
def select_all_users(self):
"""
Retrieves all users from Redis.
:return: A list of user data dictionaries.
"""
all_users = []
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Retrieve all user keys from the set
user_keys = client.smembers("users")
for user_key in user_keys:
# Fetch each user's data
user_data = client.hgetall(user_key.decode('utf-8'))
# Convert bytes to string for each field
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
all_users.append(user_data)
except Exception as error:
print(f"Error during the select all users operation: {error}")
return all_users
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
# print only name field
print(f"{user['name']}")
If we run the file main.py, the following will be the result:
def select_user_by_name(self, nameinput):
"""
Finds a user in Redis by their name.
:param nameinput: The name of the user to find.
:return: User data if found, else None.
"""
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Retrieve all user keys from the set
user_keys = client.smembers("users")
for user_key in user_keys:
# Fetch each user's data
user_data = client.hgetall(user_key.decode('utf-8'))
# Convert bytes to string for each field
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
if user_data.get("name") == nameinput:
return user_data
except Exception as error:
print(f"Error during the select users by name operation: {error}")
return None
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
nametofind = 'Name_1'
userstofind = redisrepo.select_user_by_name(nametofind)
if userstofind:
print(f"User found: {userstofind}")
else:
print(f"No user found with the name {nametofind}")
nametofind = 'Name_100'
userstofind = redisrepo.select_user_by_name(nametofind)
if userstofind:
print(f"User found: {userstofind}")
else:
print(f"No user found with the name {nametofind}")
If we run the file main.py, the following will be the result:
def select_all_users_with_id(self):
"""
Retrieves all users from Redis, including their IDs.
:return: A list of dictionaries, each containing user data along with the user ID.
"""
all_users_with_id = []
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
user_keys = client.smembers("users")
for user_key in user_keys:
# Extract user ID from the key
user_id = user_key.decode('utf-8').split(":")[1]
user_data = client.hgetall(user_key.decode('utf-8'))
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
# Include the user ID in the data dictionary
user_data['id'] = user_id
all_users_with_id.append(user_data)
except Exception as error:
print(f"Error during the retrieval operation: {error}")
return all_users_with_id
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
users_with_ids = redisrepo.select_all_users_with_id()
for user in users_with_ids:
print(user)
If we run the file main.py, the following will be the result:
DELETE USER:
from RediRepository import RedisRepository
redisrepo = RedisRepository()
users_with_ids = redisrepo.select_all_users_with_id()
for user in users_with_ids:
print(user)
def delete_user_by_name(self, nameinput):
"""
Deletes a user in Redis by their name.
:param nameinput: The name of the user to delete.
:return: True if the user was deleted, False otherwise.
"""
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
user_keys = client.smembers("users")
for user_key in user_keys:
user_data = client.hgetall(user_key.decode('utf-8'))
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
if user_data.get("name") == nameinput:
# Delete the user's hash
client.delete(user_key.decode('utf-8'))
# Remove the user key from the set of users
client.srem("users", user_key)
return True
except Exception as error:
print(f"Error during user deletion: {error}")
return False
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
nametofind = "Name_1"
if redisrepo.delete_user_by_name(nametofind):
print(f"User {nametofind} deleted successfully.")
else:
print(f"User {nametofind} not found or error in deletion.")
nametofind = "Name_100"
if redisrepo.delete_user_by_name(nametofind):
print(f"User {nametofind} deleted successfully.")
else:
print(f"User {nametofind} not found or error in deletion.")
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
If we run the file main.py, the following will be the result:
UPDATE USER:
def update_user(self, user_id, updated_data):
"""
Updates a user in Redis.
:param user_id: The ID of the user to update.
:param updated_data: A dictionary with the updated user data.
:return: True if the update was successful, False otherwise.
"""
user_key = f"user:{user_id}"
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Check if the user exists
if not client.exists(user_key):
print(f"No user found with ID: {user_id}")
return False
# Update user data
client.hset(user_key, mapping=updated_data)
return True
except Exception as error:
print(f"Error during user update: {error}")
return False
[MAIN.PY]
from RediRepository import RedisRepository
redisrepo = RedisRepository()
# Name_1 -> 0f70825c-be0b-4fe7-b7d6-d3c7e61046dc
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
user_to_update = "0f70825c-be0b-4fe7-b7d6-d3c7e61046dc"
new_data = {
"name": "Name_1_Plus",
"age": "60",
"city": "City_1_Plus"
}
if redisrepo.update_user(user_to_update, new_data):
print(f"User {user_to_update} updated successfully.")
else:
print(f"Failed to update user {user_to_update}.")
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
user_to_update = "AAAAAAAAAA"
new_data = {
"name": "Name_1_Plus",
"age": "60",
"city": "City_1_Plus"
}
if redisrepo.update_user(user_to_update, new_data):
print(f"User {user_to_update} updated successfully.")
else:
print(f"Failed to update user {user_to_update}.")
users = redisrepo.select_all_users()
for user in users:
# print all fields
print(user)
If we run the file main.py, the following will be the result:
Here, the complete version of the RedisRepository file:
[REDISREPOSITORY.PY]
import configparser
import redis
import uuid
class RedisRepository:
redisclient = None
server = None
port = None
database = None
# method used to get the parameters to connect to Redis
def _get_redis_parameters(self):
# definition of the object used to read the config file
configfile = configparser.ConfigParser()
configfile.read("config.ini")
rediscache = configfile["redis"]
self.server = rediscache["server"]
self.port = rediscache["port"]
self.database = rediscache["db"]
def __init__(self):
# get Redis paramters
self._get_redis_parameters()
def insert_user(self, name, age, city):
"""
Creates a new user in Redis with a GUID as the user ID.
:param name: Name of the user.
:param age: Age of the user.
:param city: City of the user.
:return: The GUID of the created user.
"""
user_id = str(uuid.uuid4()) # Generate a GUID for the user ID
user_key = f"user:{user_id}"
user_data = {
"name": name,
"age": age,
"city": city
}
try:
# Using a context manager to handle the Redis connection
# This ensures that the connection is properly closed after
# the block of code is executed, even if an exception occurs:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Hset is a command in Redis that is used to set the value of one or more
# fields in a hash. It's a very versatile and commonly used command in Redis,
# especially when dealing with objects or entities that have multiple attributes
client.hset(user_key, mapping=user_data)
# Add the user key to the set of users
client.sadd("users", user_key)
return user_id
except Exception as error:
print(f"Error during the insert operation, because of: {error}")
return 0
def select_all_users(self):
"""
Retrieves all users from Redis.
:return: A list of user data dictionaries.
"""
all_users = []
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Retrieve all user keys from the set
user_keys = client.smembers("users")
for user_key in user_keys:
# Fetch each user's data
user_data = client.hgetall(user_key.decode('utf-8'))
# Convert bytes to string for each field
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
all_users.append(user_data)
except Exception as error:
print(f"Error during the select all users operation: {error}")
return all_users
def select_user_by_name(self, nameinput):
"""
Finds a user in Redis by their name.
:param nameinput: The name of the user to find.
:return: User data if found, else None.
"""
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Retrieve all user keys from the set
user_keys = client.smembers("users")
for user_key in user_keys:
# Fetch each user's data
user_data = client.hgetall(user_key.decode('utf-8'))
# Convert bytes to string for each field
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
if user_data.get("name") == nameinput:
return user_data
except Exception as error:
print(f"Error during the select users by name operation: {error}")
return None
def delete_user_by_name(self, nameinput):
"""
Deletes a user in Redis by their name.
:param nameinput: The name of the user to delete.
:return: True if the user was deleted, False otherwise.
"""
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
user_keys = client.smembers("users")
for user_key in user_keys:
user_data = client.hgetall(user_key.decode('utf-8'))
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
if user_data.get("name") == nameinput:
# Delete the user's hash
client.delete(user_key.decode('utf-8'))
# Remove the user key from the set of users
client.srem("users", user_key)
return True
except Exception as error:
print(f"Error during user deletion: {error}")
return False
def update_user(self, user_id, updated_data):
"""
Updates a user in Redis.
:param user_id: The ID of the user to update.
:param updated_data: A dictionary with the updated user data.
:return: True if the update was successful, False otherwise.
"""
user_key = f"user:{user_id}"
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
# Check if the user exists
if not client.exists(user_key):
print(f"No user found with ID: {user_id}")
return False
# Update user data
client.hset(user_key, mapping=updated_data)
return True
except Exception as error:
print(f"Error during user update: {error}")
return False
def select_all_users_with_id(self):
"""
Retrieves all users from Redis, including their IDs.
:return: A list of dictionaries, each containing user data along with the user ID.
"""
all_users_with_id = []
try:
with redis.Redis(host=self.server, port=self.port, db=self.database) as client:
user_keys = client.smembers("users")
for user_key in user_keys:
# Extract user ID from the key
user_id = user_key.decode('utf-8').split(":")[1]
user_data = client.hgetall(user_key.decode('utf-8'))
user_data = {k.decode('utf-8'): v.decode('utf-8')
for k, v in user_data.items()}
# Include the user ID in the data dictionary
user_data['id'] = user_id
all_users_with_id.append(user_data)
except Exception as error:
print(f"Error during the retrieval operation: {error}")
return all_users_with_id