File size: 8,381 Bytes
4f591e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import re
import pandas as pd
from typing import Dict, List, Union, Tuple
from cefrpy import CEFRSpaCyAnalyzer, CEFRLevel
import spacy

def extract_feedback_with_clean_quotes(feedback_str: str) -> pd.Series:
    """Parse a JSON-like feedback string into named sections plus cleaned quotes.

    Extracts each known ``"Header": "content"`` section from *feedback_str*
    and, for the four feedback sections, additionally collects the phrases
    wrapped in single quotes inside the content (whitespace-trimmed and with
    trailing punctuation stripped).

    Args:
        feedback_str: Raw feedback text. It is roughly JSON-shaped but not
            guaranteed to be valid JSON, hence the regex-based parsing.

    Returns:
        pd.Series with keys TR/CC/LR/GRA_feedback, is_off_topic, word_limit,
        Corrected_essay, plus one ``<key>_quotes`` list per feedback section.
        Missing sections are None (or [] for the quote lists).
    """
    section_map = {
        "Task Response feedback": "TR_feedback",
        "Coherence and Cohesion feedback": "CC_feedback",
        "Lexical Resource feedback": "LR_feedback",
        "Grammatical Range and Accuracy feedback": "GRA_feedback",
        "Is off topic": "is_off_topic",
        "Word limit satisfied": "word_limit",
        "Corrected essay": "Corrected_essay"
    }

    result = {v: None for v in section_map.values()}
    quote_results = {f"{v}_quotes": [] for v in section_map.values() if v.endswith('_feedback')}

    # BUG FIX: the previous pattern '(?:[^"]|\\")*' let [^"] consume the
    # backslash of an escaped quote, so the match stopped at the first \" and
    # truncated the content. Trying the escape first and excluding the bare
    # backslash from the character class handles \" (and any \x) correctly.
    section_pattern = r'"(?P<header>(?:\\.|[^"\\])+)"\s*:\s*"(?P<content>(?:\\.|[^"\\])*)"'

    for match in re.finditer(section_pattern, feedback_str):
        header = match.group('header')
        content = match.group('content').replace('\\"', '"')

        if header in section_map:
            key = section_map[header]
            result[key] = content

            # For feedback sections, pull out single-quoted phrases and
            # strip trailing punctuation from each.
            if key.endswith('_feedback'):
                clean_quotes = []
                for quote in re.findall(r"'(.*?)'", content):
                    cleaned = re.sub(r'[.,;:!?]+$', '', quote.strip())
                    if cleaned:  # drop phrases that were only punctuation
                        clean_quotes.append(cleaned)
                quote_results[f"{key}_quotes"] = clean_quotes

    # Fallback for the two flag fields when the header was not double-quoted.
    for orig, new in [("Is off topic", "is_off_topic"),
                      ("Word limit satisfied", "word_limit")]:
        if result[new] is None:
            match = re.search(rf'{orig}\s*:\s*"([^"]+)"', feedback_str)
            if match:
                result[new] = match.group(1)

    # Fallback for the corrected essay, which may span multiple lines.
    if result["Corrected_essay"] is None:
        essay_match = re.search(
            r'"Corrected essay"\s*:\s*"(.*?)"(?=\s*[,\]}]|$)',
            feedback_str,
            re.DOTALL
        )
        if essay_match:
            result["Corrected_essay"] = essay_match.group(1).replace('\\"', '"')

    return pd.Series({**result, **quote_results})


