import json
import re

import pandas as pd
import spacy
import streamlit as st
import torch
from torch import nn
from torch.nn import CrossEntropyLoss
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from transformers.modeling_outputs import QuestionAnsweringModelOutput
from transformers.models.roberta.modeling_roberta import (
    RobertaConfig,
    RobertaModel,
    RobertaPreTrainedModel,
)


class MRCQuestionAnswering(RobertaPreTrainedModel):
    """RoBERTa-based extractive QA model that scores answer spans over words
    (rather than sub-word tokens) by summing sub-word embeddings per word."""

    config_class = RobertaConfig

    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    _keys_to_ignore_on_load_missing = [r"position_ids"]

    def _reorder_cache(self, past, beam_idx):
        pass

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)

        self.init_weights()

    def forward(
        self,
        input_ids=None,
        words_lengths=None,
        start_idx=None,
        end_idx=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        start_positions=None,
        end_positions=None,
        span_answer_ids=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=None,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        context_embedding = sequence_output

        # Build a (batch, words, sub_words) alignment matrix: row j of each sample
        # has 1s over the sub-word positions belonging to word j, so the bmm below
        # sums sub-word embeddings into one embedding per word.
        batch_size = input_ids.shape[0]
        max_sub_word = input_ids.shape[1]
        max_word = words_lengths.shape[1]
        align_matrix = torch.zeros((batch_size, max_word, max_sub_word))

        for i, sample_length in enumerate(words_lengths):
            for j in range(len(sample_length)):
                start = torch.sum(sample_length[:j])  # sub-word offset where word j begins
                align_matrix[i][j][start: start + sample_length[j]] = 1 if sample_length[j] > 0 else 0

        align_matrix = align_matrix.to(context_embedding.device)
        context_embedding_align = torch.bmm(align_matrix, context_embedding)

        logits = self.qa_outputs(context_embedding_align)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        total_loss = None
        if start_positions is not None and end_positions is not None:
            # If we are on multi-GPU, split adds an extra dimension.
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            # Positions outside the model input are clamped and ignored by the loss.
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        if not return_dict:
            output = (start_logits, end_logits) + outputs[2:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
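
# A minimal, self-contained sketch (added for illustration, never called by the app)
# of the word-alignment step in MRCQuestionAnswering.forward: each word's
# representation becomes the sum of its sub-word embeddings, so the QA head scores
# words rather than sub-word tokens. The toy sizes below are arbitrary.
def _alignment_demo():
    words_lengths = torch.tensor([[1, 2, 1]])  # 3 words made of 1, 2 and 1 sub-words
    hidden = torch.arange(8, dtype=torch.float).reshape(1, 4, 2)  # (batch, sub_words, hidden)
    align = torch.zeros((1, 3, 4))
    for j in range(3):
        start = int(torch.sum(words_lengths[0][:j]))
        align[0][j][start:start + int(words_lengths[0][j])] = 1
    word_level = torch.bmm(align, hidden)  # (batch, words, hidden)
    # Word 1 spans sub-words 1 and 2, so its embedding is their sum.
    assert torch.equal(word_level[0, 1], hidden[0, 1] + hidden[0, 2])
    return word_level
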
from nltk import word_tokenize


def tokenize_function(example, tokenizer):
    # Word-tokenize question and context, then map every word to its sub-word ids.
    question_word = word_tokenize(example["question"])
    context_word = word_tokenize(example["context"])

    question_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in question_word]
    context_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in context_word]

    # Mark samples whose sub-word sequence exceeds the model's input size as invalid.
    valid = True
    if len([j for i in question_sub_words_ids + context_sub_words_ids for j in i]) > tokenizer.model_max_length - 1:
        valid = False

    question_sub_words_ids = [[tokenizer.bos_token_id]] + question_sub_words_ids + [[tokenizer.eos_token_id]]
    context_sub_words_ids = context_sub_words_ids + [[tokenizer.eos_token_id]]

    input_ids = [j for i in question_sub_words_ids + context_sub_words_ids for j in i]
    if len(input_ids) > tokenizer.model_max_length:
        valid = False

    words_lengths = [len(item) for item in question_sub_words_ids + context_sub_words_ids]

    return {
        "input_ids": input_ids,
        "words_lengths": words_lengths,
        "valid": valid,
    }


def data_collator(samples, tokenizer):
    if len(samples) == 0:
        return {}

    def collate_tokens(values, pad_idx, eos_idx=None, left_pad=False, move_eos_to_beginning=False):
        # Convert a list of 1d tensors into a padded 2d tensor.
        size = max(v.size(0) for v in values)
        res = values[0].new(len(values), size).fill_(pad_idx)

        def copy_tensor(src, dst):
            assert dst.numel() == src.numel()
            if move_eos_to_beginning:
                assert src[-1] == eos_idx
                dst[0] = eos_idx
                dst[1:] = src[:-1]
            else:
                dst.copy_(src)

        for i, v in enumerate(values):
            copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
        return res

    input_ids = collate_tokens([torch.tensor(item['input_ids']) for item in samples],
                               pad_idx=tokenizer.pad_token_id)
    attention_mask = torch.zeros_like(input_ids)
    for i in range(len(samples)):
        attention_mask[i][:len(samples[i]['input_ids'])] = 1
    words_lengths = collate_tokens([torch.tensor(item['words_lengths']) for item in samples], pad_idx=0)

    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'words_lengths': words_lengths,
    }


def extract_answer(inputs, outputs, tokenizer):
    plain_result = []
    for sample_input, start_logit, end_logit in zip(inputs, outputs.start_logits, outputs.end_logits):
        sample_words_length = sample_input['words_lengths']
        input_ids = sample_input['input_ids']
        # Map the word-level argmax positions back to sub-word offsets.
        answer_start = sum(sample_words_length[:torch.argmax(start_logit)])
        answer_end = sum(sample_words_length[:torch.argmax(end_logit) + 1])

        if answer_start <= answer_end:
            answer = tokenizer.convert_tokens_to_string(
                tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
            if answer == tokenizer.bos_token:
                answer = ''
        else:
            answer = ''

        score_start = torch.max(torch.softmax(start_logit, dim=-1)).cpu().detach().numpy().tolist()
        score_end = torch.max(torch.softmax(end_logit, dim=-1)).cpu().detach().numpy().tolist()
        plain_result.append({
            "answer": answer,
            "score_start": score_start,
            "score_end": score_end,
        })
    return plain_result


# Load the extractive QA model (a RoBERTa checkpoint with the custom MRC head above).
model_checkpoint = "minhdang14902/Roberta_edu"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = MRCQuestionAnswering.from_pretrained(model_checkpoint)

# Load the PhoBERT intent classifier and wrap it in a text-classification pipeline.
model_sentiment = AutoModelForSequenceClassification.from_pretrained('minhdang14902/PhoBert_Edu')
tokenizer_sentiment = AutoTokenizer.from_pretrained('minhdang14902/PhoBert_Edu')
chatbot_sentiment = pipeline("sentiment-analysis", model=model_sentiment, tokenizer=tokenizer_sentiment)

# Initialize the Vietnamese spaCy model.
nlp = spacy.load('vi_core_news_lg')
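
# A minimal sketch (added for illustration, not part of the app's UI flow) of how the
# pieces above fit together: tokenize -> collate -> forward pass -> span extraction.
# The question/context strings are hypothetical, and NLTK's punkt tokenizer data is
# assumed to be available.
def _qa_pipeline_demo():
    sample = {
        "question": "Ai ký quyết định?",          # hypothetical input
        "context": "Hiệu trưởng ký quyết định.",  # hypothetical context
    }
    feature = tokenize_function(sample, tokenizer)
    if not feature["valid"]:
        return None
    batch = data_collator([feature], tokenizer)
    with torch.no_grad():
        out = model(**batch)
    return extract_answer([feature], out, tokenizer)[0]["answer"]
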
def load_json_file(filename):
    with open(filename) as f:
        return json.load(f)


filename = './data/QA_Legal_converted_merged.json'
intents = load_json_file(filename)


def create_df():
    return pd.DataFrame({
        'Pattern': [],
        'Tag': []
    })


df = create_df()


def extract_json_info(json_file, df):
    # Flatten the intents file into one (pattern, tag) row per training pattern.
    for intent in json_file['intents']:
        for pattern in intent['patterns']:
            sentence_tag = [pattern, intent['tag']]
            df.loc[len(df.index)] = sentence_tag
    return df


df = extract_json_info(intents, df)
df2 = df.copy()

labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
id2label = {i: label for i, label in enumerate(labels)}
label2id = {v: k for k, v in id2label.items()}


def preprocess(text, df):
    # Lower-case, strip digits and punctuation, and drop Vietnamese stop words.
    def remove_numbers_and_special_chars(text):
        text = re.sub(r'\d+', '', text)
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    text = text.lower()
    text = remove_numbers_and_special_chars(text)

    text_nlp = nlp(text)
    filtered_sentence = [token.text for token in text_nlp if not token.is_stop]
    return ' '.join(filtered_sentence)


def predict(text):
    # Classify the preprocessed question into one of the intent tags.
    new_text = preprocess(text, df2)
    probs = chatbot_sentiment(new_text)
    return max(probs, key=lambda x: x['score'])['label']


# Build the user interface with Streamlit.
st.title("Vietnamese Legal Q&A Chatbot")
st.write("Nhập câu hỏi của bạn về các vấn đề pháp lý:")  # "Enter your legal question:"

user_question = st.text_input("Câu hỏi:")  # "Question:"

if st.button("Gửi câu hỏi") and user_question:  # "Submit question"
    st.write("Câu hỏi của bạn:", user_question)  # "Your question:"

    # First look for an exact pattern match in the intents dataset.
    found_intent = None
    for intent in intents['intents']:
        if user_question.lower() in [pattern.lower() for pattern in intent['patterns']]:
            found_intent = intent
            break

    if found_intent:
        # Exact match: answer directly with the stored response.
        st.write("Câu trả lời:", found_intent['responses'][0])  # "Answer:"
    else:
        # No exact match: classify the question and use the predicted tag's stored
        # response as the context for the extractive QA model.
        predicted_tag = predict(user_question)
        st.write("Thẻ dự đoán:", predicted_tag)  # "Predicted tag:"
        matched_intent = next(
            (intent for intent in intents['intents'] if intent['tag'].strip() == predicted_tag),
            None)

        if matched_intent is None:
            st.write("Câu trả lời:", "Tôi không có thông tin phù hợp.")  # "I have no matching information."
        else:
            # Build the input for the QA model.
            qa_inputs = [{
                'context': matched_intent['responses'][0],
                'question': user_question,
            }]
            qa_features = []
            for qa_input in qa_inputs:
                feature = tokenize_function(qa_input, tokenizer)
                if feature["valid"]:
                    qa_features.append(feature)

            if not qa_features:
                # The question/context pair exceeded the model's input size.
                st.write("Câu trả lời:", "Tôi không có thông tin phù hợp.")
            else:
                qa_batch = data_collator(qa_features, tokenizer)
                with torch.no_grad():
                    outputs = model(**qa_batch)
                answers = extract_answer(qa_features, outputs, tokenizer)
                best_answer = max(answers, key=lambda x: (x['score_start'] + x['score_end']) / 2)
                st.write("Câu trả lời:", best_answer['answer'])
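
# To try the app locally (assuming this file is saved as app.py and the model
# checkpoints, the vi_core_news_lg spaCy package, and NLTK's punkt data are installed):
#   streamlit run app.py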