File size: 7,969 Bytes
25f22bf
 
 
35c65a3
25f22bf
 
 
35c65a3
 
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35c65a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25f22bf
 
 
 
 
 
 
35c65a3
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35c65a3
 
25f22bf
 
35c65a3
25f22bf
 
 
 
 
 
 
35c65a3
 
 
 
 
 
 
 
 
 
 
25f22bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import re
import json
import unicodedata
import io
from flask import current_app
from gradio_client import Client
import pandas as pd
from PIL import Image
import base64

class ContentService:
    """Service for AI content generation using Hugging Face models.

    The service talks to a remote Gradio Space through ``gradio_client`` and
    post-processes what comes back: text is checked for UTF-8 encodability
    and lightly normalised, and an optional image is returned either as
    decoded bytes (when the Space sent base64) or as-is (e.g. a URL).
    """

    # Text returned when the model produced no usable post body.
    _PLACEHOLDER_CONTENT = "Generated content will appear here..."
    # Leading marker of an inline image data URL (header, comma, base64 payload).
    _DATA_URL_PREFIX = "data:image/"

    def __init__(self, hugging_key=None):
        """Create the service and connect to the content-generation Space.

        Args:
            hugging_key: Hugging Face token. When falsy, the ``HUGGING_KEY``
                entry of the current Flask app config is used instead.
        """
        # Use provided key or fall back to app config
        self.hugging_key = hugging_key or current_app.config.get('HUGGING_KEY')
        # Initialize the Gradio client for content generation
        self.client = Client("Zelyanoth/Linkedin_poster_dev", hf_token=self.hugging_key)

    @staticmethod
    def _log_error(message):
        """Log *message* as an error without masking the caller's exception.

        ``current_app`` raises ``RuntimeError`` when no application context is
        active; in that case the message goes to the module logger instead so
        the ``except`` block that called us can still raise its own error.
        """
        try:
            current_app.logger.error(message)
        except RuntimeError:
            import logging
            logging.getLogger(__name__).error(message)

    def validate_unicode_content(self, content):
        """Return *content* in a form that is guaranteed to encode as UTF-8.

        Whitespace and formatting are never touched. Falsy values and
        non-string values are returned unchanged.

        Args:
            content: Text to validate.

        Returns:
            The original string when it encodes cleanly; otherwise a copy in
            which un-encodable code points (e.g. lone surrogates) are
            replaced by ``?``.
        """
        if not content or not isinstance(content, str):
            return content

        try:
            content.encode('utf-8')
        except UnicodeEncodeError:
            # Keep as much of the text as possible instead of failing.
            return content.encode('utf-8', errors='replace').decode('utf-8')
        return content

    def preserve_formatting(self, content):
        """Return *content* with spaces, tabs and line breaks left intact.

        The only change ever made is the replacement of code points that
        cannot be encoded as UTF-8, so this shares its implementation with
        :meth:`validate_unicode_content`. Falsy and non-string values are
        returned unchanged.
        """
        return self.validate_unicode_content(content)

    def sanitize_content_for_api(self, content):
        """Sanitize text for API calls while preserving its formatting.

        Args:
            content: Text to sanitize. Falsy and non-string values are
                returned unchanged.

        Returns:
            The text made UTF-8 encodable, with NUL characters removed and
            ``\\r\\n`` / ``\\r`` line endings normalised to ``\\n``.
        """
        if not content or not isinstance(content, str):
            return content

        validated = self.validate_unicode_content(content)
        # NUL characters are the only thing stripped outright.
        validated = validated.replace('\x00', '')
        # Normalise line endings; '\r\n' must be handled before a bare '\r'.
        return validated.replace('\r\n', '\n').replace('\r', '\n')

    def _extract_base64_payload(self, data):
        """Return the base64 part of *data*, dropping a data-URL header.

        Only strings starting with ``data:image/`` are treated as data URLs;
        anything else is returned unchanged so that the classifier and the
        decoder always look at the same text.

        Raises:
            ValueError: If a data URL has no ``,`` separating header and payload.
        """
        if not data.startswith(self._DATA_URL_PREFIX):
            return data
        pieces = data.split(',')
        if len(pieces) < 2:
            raise ValueError("data URL is missing the ',' before its payload")
        return pieces[1]

    def _is_base64_image(self, data) -> bool:
        """Check if the data is a base64 encoded image string.

        Args:
            data: Candidate value; anything that is not a non-empty string
                is rejected.

        Returns:
            True for ``data:image/`` URLs and for strings that decode as
            strict base64 in their entirety; False otherwise (for instance
            ordinary URLs, including ones that contain a comma).
        """
        if not isinstance(data, str) or not data:
            return False

        if data.startswith(self._DATA_URL_PREFIX):
            return True

        try:
            # binascii.Error is a ValueError subclass, as is the error raised
            # for non-ASCII input, so one handler covers both.
            base64.b64decode(data, validate=True)
        except ValueError:
            return False
        return True

    def _base64_to_bytes(self, base64_string) -> bytes:
        """Convert a base64 encoded string (or image data URL) to bytes.

        Args:
            base64_string: Raw base64 text or a ``data:image/`` URL.

        Returns:
            The decoded bytes.

        Raises:
            Exception: If the input cannot be decoded; the original error is
                attached as ``__cause__``.
        """
        try:
            payload = self._extract_base64_payload(base64_string)
            return base64.b64decode(payload, validate=True)
        except Exception as e:
            message = f"Failed to decode base64 image: {str(e)}"
            self._log_error(message)
            raise Exception(message) from e

    def _parse_generation_result(self, result):
        """Split a raw model result into ``(content, image_data)``.

        Strings are parsed as JSON first, then as a Python literal, and are
        otherwise used verbatim. Values that are already Python objects
        (the client may hand back a list or tuple) are used directly.

        Args:
            result: Whatever the remote endpoint returned.

        Returns:
            tuple: ``(content, image_data)`` where ``content`` is always a
            string (a placeholder when nothing usable was produced) and
            ``image_data`` is the second element of a sequence result, or None.
        """
        if isinstance(result, (bytes, bytearray)):
            result = bytes(result).decode('utf-8', errors='replace')

        if isinstance(result, str):
            try:
                parsed = json.loads(result)
            except json.JSONDecodeError:
                import ast
                try:
                    # literal_eval only builds literals; it never runs code.
                    parsed = ast.literal_eval(result)
                except (ValueError, SyntaxError):
                    # Plain text: treat the whole string as the content.
                    parsed = [result]
        else:
            parsed = result

        if isinstance(parsed, (list, tuple)):
            has_content = bool(parsed) and parsed[0] is not None
            content = parsed[0] if has_content else self._PLACEHOLDER_CONTENT
            has_image = len(parsed) > 1 and parsed[1] is not None
            image_data = parsed[1] if has_image else None
        else:
            content = str(parsed) if parsed is not None else self._PLACEHOLDER_CONTENT
            image_data = None

        if not isinstance(content, str):
            # Structured first element: stringify like the scalar branch.
            content = str(content)
        return content, image_data

    def generate_post_content(self, user_id: str) -> tuple:
        """
        Generate post content using AI.

        Args:
            user_id (str): User ID for personalization

        Returns:
            tuple: (Generated post content, image). The image is ``bytes``
            when the model returned base64 data, the value as returned by
            the model otherwise (typically a URL string), or None.

        Raises:
            Exception: If the remote call or the post-processing fails; the
                original error is attached as ``__cause__``.
        """
        try:
            # Call the Hugging Face model to generate content
            result = self.client.predict(
                code=user_id,
                api_name="/poster_linkedin"
            )

            generated_content, image_data = self._parse_generation_result(result)
            final_content = self.sanitize_content_for_api(generated_content)

            # Handle image data - could be URL or base64
            image = None
            if image_data:
                if self._is_base64_image(image_data):
                    # Convert base64 to bytes for storage
                    image = self._base64_to_bytes(image_data)
                else:
                    # Not base64: pass it through untouched
                    image = image_data

            return (final_content, image)

        except Exception as e:
            error_message = str(e)
            self._log_error(f"Content generation failed: {error_message}")
            raise Exception(f"Content generation failed: {error_message}") from e

    def add_rss_source(self, rss_link: str, user_id: str) -> str:
        """
        Add an RSS source for content generation.

        Args:
            rss_link (str): RSS feed URL
            user_id (str): User ID

        Returns:
            str: Result message from the remote endpoint, sanitized. A
            non-string result is returned unchanged.

        Raises:
            Exception: If the remote call fails; the original error is
                attached as ``__cause__``.
        """
        try:
            # The endpoint takes a single string: link, fixed separator, user id.
            rss_input = f"{rss_link}__thi_irrh'èçs_my_id__! {user_id}"
            sanitized_rss_input = self.sanitize_content_for_api(rss_input)

            result = self.client.predict(
                rss_link=sanitized_rss_input,
                api_name="/ajouter_rss"
            )

            return self.sanitize_content_for_api(result)

        except Exception as e:
            message = f"Failed to add RSS source: {str(e)}"
            self._log_error(message)
            raise Exception(message) from e