def extract_feedback_keys_values(feedback_str):
    """Extract the five known feedback sections from a raw feedback string.

    Instead of parsing JSON, this scans for the quoted section headers and
    slices the text between consecutive headers, so it tolerates malformed
    payloads.

    Args:
        feedback_str: Raw feedback text containing '"Header": "value"' pairs.

    Returns:
        pd.Series mapping TR/CC/LR/GRA_feedback and Corrected_essay to their
        string values; None for absent sections, all-None on a parse failure.
    """
    # BUG FIX: defined before the try block so the except handler can always
    # build its all-None fallback (previously section_map lived inside the
    # try, so a failure before its assignment would raise NameError there).
    section_map = {
        '"Task Response feedback"': 'TR_feedback',
        '"Coherence and Cohesion feedback"': 'CC_feedback',
        '"Lexical Resource feedback"': 'LR_feedback',
        '"Grammatical Range and Accuracy feedback"': 'GRA_feedback',
        '"Corrected essay"': 'Corrected_essay'
    }
    try:
        result = {v: None for v in section_map.values()}  # Initialize with None
        for original_section, new_key in section_map.items():
            # Find the start of the section; skip if absent.
            start = feedback_str.find(original_section)
            if start == -1:
                continue
            # The section runs until the nearest following header, or EOF.
            end = len(feedback_str)
            for other_section in section_map:
                if other_section != original_section:
                    other_start = feedback_str.find(other_section, start + 1)
                    if other_start != -1 and other_start < end:
                        end = other_start
            section_content = feedback_str[start:end].strip()
            key_end = section_content.find(':')
            if key_end == -1:
                continue
            # Trim separators, then the surrounding double quotes if present.
            value = section_content[key_end+1:].strip().strip(' ,')
            if value.startswith('"') and value.endswith('"'):
                value = value[1:-1]
            result[new_key] = value
        return pd.Series(result)  # Return as Series for DataFrame expansion
    except Exception as e:
        # Best-effort parser: report and fall back to an all-None row rather
        # than aborting a whole DataFrame .apply().
        print(f"Error processing feedback: {e}")
        return pd.Series({k: None for k in section_map.values()})
    

def create_train_input(row):
    """Build the training prompt for one essay row.

    Concatenates the topic, the essay, its corrected version and the
    feedback block (four rubric sections plus length and CEFR stats) under
    {{...}} markers, one item per line.
    """
    feedback_block = "\n".join([
        f"Task Response Feedback: {row['TR_feedback']}",
        f"Coherence and Cohesion Feedback: {row['CC_feedback']}",
        f"Lexical Resource Feedback: {row['LR_feedback']}",
        f"Grammatical Range and Accuracy Feedback: {row['GRA_feedback']}",
        f"The essay has {row['word_count']} words and {row['paragraph_count']} paragraphs.",
        f"The CEFR statistics of this essay: {row['cefr_stat']}",
    ])

    segments = [
        ("{{TOPIC}}", row['topic']),
        ("{{ESSAY}}", row['essay']),
        ("{{CORRECTED_ESSAY}}", row['Corrected_essay']),
        ("{{FEEDBACK}}", feedback_block),
    ]
    # Interleave marker/body pairs, newline-separated, no trailing newline.
    return "\n".join(part for marker, body in segments for part in (marker, body))

# Maps IELTS rubric criterion names to the score column names used downstream.
column_mapping = {
    'Task Response': 'TR_score',
    'Coherence and Cohesion': 'CC_score',
    'Lexical Resource': 'LR_score',
    'Grammatical Range and Accuracy': 'GRA_score'
}


# Module-level spaCy pipeline, loaded once at import time and shared by
# get_cefr_stats (loading the model on every call would be far too slow).
# NOTE(review): this requires the 'en_core_web_sm' model to be installed.
nlp = spacy.load("en_core_web_sm")

