# NOTE(review): removed a scraper artifact header ("Spaces:" / "Runtime error" x2)
# that was prepended to this file during extraction — it is not part of the program.
# -*-coding:utf-8 -*-
import json
import os
import random
import re
from functools import partial
from multiprocessing import Pool

import numpy as np
from langchain.chains.llm import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from rouge_score import rouge_scorer

from self.prompt import self_prompt, gen_few_shot_prompt
def is_all_chinese(strs):
    """Return True iff every character of *strs* lies in the CJK Unified
    Ideographs range U+4E00..U+9FA5 (vacuously True for an empty string)."""
    return all('\u4e00' <= ch <= '\u9fa5' for ch in strs)
class ChineseTokenizer():
    """Character-level tokenizer for Chinese text.

    Pluggable into ``rouge_scorer.RougeScorer``, which only requires an
    object exposing a ``tokenize`` method.
    """

    def tokenize(self, text):
        """Split *text* into single-character tokens, dropping whitespace."""
        return [ch for ch in text if not ch.isspace()]
class SELF(object):
    """Self-Instruct-style bootstrapping of Chinese instruction data.

    Maintains a pool of human-written seed instructions plus machine-generated
    ones, prompts an OpenAI model (through langchain) with few-shot examples
    to produce new instructions, parses the completion, and keeps only results
    that pass format, content and ROUGE-L diversity filters.
    """

    # Take up to this many instances per seed task, so a small seed set still
    # yields enough few-shot material.
    n_instance = 3
    # Numbering prefix for each instruction inside the prompt.
    prefix = "{id}. 指令:"
    # Instructions requiring abilities a text-only model lacks are dropped.
    blacklist = ['图片', '图像', '文件', '作图', '绘画', '视频', '音频', '音乐', '流程图']

    def __init__(self, seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
        """Create the generator.

        Args:
            seed_file: path to a JSON-lines file of seed tasks.
            openai_key: OpenAI API key.
            n_human: number of human-written few-shot examples per prompt.
            n_machine: max number of machine-generated few-shot examples.
            n_instruct: total number of instructions requested per prompt.
            prompt: optional template overriding the default ``self_prompt``.
        """
        # BUG FIX: two of the stop strings were literal '{n_instruct}' —
        # they must be f-strings so the actual count is interpolated.
        self.llm = OpenAI(openai_api_key=openai_key, temperature=1,
                          # stop once the requested number of instructions was generated
                          stop=[f'\n{n_instruct}', f'{n_instruct}', f'{n_instruct}.'],
                          logit_bias={'50259': -100},  # suppress the trailing stop-marker token
                          max_tokens=-1
                          )  # defaults to text-davinci-003
        self.n_human, self.n_machine, self.n_instruct = n_human, n_machine, n_instruct
        self.n_gen, self.n_keep = 0, 0  # total generated / total kept counters
        self.human_instruction_data = []
        self.machine_instruction_data = []
        self.scorer = None  # ROUGE-L scorer, used for similarity filtering
        self.all_instruction_tokens = []  # tokenized instruction pool, for dedup
        self.all_instruction = []  # instruction strings, for dedup
        self.sample_few_shot = None  # last few-shot sample (kept for display)
        self.load_seed_task(seed_file)
        self.init(prompt)

    def load_seed_task(self, seed_file):
        """Load seed tasks from a JSON-lines file, expanding up to
        ``SELF.n_instance`` instances per task into flat
        instruction/input/output records."""
        instruction_data = []
        with open(seed_file, 'r', encoding='UTF8') as f:
            for line in f:
                tmp = json.loads(line)
                for j in range(min(len(tmp['instances']), SELF.n_instance)):
                    instruction_data.append({'instruction': tmp['instruction'],
                                             'input': tmp['instances'][j]['input'],
                                             'output': tmp['instances'][j]['output']})
        self.human_instruction_data = instruction_data

    def init(self, prompt):
        """Build the LLM chain, the ROUGE scorer and the dedup pools."""
        if not prompt:
            prompt = self_prompt
        self.chain = LLMChain(llm=self.llm, prompt=PromptTemplate.from_template(prompt))
        self.scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=False, tokenizer=ChineseTokenizer())
        # BUG FIX: keep plain instruction *strings*. The original stored the
        # whole sample dicts, which are unhashable and crashed sim_filter when
        # used as dict keys (and were later mixed with strings on append).
        self.all_instruction = [i['instruction'] for i in
                                self.human_instruction_data + self.machine_instruction_data]
        self.all_instruction_tokens = [self.scorer._tokenizer.tokenize(i) for i in
                                       self.all_instruction]

    def first_id(self):
        """Id of the first machine-generated instruction in the prompt;
        everything before it is the few-shot examples."""
        return int(self.n_human + min(self.n_machine, len(self.machine_instruction_data)) + 1)

    def generate(self):
        """Generate new instructions.

        1. sample few-shot examples (n_human human + up to n_machine machine)
        2. build the few-shot prompt
        3. run the LLM chain and return its raw result
        """
        seed_sample = random.sample(self.human_instruction_data, self.n_human)
        machine_sample = random.sample(self.machine_instruction_data,
                                       min(self.n_machine, len(self.machine_instruction_data)))
        self.sample_few_shot = seed_sample + machine_sample
        few_shot = gen_few_shot_prompt(self.sample_few_shot)
        # BUG FIX: first_id must be *called* — the original formatted the
        # bound method object itself into the prompt.
        few_shot += SELF.prefix.format(id=self.first_id())  # header for the first new instruction
        return self.chain({'few_shot': few_shot, 'n_instruct': self.n_instruct})

    def decode_response(self, response):
        """Parse the raw LLM completion into instruction records.

        Returns a list of ``{'instruction', 'input', 'output'}`` dicts; a
        malformed or empty completion yields ``[]``.
        """
        if response is None:
            return []
        if '###' not in response['text']:
            return []
        first_id = self.first_id()  # BUG FIX: call the method (was a bound-method reference)
        raw_instruct = (SELF.prefix.format(id=first_id) + response['text']).split('###')
        instruction_data = []
        for idx, inst in enumerate(raw_instruct):
            # The last instruction is usually truncated when generation
            # stopped on length, so discard it.
            # BUG FIX: the final index is len-1; the original compared against
            # len, so the truncated tail was never dropped.
            if idx == len(raw_instruct) - 1 and response.get('finish_reason') == 'length':
                continue
            # Expect "N. 指令: ... N. 输入: ... N. 输出: ..." — re.split with a
            # capturing group yields 7 pieces when all three sections exist.
            splitted_data = re.split(rf"{idx + first_id}\.\s+(指令|输入|输出):", inst)
            if len(splitted_data) != 7:
                continue  # a section is missing or the numbering is malformed
            inst_text = splitted_data[2].strip()
            inst_input = splitted_data[4].strip()
            inst_input = "" if inst_input.lower() == '<无输入>' else inst_input
            output = splitted_data[6].strip()
            # Drop too-short / too-long instructions.
            if len(inst_text) <= 3 or len(inst_text) >= 100:
                continue
            # Drop instructions a text-only model likely cannot execute.
            if any(word in inst_text for word in SELF.blacklist):
                continue
            # Drop instructions whose first characters are not Chinese.
            if not is_all_chinese(inst_text[:3]):
                continue
            instruction_data.append({'instruction': inst_text, 'input': inst_input, 'output': output})
        return instruction_data

    def sim_filter(self, instruction_data):
        """Keep only instructions sufficiently different (ROUGE-L <= 0.7)
        from the existing pool, to preserve diversity."""
        keep_instruction = []
        for inst in instruction_data:
            inst_tokens = self.scorer._tokenizer.tokenize(inst['instruction'])
            # BUG FIX: score the candidate against every pooled instruction.
            # The original mapped over the candidate's own characters, so it
            # compared the pool with single-character "instructions".
            with Pool(os.cpu_count()) as p:
                rouge_scores = p.map(partial(rouge_scorer._score_lcs, inst_tokens),
                                     self.all_instruction_tokens)
            rouge_l = [score.fmeasure for score in rouge_scores]
            # Too similar to something already in the pool — discard.
            if rouge_l and max(rouge_l) > 0.7:
                continue
            top10_sim_inst = {
                self.all_instruction[i]: rouge_l[i] for i in np.argsort(rouge_l)[-10:][::-1]
            }
            inst['most_similar_instructions'] = top10_sim_inst
            inst['avg_similarity_score'] = float(np.mean(rouge_l))
            self.all_instruction.append(inst['instruction'])
            self.all_instruction_tokens.append(inst_tokens)
            keep_instruction.append(inst)
        return keep_instruction

    def step(self):
        """One full iteration: generate → parse → similarity-filter → pool."""
        response = self.generate()
        new_instruct_data = self.decode_response(response)
        keep_instruct_data = self.sim_filter(new_instruct_data)
        self.n_gen += len(new_instruct_data)
        self.n_keep += len(keep_instruct_data)
        self.machine_instruction_data += keep_instruct_data
        return keep_instruct_data  # for gradio output only

    def dump_file(self, output_file):
        """Write every kept machine-generated record as one JSON line."""
        with open(output_file, 'w', encoding='UTF8') as f:
            for record in self.machine_instruction_data:
                f.write(json.dumps(record, ensure_ascii=False) + '\n')
# Only Used for gradio display | |
def init_instance(seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
    """Build a SELF generator for the gradio UI.

    *seed_file* is a gradio upload object whose ``.name`` holds the temp-file
    path. An empty *prompt* falls back to the default ``self_prompt`` template.
    """
    template = prompt if prompt else self_prompt
    return SELF(seed_file.name, openai_key, n_human, n_machine, n_instruct, template)
def generate_instruction(self_instance):
    """Run one generation step and format the results for gradio display.

    Returns a 3-tuple of (few-shot sample JSON, kept-instruction JSON,
    generated/kept counter text).
    """
    kept = self_instance.step()
    few_shot_json = json.dumps(self_instance.sample_few_shot, ensure_ascii=False)
    kept_json = json.dumps(kept, ensure_ascii=False)
    stats = f'已生成{self_instance.n_gen} 可用{self_instance.n_keep}'
    return few_shot_json, kept_json, stats