Часть 5: Работа с NoSQL базами данных (MongoDB)
Установка и запуск MongoDB
Bash:
# Установка MongoDB (Ubuntu/Debian)
sudo apt-get install mongodb
# Или использование Docker
docker run -d -p 27017:27017 --name mongodb mongo
# Установка драйвера Python
pip install pymongo
Пример работы с MongoDB:
Python:
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
from bson import ObjectId
from typing import Optional, List, Dict, Any
class MongoDBTaskManager:
"""Менеджер задач для MongoDB"""
def __init__(self, connection_string="mongodb://localhost:27017/"):
self.client = MongoClient(connection_string)
self.db = self.client.task_database
self.users = self.db.users
self.tasks = self.db.tasks
# Создание индексов
self._create_indexes()
def _create_indexes(self):
"""Создание индексов для оптимизации запросов"""
self.users.create_index([("username", ASCENDING)], unique=True)
self.users.create_index([("email", ASCENDING)], unique=True)
self.tasks.create_index([("user_id", ASCENDING)])
self.tasks.create_index([("status", ASCENDING)])
self.tasks.create_index([("due_date", ASCENDING)])
self.tasks.create_index([("priority", DESCENDING)])
def add_user(self, username: str, email: str, **extra_data) -> Optional[str]:
"""Добавление пользователя"""
user_data = {
"username": username,
"email": email,
"created_at": datetime.utcnow(),
**extra_data
}
try:
result = self.users.insert_one(user_data)
print(f"Пользователь добавлен с ID: {result.inserted_id}")
return str(result.inserted_id)
except Exception as e:
print(f"Ошибка добавления пользователя: {e}")
return None
def add_task(self, user_id: str, title: str, **kwargs) -> Optional[str]:
"""Добавление задачи"""
task_data = {
"user_id": user_id,
"title": title,
"status": kwargs.get("status", "pending"),
"priority": kwargs.get("priority", 3),
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
**{k: v for k, v in kwargs.items() if k not in ["status", "priority"]}
}
if "due_date" in kwargs and isinstance(kwargs["due_date"], str):
task_data["due_date"] = datetime.fromisoformat(kwargs["due_date"])
try:
result = self.tasks.insert_one(task_data)
print(f"Задача добавлена с ID: {result.inserted_id}")
return str(result.inserted_id)
except Exception as e:
print(f"Ошибка добавления задачи: {e}")
return None
def get_tasks(self, filters: Dict[str, Any] = None,
sort_by: str = "priority",
sort_order: int = DESCENDING,
limit: int = 100) -> List[Dict]:
"""Получение задач с фильтрацией и сортировкой"""
query = filters or {}
# Преобразование user_id в ObjectId если нужно
if "user_id" in query and isinstance(query["user_id"], str):
query["user_id"] = query["user_id"]
cursor = self.tasks.find(query)
# Сортировка
if sort_by:
cursor = cursor.sort(sort_by, sort_order)
# Лимит
if limit:
cursor = cursor.limit(limit)
tasks = list(cursor)
# Преобразование ObjectId в строку
for task in tasks:
task["_id"] = str(task["_id"])
return tasks
def aggregate_tasks(self, user_id: str) -> Dict[str, Any]:
"""Агрегация данных по задачам пользователя"""
pipeline = [
{"$match": {"user_id": user_id}},
{"$group": {
"_id": "$status",
"count": {"$sum": 1},
"avg_priority": {"$avg": "$priority"}
}},
{"$sort": {"count": DESCENDING}}
]
results = list(self.tasks.aggregate(pipeline))
# Сводная статистика
total_tasks = sum(item["count"] for item in results)
return {
"statistics": results,
"total_tasks": total_tasks,
"user_id": user_id
}
def text_search(self, search_text: str) -> List[Dict]:
"""Полнотекстовый поиск по задачам"""
# Создание текстового индекса (если еще не создан)
if "title_text_description_text" not in self.tasks.index_information():
self.tasks.create_index([
("title", "text"),
("description", "text")
])
results = self.tasks.find(
{"$text": {"$search": search_text}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])
return [{"_id": str(task["_id"]),
"title": task["title"],
"score": task.get("score", 0)}
for task in results]
def get_user_with_tasks(self, user_id: str) -> Dict[str, Any]:
"""Получение пользователя с его задачами (аналог JOIN в SQL)"""
# В MongoDB нет JOIN, используем два запроса
user = self.users.find_one({"_id": ObjectId(user_id)})
if not user:
return None
tasks = self.get_tasks({"user_id": user_id})
# Преобразование ObjectId
user["_id"] = str(user["_id"])
return {
"user": user,
"tasks": tasks,
"task_count": len(tasks)
}
def demo_mongodb():
"""Демонстрация работы с MongoDB"""
manager = MongoDBTaskManager()
# Очистка коллекций (для демо)
manager.users.delete_many({})
manager.tasks.delete_many({})
# Добавление пользователя
user_id = manager.add_user(
username="bob",
email="bob@example.com",
profile={"age": 30, "city": "Moscow"}
)
if user_id:
# Добавление задач
tasks_data = [
{"title": "Настроить MongoDB", "priority": 5, "tags": ["database", "nosql"]},
{"title": "Изучить агрегации", "priority": 4, "status": "in_progress"},
{"title": "Написать документацию", "priority": 3, "due_date": "2024-12-31"},
]
for task_data in tasks_data:
manager.add_task(user_id, **task_data)
# Получение задач
print("\nВсе задачи пользователя:")
tasks = manager.get_tasks({"user_id": user_id})
for task in tasks:
print(f"- {task['title']} (Приоритет: {task['priority']})")
# Агрегация
print("\nСтатистика по задачам:")
stats = manager.aggregate_tasks(user_id)
print(f"Всего задач: {stats['total_tasks']}")
for stat in stats['statistics']:
print(f" {stat['_id']}: {stat['count']} задач, средний приоритет: {stat['avg_priority']:.1f}")
# Полнотекстовый поиск
print("\nРезультаты поиска 'MongoDB':")
search_results = manager.text_search("MongoDB")
for result in search_results:
print(f"- {result['title']} (релевантность: {result['score']:.2f})")
# Получение пользователя с задачами
user_with_tasks = manager.get_user_with_tasks(user_id)
print(f"\nПользователь {user_with_tasks['user']['username']} имеет {user_with_tasks['task_count']} задач")
# Запуск демо
# demo_mongodb()
Часть 6: Асинхронная работа с базами данных
Использование asyncpg для PostgreSQL:
Python:
import asyncio
import asyncpg
from datetime import datetime
class AsyncTaskManager:
"""Асинхронный менеджер задач для PostgreSQL"""
def __init__(self, database_url):
self.database_url = database_url
self.pool = None
async def connect(self):
"""Создание пула подключений"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=5,
max_size=20,
command_timeout=60
)
# Создание таблиц
async with self.pool.acquire() as conn:
await conn.execute('''
CREATE TABLE IF NOT EXISTS async_users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
await conn.execute('''
CREATE TABLE IF NOT EXISTS async_tasks (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES async_users(id) ON DELETE CASCADE,
title VARCHAR(200) NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
async def add_user(self, username, email):
"""Добавление пользователя"""
async with self.pool.acquire() as conn:
try:
user_id = await conn.fetchval(
'INSERT INTO async_users (username, email) VALUES ($1, $2) RETURNING id',
username, email
)
print(f"Асинхронно добавлен пользователь: {username}")
return user_id
except Exception as e:
print(f"Ошибка: {e}")
return None
async def add_task_batch(self, user_id, tasks_data):
"""Пакетное добавление задач"""
async with self.pool.acquire() as conn:
# Использование copy для быстрой вставки
await conn.copy_records_to_table(
'async_tasks',
records=[(user_id, task['title'], 'pending', datetime.utcnow())
for task in tasks_data],
columns=['user_id', 'title', 'status', 'created_at']
)
print(f"Добавлено {len(tasks_data)} задач")
async def get_users_with_task_count(self):
"""Получение пользователей с количеством задач"""
async with self.pool.acquire() as conn:
query = '''
SELECT u.*, COUNT(t.id) as task_count
FROM async_users u
LEFT JOIN async_tasks t ON u.id = t.user_id
GROUP BY u.id
ORDER BY task_count DESC
'''
return await conn.fetch(query)
async def close(self):
"""Закрытие пула подключений"""
if self.pool:
await self.pool.close()
async def demo_async():
"""Демонстрация асинхронной работы"""
# Для теста можно использовать локальную PostgreSQL или тестовую БД
manager = AsyncTaskManager(
"postgresql://user:password@localhost/taskdb"
)
await manager.connect()
try:
# Параллельное добавление пользователей
users = [
("async_user1", "user1@example.com"),
("async_user2", "user2@example.com"),
("async_user3", "user3@example.com"),
]
# Асинхронное создание пользователей
tasks = [manager.add_user(username, email) for username, email in users]
user_ids = await asyncio.gather(*tasks)
# Пакетное добавление задач
tasks_data = [
{"title": f"Задача {i} для пользователя {user_id}"}
for user_id in user_ids if user_id
for i in range(5)
]
if user_ids:
await manager.add_task_batch(user_ids[0], tasks_data[:10])
# Получение статистики
users_stats = await manager.get_users_with_task_count()
print("\nСтатистика пользователей:")
for user in users_stats:
print(f"{user['username']}: {user['task_count']} задач")
finally:
await manager.close()
# Запуск асинхронной демо
# asyncio.run(demo_async())
Часть 7: Лучшие практики и паттерны
1. Использование паттерна Repository:
Python:
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
class Repository(ABC):
"""Абстрактный репозиторий"""
@abstractmethod
def get(self, id: Any):
pass
@abstractmethod
def add(self, entity: Any):
pass
@abstractmethod
def update(self, id: Any, updates: Dict[str, Any]):
pass
@abstractmethod
def delete(self, id: Any):
pass
@abstractmethod
def list(self, filters: Dict[str, Any] = None):
pass
class SQLiteUserRepository(Repository):
"""Реализация репозитория для SQLite"""
def __init__(self, db_path: str):
self.db_path = db_path
def get(self, user_id: int):
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
return dict(row) if row else None
def add(self, user_data: Dict[str, Any]):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO users (username, email) VALUES (?, ?)",
(user_data['username'], user_data['email'])
)
conn.commit()
return cursor.lastrowid
# ... остальные методы
2. Конфигурация базы данных:
Python:
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class DatabaseConfig:
"""Конфигурация базы данных"""
driver: str
host: str
port: int
database: str
username: Optional[str] = None
password: Optional[str] = None
@property
def connection_string(self) -> str:
if self.driver == "sqlite":
return f"sqlite:///{self.database}"
elif self.driver == "postgresql":
return (f"postgresql://{self.username}:{self.password}"
f"@{self.host}:{self.port}/{self.database}")
elif self.driver == "mysql":
return (f"mysql+mysqlconnector://{self.username}:{self.password}"
f"@{self.host}:{self.port}/{self.database}")
else:
raise ValueError(f"Unsupported driver: {self.driver}")
@classmethod
def from_env(cls) -> 'DatabaseConfig':
"""Создание конфигурации из переменных окружения"""
return cls(
driver=os.getenv("DB_DRIVER", "sqlite"),
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "5432")),
database=os.getenv("DB_NAME", "app.db"),
username=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD")
)
# Использование
config = DatabaseConfig.from_env()
print(f"Connection string: {config.connection_string}")
3. Миграции базы данных (Alembic):
Bash:
# Установка Alembic
pip install alembic
# Инициализация
alembic init migrations
# Создание миграции
alembic revision --autogenerate -m "Create users and tasks tables"
# Применение миграций
alembic upgrade head
Часть 8: Тестирование
Python:
import unittest
import tempfile
import os
class TestTaskManager(unittest.TestCase):
"""Тесты для менеджера задач"""
def setUp(self):
# Создание временной базы данных
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.db_path = self.temp_db.name
self.manager = TaskManager(self.db_path)
init_database(self.db_path)
def tearDown(self):
# Удаление временной базы данных
self.temp_db.close()
os.unlink(self.db_path)
def test_add_user(self):
"""Тест добавления пользователя"""
user_id = self.manager.add_user("test", "test@example.com")
self.assertIsNotNone(user_id)
self.assertGreater(user_id, 0)
def test_add_task(self):
"""Тест добавления задачи"""
user_id = self.manager.add_user("test", "test@example.com")
task_id = self.manager.add_task(user_id, "Test task")
self.assertIsNotNone(task_id)
def test_get_tasks(self):
"""Тест получения задач"""
user_id = self.manager.add_user("test", "test@example.com")
self.manager.add_task(user_id, "Task 1")
self.manager.add_task(user_id, "Task 2")
tasks = self.manager.get_tasks(user_id=user_id)
self.assertEqual(len(tasks), 2)
def test_update_task_status(self):
"""Тест обновления статуса задачи"""
user_id = self.manager.add_user("test", "test@example.com")
task_id = self.manager.add_task(user_id, "Test task")
updated = self.manager.update_task_status(task_id, "completed")
self.assertTrue(updated)
tasks = self.manager.get_tasks(user_id=user_id)
self.assertEqual(tasks[0]['status'], 'completed')
# Запуск тестов
if __name__ == '__main__':
unittest.main()