def get_cefr_stats(text):
    """Compute CEFR vocabulary-level statistics for an essay.

    Runs the module-level spaCy pipeline plus cefrpy's analyzer over *text*
    and counts how many tokens fall into each CEFR level (A1..C2), plus
    unknowns, totals and percentages.

    Args:
        text: Essay text. Non-string or blank input yields zero counts.

    Returns:
        str: the stats dict rendered with str() — presumably so it can be
        stored in a plain-text DataFrame column; TODO confirm callers rely
        on the string form rather than the dict.
    """
    # Shared zero-count fallback for blank input and analysis failures.
    empty_stats = {f'{level}_words': 0
                   for level in ['A1', 'A2', 'B1', 'B2', 'C1', 'C2', 'unknown']} | {'total_words': 0}
    if not isinstance(text, str) or not text.strip():
        # BUG FIX: this branch used to return the dict itself while every
        # other path returned str(...); normalized to str so the function
        # has a consistent return type.
        return str(empty_stats)

    # Expand clitic contractions so the CEFR lookup sees full word forms.
    ABBREVIATION_MAPPING = {
        "'m": "am",
        "'s": "is",
        "'re": "are",
        "'ve": "have",
        "'d": "had",
        "n't": "not",
        "'ll": "will"
    }

    # Named-entity tokens of these types are excluded from CEFR scoring
    # (proper nouns, quantities etc. say nothing about vocabulary level).
    ENTITY_TYPES_TO_SKIP_CEFR = {
        'QUANTITY', 'MONEY', 'LANGUAGE', 'LAW',
        'WORK_OF_ART', 'PRODUCT', 'GPE',
        'ORG', 'FAC', 'PERSON'
    }

    def get_word_level_count_statistic(level_tokens: List[Tuple[str, str, bool, float, int, int]]) -> dict:
        """Count tokens per CEFR level and derive percentages.

        Each token tuple carries its CEFR level at index 3 (float or None).
        Malformed tokens are counted as unknown rather than raising.
        """
        difficulty_levels_count = [0] * 6
        unknown_count = 0
        result = {}

        for token in level_tokens:
            try:
                level = token[3]
                if level is None:
                    unknown_count += 1
                    continue

                # Round to the nearest integer level; anything outside 1..6
                # or non-numeric counts as unknown.
                try:
                    level_round = round(float(level))
                    if 1 <= level_round <= 6:
                        difficulty_levels_count[level_round - 1] += 1
                    else:
                        unknown_count += 1
                except (ValueError, TypeError):
                    unknown_count += 1

            except Exception as e:
                print(f"Error processing token: {e}")
                unknown_count += 1

        # str(CEFRLevel(i)) is assumed to render as 'A1'..'C2', matching the
        # zero-count fallback keys — TODO confirm against cefrpy docs.
        for i in range(1, 7):
            result[f'{CEFRLevel(i)}_words'] = difficulty_levels_count[i - 1]
        result['unknown_words'] = unknown_count
        result['total_words'] = sum(difficulty_levels_count) + unknown_count

        # Percentages; all 0.0 when there were no tokens at all.
        if result['total_words'] > 0:
            for i in range(1, 7):
                result[f'{CEFRLevel(i)}_pct'] = (difficulty_levels_count[i - 1] / result['total_words']) * 100
            result['unknown_pct'] = (unknown_count / result['total_words']) * 100
        else:
            for i in range(1, 7):
                result[f'{CEFRLevel(i)}_pct'] = 0.0
            result['unknown_pct'] = 0.0

        return result

    try:
        # Drop non-ASCII characters up front to sidestep encoding issues in
        # the downstream analysis.
        clean_text = text.encode('ascii', errors='ignore').decode('ascii')
        doc = nlp(clean_text)
        text_analyzer = CEFRSpaCyAnalyzer(
            entity_types_to_skip=ENTITY_TYPES_TO_SKIP_CEFR,
            abbreviation_mapping=ABBREVIATION_MAPPING
        )
        tokens = text_analyzer.analize_doc(doc)  # (sic: cefrpy spells it 'analize')
        return str(get_word_level_count_statistic(tokens))

    except Exception as e:
        # Best-effort: report and return zero counts rather than aborting a
        # whole DataFrame .apply(). Note this fallback has no *_pct keys,
        # matching the original behavior.
        print(f"Error analyzing text: {e}")
        return str(empty_stats)
    
    

def replace_single_newlines(text):
    """Replace each isolated newline with the literal text ``\\n\\n``.

    A newline that is neither preceded nor followed by another newline is
    swapped for the four-character sequence backslash-n-backslash-n (escaped
    text, not actual newlines). Runs of two or more newlines are untouched.
    """
    isolated_newline = re.compile(r'(?<!\n)\n(?!\n)')
    # A callable replacement avoids re's template-escape processing, so the
    # backslashes are emitted literally.
    return isolated_newline.sub(lambda _match: '\\n\\n', text)
# feedback_data = extract_feedback_with_clean_quotes(feedback_text)
# print(feedback_data["LR_feedback_quotes"])