Abhinav Deshpande
Configure LFS
a176aa6 unverified
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from datetime import datetime, timedelta
import json
uri = "mongodb+srv://transyltoonia:meradatabase@transyltoonia.xdrsx1s.mongodb.net/?retryWrites=true&w=majority&appName=transyltoonia"
class DatabaseManager:
def __init__(self, uri=uri, db_name='FlipkartGrid_DB'):
"""
Comprehensive Database Management System
Combines basic and advanced database operations
"""
client = MongoClient(uri, server_api=ServerApi('1'))
try:
client.admin.command('ping')
print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
print("An error occurred while connecting to MongoDB: ", e)
try:
# Database Connection
self.client = MongoClient(uri)
self.db = self.client[db_name]
# Collections
self.brand_collection = self.db['brand_recognition']
self.ocr_collection = self.db['ocr']
self.freshness_collection = self.db['freshness']
# Create performance indexes
self._create_indexes()
except Exception as e:
print(f"Database connection error: {e}")
raise
def _create_indexes(self):
"""
Create performance and unique indexes
"""
self.brand_collection.create_index([("brand", 1)], unique=True)
self.ocr_collection.create_index([("brand", 1), ("expiry_date", 1)])
self.freshness_collection.create_index([("produce", 1)])
# Original Basic Methods
from datetime import datetime
def add_brand_record(self, brand, count):
"""
Add or update a brand record with validation.
If the brand exists, increment its count. Otherwise, add a new record.
"""
try:
# Input validation
if not brand or not isinstance(brand, str):
return "Invalid brand name"
if not isinstance(count, (int, float)) or count < 0:
return "Invalid count value"
# Prepare the brand name
brand = brand.strip().title()
# Check if the brand already exists
existing_record = self.brand_collection.find_one({"brand": brand})
if existing_record:
# Increment the count if the brand exists
new_count = existing_record["count"] + count
self.brand_collection.update_one(
{"brand": brand},
{"$set": {"count": new_count, "last_updated": datetime.now()}}
)
return f"Updated brand record: Brand = {brand}, New Count = {new_count}"
else:
# Add a new record if the brand does not exist
record = {
"S.No": self.brand_collection.count_documents({}) + 1,
"timestamp": datetime.now(),
"brand": brand,
"count": count,
"last_updated": datetime.now()
}
self.brand_collection.insert_one(record)
return f"Added brand record: Brand = {brand}, Count = {count}"
except Exception as e:
return f"Error adding brand record: {e}"
def get_brand_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
"""
Retrieve brand records with flexible filtering and sorting
"""
try:
filter_criteria = filter_criteria or {}
sort_direction = 1 if ascending else -1
records = self.brand_collection.find(filter_criteria).sort(sort_by, sort_direction)
return [
{
"S.No": record['S.No'],
"Brand": record['brand'],
"Count": record['count'],
"Timestamp": record['timestamp']
} for record in records
]
except Exception as e:
return f"Error retrieving brand records: {e}"
def add_ocr_record(self, brand, expiry_date, manufacture_date, mrp):
"""
Add OCR record with comprehensive validation
"""
try:
# Date validation helper
def validate_date(date_str):
try:
return datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
return None
exp_date = validate_date(expiry_date)
man_date = validate_date(manufacture_date)
if not exp_date or not man_date:
return "Invalid date format. Use YYYY-MM-DD"
# Check expiry date is after manufacture date
if exp_date <= man_date:
return "Expiry date must be after manufacture date"
# Validate MRP
try:
mrp = float(mrp)
if mrp <= 0:
return "MRP must be a positive number"
except ValueError:
return "Invalid MRP value"
record = {
"S.No": self.ocr_collection.count_documents({}) + 1,
"timestamp": datetime.now(),
"brand": brand.strip().title(),
"expiry_date": exp_date,
"manufacture_date": man_date,
"mrp": mrp,
"days_to_expiry": (exp_date - datetime.now().date()).days
}
self.ocr_collection.insert_one(record)
return f"Added OCR record: Brand = {brand}, Expiry Date = {expiry_date}"
except Exception as e:
return f"Error adding OCR record: {e}"
def get_ocr_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
"""
Retrieve OCR records with flexible filtering and sorting
"""
try:
filter_criteria = filter_criteria or {}
sort_direction = 1 if ascending else -1
records = self.ocr_collection.find(filter_criteria).sort(sort_by, sort_direction)
return [
{
"S.No": record['S.No'],
"Brand": record['brand'],
"Expiry Date": record['expiry_date'],
"Manufacture Date": record['manufacture_date'],
"MRP": record['mrp']
} for record in records
]
except Exception as e:
return f"Error retrieving OCR records: {e}"
def add_freshness_record(self, produce, shelf_life, characteristics, eatable):
"""
Add freshness record with enhanced validation
"""
try:
# Input validations
if not produce or not isinstance(produce, str):
return "Invalid produce name"
# Convert to boolean or validate eatable status
if isinstance(eatable, str):
eatable = eatable.lower() in ['true', 'yes', '1']
record = {
"S.No": self.freshness_collection.count_documents({}) + 1,
"timestamp": datetime.now(),
"produce": produce.strip().title(),
"Shelf-Life": shelf_life,
"Characteristics": characteristics,
"Eatable": bool(eatable)
}
self.freshness_collection.insert_one(record)
return f"Added freshness record: Produce = {produce}, Shelf-Life = {shelf_life}"
except Exception as e:
return f"Error adding freshness record: {e}"
# Advanced Analysis Methods
def analyze_brand_trends(self, time_period=30):
"""
Analyze brand trends over a specified time period
"""
try:
cutoff_date = datetime.now() - timedelta(days=time_period)
pipeline = [
{"$match": {"timestamp": {"$gte": cutoff_date}}},
{"$group": {
"_id": "$brand",
"total_count": {"$sum": "$count"},
"avg_count": {"$avg": "$count"},
"first_seen": {"$min": "$timestamp"},
"last_seen": {"$max": "$timestamp"}
}},
{"$sort": {"total_count": -1}}
]
trends = list(self.brand_collection.aggregate(pipeline))
# Enrich the results
for trend in trends:
trend['brand'] = trend.pop('_id')
trend['first_seen'] = trend['first_seen'].strftime('%Y-%m-%d %H:%M:%S')
trend['last_seen'] = trend['last_seen'].strftime('%Y-%m-%d %H:%M:%S')
trend['total_count'] = round(trend['total_count'], 2)
trend['avg_count'] = round(trend['avg_count'], 2)
return trends
except Exception as e:
return f"Error analyzing brand trends: {e}"
# Additional methods from previous implementations
def get_all_brand_records(self):
"""
Retrieve all brand records
"""
try:
records = self.brand_collection.find()
result = [
f"S.No: {record['S.No']}, Brand: {record['brand']}, Count: {record['count']}"
for record in records
]
return "\n".join(result) if result else "No brand records found."
except Exception as e:
return f"An error occurred while fetching brand records: {e}"
def get_all_ocr_records(self):
"""
Retrieve all OCR records
"""
try:
records = self.ocr_collection.find()
result = [
f"S.No: {record['S.No']}, Brand: {record['brand']}, Expiry Date: {record['expiry_date']}, "
f"Manufacture Date: {record['manufacture_date']}, MRP: {record['mrp']}"
for record in records
]
return "\n".join(result) if result else "No OCR records found."
except Exception as e:
return f"An error occurred while fetching OCR records: {e}"
def get_all_freshness_records(self):
"""
Retrieve all freshness records
"""
try:
records = self.freshness_collection.find()
result = [
f"S.No: {record['S.No']}, Produce: {record['produce']}, Shelf-Life: {record['Shelf-Life']}, "
f"Characteristics: {record['Characteristics']}, Eatable: {record['Eatable']}"
for record in records
]
return "\n".join(result) if result else "No freshness records found."
except Exception as e:
return f"An error occurred while fetching freshness records: {e}"
def get_freshness_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
"""
Retrieve freshness records with flexible filtering and sorting
"""
try:
filter_criteria = filter_criteria or {}
sort_direction = 1 if ascending else -1
records = self.freshness_collection.find(filter_criteria).sort(sort_by, sort_direction)
return [
{
"S.No": record['S.No'],
"Produce": record['produce'],
"Shelf-Life": record['Shelf-Life'],
"Characteristics": record['Characteristics'],
"Eatable": record['Eatable']
} for record in records
]
except Exception as e:
return f"Error retrieving freshness records: {e}"
# Advanced Search and Filtering
def advanced_search(self, collection_name, search_criteria=None, projection=None):
"""
Advanced search method with flexible filtering
"""
try:
collection_map = {
'brand': self.brand_collection,
'ocr': self.ocr_collection,
'freshness': self.freshness_collection
}
if collection_name not in collection_map:
return "Invalid collection name"
search_criteria = search_criteria or {}
projection = projection or {}
results = list(collection_map[collection_name].find(search_criteria, projection))
# Convert ObjectId to string for JSON serialization
for result in results:
if '_id' in result:
result['_id'] = str(result['_id'])
return results
except Exception as e:
return f"Error performing advanced search: {e}"
# More Advanced Methods (Export, Statistical Analysis, etc.)
def export_collection_to_json(self, collection_name, filename=None):
"""
Export a collection to a JSON file
"""
try:
collection_map = {
'brand': self.brand_collection,
'ocr': self.ocr_collection,
'freshness': self.freshness_collection
}
if collection_name not in collection_map:
return "Invalid collection name"
documents = list(collection_map[collection_name].find())
for doc in documents:
doc['_id'] = str(doc['_id'])
if not filename:
filename = f"{collection_name}_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w') as f:
json.dump(documents, f, indent=2, default=str)
return f"Successfully exported {len(documents)} documents to {filename}"
except Exception as e:
return f"Error exporting collection: {e}"
def close_connection(self):
"""
Properly close the MongoDB connection
"""
try:
self.client.close()
print("MongoDB connection closed successfully")
except Exception as e:
print(f"Error closing connection: {e}")
# Example usage
def main():
try:
# Initialize database manager
db_manager = DatabaseManager()
# Example of using various methods
print(db_manager.add_brand_record("Nike", 150))
print(db_manager.add_ocr_record("Adidas", "2024-12-31", "2023-06-01", 75.50))
print(db_manager.add_freshness_record("Apple", "14 days", "Red, crisp", True))
# Retrieve records
print("\nBrand Trends:")
print(db_manager.analyze_brand_trends())
# Advanced search example
print("\nAdvanced Search:")
search_results = db_manager.advanced_search('brand',
search_criteria={'count': {'$gt': 100}},
projection={'brand': 1, 'count': 1}
)
print(json.dumps(search_results, indent=2))
except Exception as e:
print(f"An error occurred: {e}")
finally:
db_manager.close_connection()
if __name__ == "__main__":
main()