File size: 3,014 Bytes
8cd3eec
a052739
3d21336
a052739
 
8cd3eec
a052739
8ab14ef
3d21336
a052739
 
 
3d21336
 
 
 
a052739
8cd3eec
 
 
 
 
 
 
 
 
 
 
 
 
3d21336
 
 
8cd3eec
3d21336
 
 
8cd3eec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a052739
 
8cd3eec
 
 
 
 
3d21336
8cd3eec
 
 
 
 
 
3d21336
8cd3eec
 
 
 
 
3d21336
8cd3eec
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# src/data/repositories/utils.py

from datetime import datetime, timedelta, timezone

from pymongo import ASCENDING
from pymongo.errors import ConnectionFailure, PyMongoError

from src.data.connection import ActionFailed, get_collection, get_database
from src.utils.logger import logger


def create_index(
	collection_name: str,
	field_name: str,
	*,
	unique: bool = False
) -> None:
	"""
	Ensures a single-field, ascending index exists on a collection.

	Args:
		collection_name: Name of the collection that receives the index.
		field_name: Name of the field to index (ascending order).
		unique: Forwarded to the driver as the index's `unique` option.

	Raises:
		ActionFailed: If the driver reports a connection or other PyMongo error.
	"""
	# Single-key specification: one (field, direction) pair.
	index_keys = [(field_name, ASCENDING)]
	try:
		get_collection(collection_name).create_index(index_keys, unique=unique)
		logger().info(f"Ensured index exists on '{field_name}' for collection '{collection_name}'.")
	except (ConnectionFailure, PyMongoError) as exc:
		# Log the driver error, then surface it as the project's ActionFailed
		# while keeping the original exception chained as the cause.
		logger().error(f"Failed to create index on '{collection_name}': {exc}")
		raise ActionFailed("A database error occurred while creating an index.") from exc

def delete_old_data(
	collection_name: str,
	timestamp_field: str = "updated_at",
	*,
	days: int = 30
) -> int:
	"""
	Deletes documents from a collection older than a specified number of days.

	The cutoff is computed as the current UTC time minus `days`; every document
	whose `timestamp_field` is strictly earlier than that cutoff is removed.

	Args:
		collection_name: The name of the collection to prune.
		timestamp_field: The name of the datetime field to check. Defaults to "updated_at".
		days: The age in days beyond which documents will be deleted. Must not be negative.

	Returns:
		The number of documents deleted.

	Raises:
		ValueError: If `days` is negative.
		ActionFailed: If a database error occurs.
	"""
	# A negative age would move the cutoff into the future, so the "$lt" filter
	# below would match (and delete) every document that has the timestamp
	# field. Reject it before touching the database.
	if days < 0:
		raise ValueError(f"days must be non-negative, got {days}.")

	try:
		collection = get_collection(collection_name)
		cutoff = datetime.now(timezone.utc) - timedelta(days=days)
		result = collection.delete_many({
			timestamp_field: {"$lt": cutoff}
		})
		# Only log when something was actually removed, to keep routine
		# no-op runs quiet.
		if result.deleted_count > 0:
			logger().info(f"Deleted {result.deleted_count} old documents from '{collection_name}'.")
		return result.deleted_count
	except (ConnectionFailure, PyMongoError) as e:
		logger().error(f"Failed to delete old data from '{collection_name}': {e}")
		raise ActionFailed("A database error occurred while deleting old data.") from e

def backup_collection(collection_name: str) -> str:
	"""
	Copies a collection into a new, timestamp-suffixed backup collection.

	The copy is performed with an aggregation pipeline that ends in `$out`.
	The backup is named `<collection_name>_backup_<YYYYmmdd_HHMMSS>`, using the
	current UTC time.

	Returns:
		The name of the newly created backup collection.

	Raises:
		ActionFailed: If a database error occurs.
	"""
	try:
		database = get_database()
		stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
		backup_name = f"{collection_name}_backup_{stamp}"

		# No existence check is made on the target: the $out stage replaces the
		# collection if it already exists. The name only has one-second
		# resolution, so two backups within the same second share a target.
		copy_pipeline = [{"$match": {}}, {"$out": backup_name}]
		get_collection(collection_name).aggregate(copy_pipeline)

		# Count what landed in the backup purely for the log message.
		copied = database[backup_name].count_documents({})
		logger().info(f"Created backup '{backup_name}' with {copied} documents.")
		return backup_name
	except (ConnectionFailure, PyMongoError) as exc:
		logger().error(f"Failed to back up collection '{collection_name}': {exc}")
		raise ActionFailed("A database error occurred during collection backup.") from exc