Часть 5: Работа с NoSQL базами данных (MongoDB)​

Установка и запуск MongoDB​

Bash:
# Установка MongoDB (Ubuntu/Debian)
sudo apt-get install mongodb

# Или использование Docker
docker run -d -p 27017:27017 --name mongodb mongo

# Установка драйвера Python
pip install pymongo

Пример работы с MongoDB:​

Python:
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
from bson import ObjectId
from typing import Optional, List, Dict, Any

class MongoDBTaskManager:
    """Менеджер задач для MongoDB"""
    
    def __init__(self, connection_string="mongodb://localhost:27017/"):
        self.client = MongoClient(connection_string)
        self.db = self.client.task_database
        self.users = self.db.users
        self.tasks = self.db.tasks
        
        # Создание индексов
        self._create_indexes()
    
    def _create_indexes(self):
        """Создание индексов для оптимизации запросов"""
        self.users.create_index([("username", ASCENDING)], unique=True)
        self.users.create_index([("email", ASCENDING)], unique=True)
        self.tasks.create_index([("user_id", ASCENDING)])
        self.tasks.create_index([("status", ASCENDING)])
        self.tasks.create_index([("due_date", ASCENDING)])
        self.tasks.create_index([("priority", DESCENDING)])
    
    def add_user(self, username: str, email: str, **extra_data) -> Optional[str]:
        """Добавление пользователя"""
        user_data = {
            "username": username,
            "email": email,
            "created_at": datetime.utcnow(),
            **extra_data
        }
        
        try:
            result = self.users.insert_one(user_data)
            print(f"Пользователь добавлен с ID: {result.inserted_id}")
            return str(result.inserted_id)
        except Exception as e:
            print(f"Ошибка добавления пользователя: {e}")
            return None
    
    def add_task(self, user_id: str, title: str, **kwargs) -> Optional[str]:
        """Добавление задачи"""
        task_data = {
            "user_id": user_id,
            "title": title,
            "status": kwargs.get("status", "pending"),
            "priority": kwargs.get("priority", 3),
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
            **{k: v for k, v in kwargs.items() if k not in ["status", "priority"]}
        }
        
        if "due_date" in kwargs and isinstance(kwargs["due_date"], str):
            task_data["due_date"] = datetime.fromisoformat(kwargs["due_date"])
        
        try:
            result = self.tasks.insert_one(task_data)
            print(f"Задача добавлена с ID: {result.inserted_id}")
            return str(result.inserted_id)
        except Exception as e:
            print(f"Ошибка добавления задачи: {e}")
            return None
    
    def get_tasks(self, filters: Dict[str, Any] = None,
                  sort_by: str = "priority",
                  sort_order: int = DESCENDING,
                  limit: int = 100) -> List[Dict]:
        """Получение задач с фильтрацией и сортировкой"""
        query = filters or {}
        
        # Преобразование user_id в ObjectId если нужно
        if "user_id" in query and isinstance(query["user_id"], str):
            query["user_id"] = query["user_id"]
        
        cursor = self.tasks.find(query)
        
        # Сортировка
        if sort_by:
            cursor = cursor.sort(sort_by, sort_order)
        
        # Лимит
        if limit:
            cursor = cursor.limit(limit)
        
        tasks = list(cursor)
        
        # Преобразование ObjectId в строку
        for task in tasks:
            task["_id"] = str(task["_id"])
        
        return tasks
    
    def aggregate_tasks(self, user_id: str) -> Dict[str, Any]:
        """Агрегация данных по задачам пользователя"""
        pipeline = [
            {"$match": {"user_id": user_id}},
            {"$group": {
                "_id": "$status",
                "count": {"$sum": 1},
                "avg_priority": {"$avg": "$priority"}
            }},
            {"$sort": {"count": DESCENDING}}
        ]
        
        results = list(self.tasks.aggregate(pipeline))
        
        # Сводная статистика
        total_tasks = sum(item["count"] for item in results)
        
        return {
            "statistics": results,
            "total_tasks": total_tasks,
            "user_id": user_id
        }
    
    def text_search(self, search_text: str) -> List[Dict]:
        """Полнотекстовый поиск по задачам"""
        # Создание текстового индекса (если еще не создан)
        if "title_text_description_text" not in self.tasks.index_information():
            self.tasks.create_index([
                ("title", "text"),
                ("description", "text")
            ])
        
        results = self.tasks.find(
            {"$text": {"$search": search_text}},
            {"score": {"$meta": "textScore"}}
        ).sort([("score", {"$meta": "textScore"})])
        
        return [{"_id": str(task["_id"]),
                 "title": task["title"],
                 "score": task.get("score", 0)}
                for task in results]
    
    def get_user_with_tasks(self, user_id: str) -> Dict[str, Any]:
        """Получение пользователя с его задачами (аналог JOIN в SQL)"""
        # В MongoDB нет JOIN, используем два запроса
        user = self.users.find_one({"_id": ObjectId(user_id)})
        
        if not user:
            return None
        
        tasks = self.get_tasks({"user_id": user_id})
        
        # Преобразование ObjectId
        user["_id"] = str(user["_id"])
        
        return {
            "user": user,
            "tasks": tasks,
            "task_count": len(tasks)
        }

