# -*- coding: utf-8 -*-
"""Self-instruct style instruction generation for Chinese seed tasks.

Samples few-shot examples from human seed tasks plus previously generated
machine instructions, asks an OpenAI LLM to continue the numbered list, then
parses, filters, and Rouge-L-deduplicates the newly generated instructions.
"""
import json
import os
import random
import re
from functools import partial
from multiprocessing import Pool

import numpy as np
from rouge_score import rouge_scorer

from langchain.chains.llm import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

from self.prompt import self_prompt, gen_few_shot_prompt


def is_all_chinese(strs):
    """Return True iff every character of *strs* is a CJK unified ideograph."""
    for _char in strs:
        if not '\u4e00' <= _char <= '\u9fa5':
            return False
    return True


class ChineseTokenizer():
    """Character-level tokenizer so Rouge-L works on unsegmented Chinese text."""

    def tokenize(self, text):
        # One token per non-whitespace character.
        tokens = [i.strip() for i in text if i.strip()]
        return tokens


class SELF(object):
    # A seed instruction may not have enough samples: take up to n_instance
    # instances per instruction when flattening the seed file.
    n_instance = 3
    # Numbered-list prefix for each generated instruction.
    prefix = "{id}. 指令:"
    # Instructions mentioning these modalities are likely beyond a text-only model.
    blacklist = ['图片', '图像', '文件', '作图', '绘画', '视频', '音频', '音乐', '流程图']

    def __init__(self, seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
        """Build the LLM, load seed tasks, and initialise the dedup pool.

        :param seed_file: path to a jsonl file of human seed tasks
        :param openai_key: OpenAI API key
        :param n_human: number of human few-shot examples sampled per step
        :param n_machine: number of machine few-shot examples sampled per step
        :param n_instruct: total instruction count at which generation stops
        :param prompt: optional prompt template override (falls back to self_prompt)
        """
        # BUG FIX: the original passed literal '{n_instruct}' strings (missing
        # the f-prefix) as stop sequences; interpolate the real count instead.
        self.llm = OpenAI(openai_api_key=openai_key,
                          temperature=1,
                          # Stop once enough instructions have been generated.
                          stop=[f'\n{n_instruct}', f'{n_instruct}', f'{n_instruct}.'],
                          logit_bias={'50259': -100},  # never emit the final stop token
                          # max_tokens=-1
                          )  # defaults to davinci-003
        self.n_human, self.n_machine, self.n_instruct = n_human, n_machine, n_instruct
        self.n_gen, self.n_keep = 0, 0  # generated vs. kept-after-filter counters
        self.human_instruction_data = []
        self.machine_instruction_data = []
        self.scorer = None  # Rouge-L scorer for similarity-based dedup
        self.all_instruction_tokens = []  # token lists of all instructions, for dedup
        self.all_instruction = []  # all instruction strings, for dedup
        self.sample_few_shot = None
        self.load_seed_task(seed_file)
        self.init(prompt)

    def load_seed_task(self, seed_file):
        """Load human seed tasks (jsonl), flattening up to n_instance instances each."""
        instruction_data = []
        with open(seed_file, 'r', encoding='UTF8') as f:
            for i in f.readlines():
                tmp = json.loads(i)
                for j in range(min(len(tmp['instances']), SELF.n_instance)):
                    instruction_data.append({'instruction': tmp['instruction'],
                                             'input': tmp['instances'][j]['input'],
                                             'output': tmp['instances'][j]['output']})
        self.human_instruction_data = instruction_data

    def init(self, prompt):
        """Create the LLM chain, the Rouge-L scorer, and the dedup pool."""
        if not prompt:
            prompt = self_prompt
        self.chain = LLMChain(llm=self.llm, prompt=PromptTemplate.from_template(prompt))
        self.scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=False,
                                               tokenizer=ChineseTokenizer())
        # BUG FIX: keep plain instruction STRINGS here. sim_filter uses elements
        # of all_instruction as dict keys (top10_sim_inst) and appends strings;
        # the original stored whole dicts, which are unhashable and would raise
        # TypeError on the first similarity report.
        self.all_instruction = [i['instruction'] for i in
                                self.human_instruction_data + self.machine_instruction_data]
        self.all_instruction_tokens = [self.scorer._tokenizer.tokenize(text)
                                       for text in self.all_instruction]

    @property
    def first_id(self):
        # Id of the first machine-generated instruction; everything before it
        # in the prompt is a few-shot example.
        return int(self.n_human + min(self.n_machine, len(self.machine_instruction_data)) + 1)

    def generate(self):
        """Generate new instructions.

        1. Sample a few-shot set of n_human + n_machine examples.
        2. Build the prompt and call the LLM chain.
        3. The raw chain result is returned for decode_response to parse.
        """
        # sample
        seed_sample = random.sample(self.human_instruction_data, self.n_human)
        machine_sample = random.sample(self.machine_instruction_data,
                                       min(self.n_machine, len(self.machine_instruction_data)))
        self.sample_few_shot = seed_sample + machine_sample
        # build few-shot prompt; the trailing prefix starts the new instruction
        few_shot = gen_few_shot_prompt(self.sample_few_shot)
        few_shot += SELF.prefix.format(id=self.first_id)
        # generate
        result = self.chain({'few_shot': few_shot, 'n_instruct': self.n_instruct})
        return result

    def decode_response(self, response):
        """Parse the chain result into instruction dicts, dropping malformed ones.

        :param response: dict returned by the LLM chain (expects a 'text' key)
        :return: list of {'instruction', 'input', 'output'} dicts
        """
        if response is None:
            return []
        if '###' not in response['text']:
            return []
        raw_instruct = SELF.prefix.format(id=self.first_id) + response['text']
        raw_instruct = raw_instruct.split('###')
        instruction_data = []
        for idx, inst in enumerate(raw_instruct):
            # The last instruction of a length-truncated generation is usually
            # cut off mid-text, so drop it.
            # BUG FIX: the original compared idx == len(raw_instruct), which is
            # never true for an enumerate index; compare against len - 1, and
            # use .get() since the chain result may not carry finish_reason.
            if idx == len(raw_instruct) - 1 and response.get('finish_reason') == 'length':
                continue
            splitted_data = re.split(rf"{idx + self.first_id}\.\s+(指令|输入|输出):", inst)
            if len(splitted_data) != 7:
                continue  # a section is missing or mis-formatted
            else:
                inst = splitted_data[2].strip()
                inst_input = splitted_data[4].strip()
                inst_input = "" if inst_input.lower() == '<无输入>' else inst_input
                output = splitted_data[6].strip()
                print({'instruction': inst, 'input': inst_input, 'output': output})
                # drop instructions that are too short or too long
                if len(inst) <= 3 or len(inst) >= 100:
                    continue
                # drop instructions the model likely cannot execute
                if any((i in inst for i in SELF.blacklist)):
                    continue
                # drop instructions that do not start with Chinese characters
                if not is_all_chinese(inst[:3]):
                    continue
                instruction_data.append({'instruction': inst, 'input': inst_input,
                                         'output': output})
        return instruction_data

    def sim_filter(self, instruction_data):
        """Drop instructions too similar to the existing pool (Rouge-L LCS).

        Keeps diversity by rejecting any candidate whose best Rouge-L f-measure
        against the pool exceeds 0.7; kept candidates join the pool.
        """
        keep_instruction = []
        # One pool for the whole batch instead of one per candidate.
        with Pool(os.cpu_count()) as p:
            for inst in instruction_data:
                inst_tokens = self.scorer._tokenizer.tokenize(inst['instruction'])
                # BUG FIX: the original swapped the map arguments, scoring the
                # pool against each single CHARACTER of the candidate. Score the
                # candidate's tokens against every pooled instruction instead.
                rouge_scores = p.map(partial(rouge_scorer._score_lcs, inst_tokens),
                                     self.all_instruction_tokens)
                rouge_l = [score.fmeasure for score in rouge_scores]
                print(rouge_scores)
                print(rouge_l)
                top10_sim_inst = {
                    self.all_instruction[i]: rouge_l[i]
                    for i in np.argsort(rouge_l)[-10:][::-1]
                }
                print(top10_sim_inst)
                if max(rouge_l) > 0.7:
                    continue
                inst['most_similar_instructions'] = top10_sim_inst
                inst['avg_similarity_score'] = float(np.mean(rouge_l))
                self.all_instruction.append(inst['instruction'])
                self.all_instruction_tokens.append(inst_tokens)
                keep_instruction.append(inst)
        return keep_instruction

    def step(self):
        """Run one generate → parse → dedup cycle and update counters."""
        response = self.generate()
        new_instruct_data = self.decode_response(response)
        keep_instruct_data = self.sim_filter(new_instruct_data)
        self.n_gen += len(new_instruct_data)
        self.n_keep += len(keep_instruct_data)
        self.machine_instruction_data += keep_instruct_data
        return keep_instruct_data  # for gradio output only

    def dump_file(self, output_file):
        """Write all machine-generated instructions to *output_file* as jsonl."""
        with open(output_file, 'w', encoding='UTF8') as f:
            for i in self.machine_instruction_data:
                f.write(json.dumps(i, ensure_ascii=False) + '\n')


# Only used for gradio display
def init_instance(seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
    """Build a SELF instance; *prompt* lets the user override the prefix prompt."""
    if not prompt:
        prompt = self_prompt
    self_instance = SELF(seed_file.name, openai_key, n_human, n_machine, n_instruct,
                         prompt)
    return self_instance


def generate_instruction(self_instance):
    """Run one step and format (few-shot, kept data, counters) for gradio."""
    keep_instruct_data = self_instance.step()
    return (json.dumps(self_instance.sample_few_shot, ensure_ascii=False),
            json.dumps(keep_instruct_data, ensure_ascii=False),
            f'已生成{self_instance.n_gen} 可用{self_instance.n_keep}')