def demo_mongodb():
    """Демонстрация работы с MongoDB"""
    manager = MongoDBTaskManager()
    
    # Очистка коллекций (для демо)
    manager.users.delete_many({})
    manager.tasks.delete_many({})
    
    # Добавление пользователя
    user_id = manager.add_user(
        username="bob",
        email="bob@example.com",
        profile={"age": 30, "city": "Moscow"}
    )
    
    if user_id:
        # Добавление задач
        tasks_data = [
            {"title": "Настроить MongoDB", "priority": 5, "tags": ["database", "nosql"]},
            {"title": "Изучить агрегации", "priority": 4, "status": "in_progress"},
            {"title": "Написать документацию", "priority": 3, "due_date": "2024-12-31"},
        ]
        
        for task_data in tasks_data:
            manager.add_task(user_id, **task_data)
        
        # Получение задач
        print("\nВсе задачи пользователя:")
        tasks = manager.get_tasks({"user_id": user_id})
        for task in tasks:
            print(f"- {task['title']} (Приоритет: {task['priority']})")
        
        # Агрегация
        print("\nСтатистика по задачам:")
        stats = manager.aggregate_tasks(user_id)
        print(f"Всего задач: {stats['total_tasks']}")
        for stat in stats['statistics']:
            print(f"  {stat['_id']}: {stat['count']} задач, средний приоритет: {stat['avg_priority']:.1f}")
        
        # Полнотекстовый поиск
        print("\nРезультаты поиска 'MongoDB':")
        search_results = manager.text_search("MongoDB")
        for result in search_results:
            print(f"- {result['title']} (релевантность: {result['score']:.2f})")
        
        # Получение пользователя с задачами
        user_with_tasks = manager.get_user_with_tasks(user_id)
        print(f"\nПользователь {user_with_tasks['user']['username']} имеет {user_with_tasks['task_count']} задач")

# Запуск демо
# demo_mongodb()

Часть 6: Асинхронная работа с базами данных​

Использование asyncpg для PostgreSQL:​

Python:
import asyncio
import asyncpg
from datetime import datetime

class AsyncTaskManager:
    """Асинхронный менеджер задач для PostgreSQL"""
    
    def __init__(self, database_url):
        self.database_url = database_url
        self.pool = None
    
    async def connect(self):
        """Создание пула подключений"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        
        # Создание таблиц
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS async_users (
                    id SERIAL PRIMARY KEY,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS async_tasks (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER REFERENCES async_users(id) ON DELETE CASCADE,
                    title VARCHAR(200) NOT NULL,
                    status VARCHAR(20) DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
    
    async def add_user(self, username, email):
        """Добавление пользователя"""
        async with self.pool.acquire() as conn:
            try:
                user_id = await conn.fetchval(
                    'INSERT INTO async_users (username, email) VALUES ($1, $2) RETURNING id',
                    username, email
                )
                print(f"Асинхронно добавлен пользователь: {username}")
                return user_id
            except Exception as e:
                print(f"Ошибка: {e}")
                return None
    
    async def add_task_batch(self, user_id, tasks_data):
        """Пакетное добавление задач"""
        async with self.pool.acquire() as conn:
            # Использование copy для быстрой вставки
            await conn.copy_records_to_table(
                'async_tasks',
                records=[(user_id, task['title'], 'pending', datetime.utcnow())
                        for task in tasks_data],
                columns=['user_id', 'title', 'status', 'created_at']
            )
            print(f"Добавлено {len(tasks_data)} задач")
    
    async def get_users_with_task_count(self):
        """Получение пользователей с количеством задач"""
        async with self.pool.acquire() as conn:
            query = '''
                SELECT u.*, COUNT(t.id) as task_count
                FROM async_users u
                LEFT JOIN async_tasks t ON u.id = t.user_id
                GROUP BY u.id
                ORDER BY task_count DESC
            '''
            
            return await conn.fetch(query)
    
    async def close(self):
        """Закрытие пула подключений"""
        if self.pool:
            await self.pool.close()

async def demo_async():
    """Демонстрация асинхронной работы"""
    # Для теста можно использовать локальную PostgreSQL или тестовую БД
    manager = AsyncTaskManager(
        "postgresql://user:password@localhost/taskdb"
    )
    
    await manager.connect()
    
    try:
        # Параллельное добавление пользователей
        users = [
            ("async_user1", "user1@example.com"),
            ("async_user2", "user2@example.com"),
            ("async_user3", "user3@example.com"),
        ]
        
        # Асинхронное создание пользователей
        tasks = [manager.add_user(username, email) for username, email in users]
        user_ids = await asyncio.gather(*tasks)
        
        # Пакетное добавление задач
        tasks_data = [
            {"title": f"Задача {i} для пользователя {user_id}"}
            for user_id in user_ids if user_id
            for i in range(5)
        ]
        
        if user_ids:
            await manager.add_task_batch(user_ids[0], tasks_data[:10])
        
        # Получение статистики
        users_stats = await manager.get_users_with_task_count()
        print("\nСтатистика пользователей:")
        for user in users_stats:
            print(f"{user['username']}: {user['task_count']} задач")
    
    finally:
        await manager.close()

# Запуск асинхронной демо
# asyncio.run(demo_async())

Часть 7: Лучшие практики и паттерны​

1. Использование паттерна Repository:​

Python:
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any

class Repository(ABC):
    """Абстрактный репозиторий"""
    
    @abstractmethod
    def get(self, id: Any):
        pass
    
    @abstractmethod
    def add(self, entity: Any):
        pass
    
    @abstractmethod
    def update(self, id: Any, updates: Dict[str, Any]):
        pass
    
    @abstractmethod
    def delete(self, id: Any):
        pass
    
    @abstractmethod
    def list(self, filters: Dict[str, Any] = None):
        pass

class SQLiteUserRepository(Repository):
    """Реализация репозитория для SQLite"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
    
    def get(self, user_id: int):
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
    
    def add(self, user_data: Dict[str, Any]):
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO users (username, email) VALUES (?, ?)",
                (user_data['username'], user_data['email'])
            )
            conn.commit()
            return cursor.lastrowid
    
    # ... остальные методы

2. Конфигурация базы данных:​

Python:
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class DatabaseConfig:
    """Конфигурация базы данных"""
    driver: str
    host: str
    port: int
    database: str
    username: Optional[str] = None
    password: Optional[str] = None
    
    @property
    def connection_string(self) -> str:
        if self.driver == "sqlite":
            return f"sqlite:///{self.database}"
        elif self.driver == "postgresql":
            return (f"postgresql://{self.username}:{self.password}"
                   f"@{self.host}:{self.port}/{self.database}")
        elif self.driver == "mysql":
            return (f"mysql+mysqlconnector://{self.username}:{self.password}"
                   f"@{self.host}:{self.port}/{self.database}")
        else:
            raise ValueError(f"Unsupported driver: {self.driver}")
    
    @classmethod
    def from_env(cls) -> 'DatabaseConfig':
        """Создание конфигурации из переменных окружения"""
        return cls(
            driver=os.getenv("DB_DRIVER", "sqlite"),
            host=os.getenv("DB_HOST", "localhost"),
            port=int(os.getenv("DB_PORT", "5432")),
            database=os.getenv("DB_NAME", "app.db"),
            username=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD")
        )

# Использование
config = DatabaseConfig.from_env()
print(f"Connection string: {config.connection_string}")

3. Миграции базы данных (Alembic):​

Bash:
# Установка Alembic
pip install alembic

# Инициализация
alembic init migrations

# Создание миграции
alembic revision --autogenerate -m "Create users and tasks tables"

# Применение миграций
alembic upgrade head

Часть 8: Тестирование​

Python:
import unittest
import tempfile
import os

class TestTaskManager(unittest.TestCase):
    """Тесты для менеджера задач"""
    
    def setUp(self):
        # Создание временной базы данных
        self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
        self.db_path = self.temp_db.name
        self.manager = TaskManager(self.db_path)
        init_database(self.db_path)
    
    def tearDown(self):
        # Удаление временной базы данных
        self.temp_db.close()
        os.unlink(self.db_path)
    
    def test_add_user(self):
        """Тест добавления пользователя"""
        user_id = self.manager.add_user("test", "test@example.com")
        self.assertIsNotNone(user_id)
        self.assertGreater(user_id, 0)
    
    def test_add_task(self):
        """Тест добавления задачи"""
        user_id = self.manager.add_user("test", "test@example.com")
        task_id = self.manager.add_task(user_id, "Test task")
        self.assertIsNotNone(task_id)
    
    def test_get_tasks(self):
        """Тест получения задач"""
        user_id = self.manager.add_user("test", "test@example.com")
        self.manager.add_task(user_id, "Task 1")
        self.manager.add_task(user_id, "Task 2")
        
        tasks = self.manager.get_tasks(user_id=user_id)
        self.assertEqual(len(tasks), 2)
    
    def test_update_task_status(self):
        """Тест обновления статуса задачи"""
        user_id = self.manager.add_user("test", "test@example.com")
        task_id = self.manager.add_task(user_id, "Test task")
        
        updated = self.manager.update_task_status(task_id, "completed")
        self.assertTrue(updated)
        
        tasks = self.manager.get_tasks(user_id=user_id)
        self.assertEqual(tasks[0]['status'], 'completed')

# Запуск тестов
if __name__ == '__main__':
    unittest.main()