MakeInstruction / self /generate.py
DSXiangLi
a
54f0839
raw
history blame
7.96 kB
# -*-coding:utf-8 -*-
import re
import numpy as np
import os
import json
import random
from self.prompt import self_prompt, gen_few_shot_prompt
from rouge_score import rouge_scorer
from langchain.prompts import PromptTemplate
from functools import partial
from langchain.chains.llm import LLMChain
from langchain.llms import OpenAI
from multiprocessing import Pool
def is_all_chinese(strs):
    """Return True iff every character of *strs* lies in the CJK Unified
    Ideographs range U+4E00..U+9FA5 (vacuously True for an empty string)."""
    return all('\u4e00' <= ch <= '\u9fa5' for ch in strs)
class ChineseTokenizer():
    """Character-level tokenizer for Chinese text, pluggable into
    rouge_scorer.RougeScorer (which expects an object with .tokenize)."""

    def tokenize(self, text):
        # One token per non-whitespace character; whitespace is dropped.
        tokens = []
        for ch in text:
            stripped = ch.strip()
            if stripped:
                tokens.append(stripped)
        return tokens
class SELF(object):
    """Chinese Self-Instruct data generator.

    Samples few-shot demonstrations from a human-written seed-task pool (plus
    previously accepted machine-generated instructions), prompts an OpenAI
    model to produce new numbered instructions, parses the completion, and
    keeps only instructions that are sufficiently dissimilar (Rouge-L) from
    the existing pool.
    """

    # A seed instruction may carry several instances; keep at most this many
    # (instruction, input, output) samples per seed instruction.
    n_instance = 3
    # Numbering prefix used both when building the prompt and when parsing output.
    prefix = "{id}. 指令:"
    # Instructions touching these topics (images, audio, ...) cannot be executed
    # by a text-only model and are filtered out.
    blacklist = ['图片', '图像', '文件', '作图', '绘画', '视频', '音频', '音乐', '流程图']

    def __init__(self, seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
        """
        Args:
            seed_file: path to a JSONL file of seed tasks, each line
                {'instruction': ..., 'instances': [{'input': ..., 'output': ...}, ...]}.
            openai_key: OpenAI API key.
            n_human: number of human seed demonstrations per prompt.
            n_machine: max number of machine demonstrations per prompt.
            n_instruct: total number of instructions requested per completion.
            prompt: optional prompt template; falls back to self_prompt when falsy.
        """
        # Defaults to text-davinci-003. Stop once the model starts emitting
        # instruction number n_instruct. BUGFIX: the original only f-formatted
        # the first stop token, leaving the literal text '{n_instruct}' in the
        # other two; all three are formatted here.
        self.llm = OpenAI(openai_api_key=openai_key, temperature=1,
                          stop=[f'\n{n_instruct}', f'{n_instruct}', f'{n_instruct}.'],
                          logit_bias={'50259': -100},  # suppress the trailing '#' terminator token
                          max_tokens=-1
                          )
        self.n_human, self.n_machine, self.n_instruct = n_human, n_machine, n_instruct
        self.n_gen, self.n_keep = 0, 0  # counters: instructions generated / kept
        self.human_instruction_data = []
        self.machine_instruction_data = []
        self.scorer = None  # Rouge-L scorer used for similarity-based dedup
        self.all_instruction_tokens = []  # tokenized instruction pool, for dedup
        self.all_instruction = []  # instruction strings, for dedup / reporting
        self.sample_few_shot = None  # last sampled few-shot demos (for gradio display)
        self.load_seed_task(seed_file)
        self.init(prompt)

    def load_seed_task(self, seed_file):
        """Load seed tasks from JSONL, flattening up to n_instance instances each."""
        instruction_data = []
        with open(seed_file, 'r', encoding='UTF8') as f:
            for line in f:
                task = json.loads(line)
                for j in range(min(len(task['instances']), SELF.n_instance)):
                    instruction_data.append({'instruction': task['instruction'],
                                             'input': task['instances'][j]['input'],
                                             'output': task['instances'][j]['output']})
        self.human_instruction_data = instruction_data

    def init(self, prompt):
        """Build the LLM chain, the Rouge-L scorer and the dedup pools."""
        if not prompt:
            prompt = self_prompt
        self.chain = LLMChain(llm=self.llm, prompt=PromptTemplate.from_template(prompt))
        self.scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=False, tokenizer=ChineseTokenizer())
        # BUGFIX: keep plain instruction STRINGS, not the full sample dicts —
        # sim_filter uses these entries as dict keys (dicts are unhashable) and
        # later appends accepted instructions as strings.
        self.all_instruction = [d['instruction'] for d in
                                self.human_instruction_data + self.machine_instruction_data]
        self.all_instruction_tokens = [self.scorer._tokenizer.tokenize(i) for i in
                                       self.all_instruction]

    @property
    def first_id(self):
        # Id of the first machine-generated instruction; the ids before it
        # belong to the few-shot demonstrations.
        return int(self.n_human + min(self.n_machine, len(self.machine_instruction_data)) + 1)

    def generate(self):
        """Generate new instructions.

        1. Sample few-shot demos (n_human seeds + up to n_machine machine ones).
        2. Build the few-shot prompt, ending with the next instruction's prefix.
        3. Run the LLM chain and return its raw result for decode_response().
        """
        seed_sample = random.sample(self.human_instruction_data, self.n_human)
        machine_sample = random.sample(self.machine_instruction_data,
                                       min(self.n_machine, len(self.machine_instruction_data)))
        self.sample_few_shot = seed_sample + machine_sample
        few_shot = gen_few_shot_prompt(self.sample_few_shot)
        few_shot += SELF.prefix.format(id=self.first_id)  # prefix of the first NEW instruction
        return self.chain({'few_shot': few_shot, 'n_instruct': self.n_instruct})

    def decode_response(self, response):
        """Parse the model completion into [{'instruction','input','output'}, ...].

        Returns [] when there is no response or no '###' section separator.
        """
        if response is None:
            return []
        if '###' not in response['text']:
            return []
        # Re-attach the prefix that was part of the prompt so every section parses the same way.
        raw_instruct = SELF.prefix.format(id=self.first_id) + response['text']
        raw_instruct = raw_instruct.split('###')
        instruction_data = []
        for idx, section in enumerate(raw_instruct):
            # The last instruction is usually truncated when generation stopped on
            # length, so drop it. BUGFIX: the original compared idx == len(raw_instruct),
            # which enumerate can never produce (off by one); also use .get, since
            # the chain result may not carry 'finish_reason'.
            if idx == len(raw_instruct) - 1 and response.get('finish_reason') == 'length':
                continue
            splitted_data = re.split(rf"{idx + self.first_id}\.\s+(指令|输入|输出):", section)
            if len(splitted_data) != 7:
                continue  # a section is missing or the numbering is malformed
            inst = splitted_data[2].strip()
            inp = splitted_data[4].strip()
            inp = "" if inp.lower() == '<无输入>' else inp  # '<无输入>' marks "no input"
            output = splitted_data[6].strip()
            print({'instruction': inst, 'input': inp, 'output': output})
            # drop instructions that are too short or too long
            if len(inst) <= 3 or len(inst) >= 100:
                continue
            # drop instructions a text-only model most likely cannot execute
            if any(i in inst for i in SELF.blacklist):
                continue
            # drop instructions that do not start with Chinese characters
            if not is_all_chinese(inst[:3]):
                continue
            instruction_data.append({'instruction': inst, 'input': inp, 'output': output})
        return instruction_data

    def sim_filter(self, instruction_data):
        """Keep only instructions with Rouge-L < 0.7 against the whole pool.

        Accepted instructions are annotated with their top-10 most similar pool
        instructions and their average similarity, then added to the pool.
        """
        keep_instruction = []
        for inst in instruction_data:
            inst_tokens = self.scorer._tokenizer.tokenize(inst['instruction'])
            # Score the candidate against every pooled instruction in parallel.
            # BUGFIX: the original mapped over the candidate's characters with the
            # whole pool as the first argument — the arguments were reversed.
            with Pool(os.cpu_count()) as p:
                rouge_scores = p.map(partial(rouge_scorer._score_lcs, inst_tokens),
                                     self.all_instruction_tokens)
            rouge_l = [score.fmeasure for score in rouge_scores]
            top10_sim_inst = {
                self.all_instruction[i]: rouge_l[i] for i in np.argsort(rouge_l)[-10:][::-1]
            }
            # Too similar to an existing instruction -> drop, to keep diversity.
            if rouge_l and max(rouge_l) > 0.7:
                continue
            inst['most_similar_instructions'] = top10_sim_inst
            inst['avg_similarity_score'] = float(np.mean(rouge_l)) if rouge_l else 0.0
            self.all_instruction.append(inst['instruction'])
            self.all_instruction_tokens.append(inst_tokens)
            keep_instruction.append(inst)
        return keep_instruction

    def step(self):
        """One generate -> decode -> filter round; returns the kept instructions."""
        response = self.generate()
        new_instruct_data = self.decode_response(response)
        keep_instruct_data = self.sim_filter(new_instruct_data)
        self.n_gen += len(new_instruct_data)
        self.n_keep += len(keep_instruct_data)
        self.machine_instruction_data += keep_instruct_data
        return keep_instruct_data  # for gradio output only

    def dump_file(self, output_file):
        """Write all accepted machine instructions to *output_file* as JSONL."""
        with open(output_file, 'w', encoding='UTF8') as f:
            for i in self.machine_instruction_data:
                f.write(json.dumps(i, ensure_ascii=False) + '\n')
# Only Used for gradio display
def init_instance(seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
    """Build a SELF generator for the gradio UI.

    The user may override the instruction-generation prompt; an empty value
    falls back to the default self_prompt template.
    """
    chosen_prompt = prompt if prompt else self_prompt
    return SELF(seed_file.name, openai_key, n_human, n_machine, n_instruct, chosen_prompt)
def generate_instruction(self_instance):
    """Run one generation step and format the results for gradio display.

    Returns a 3-tuple of (few-shot demos as JSON, kept instructions as JSON,
    generated/kept counter string).
    """
    kept = self_instance.step()
    few_shot_json = json.dumps(self_instance.sample_few_shot, ensure_ascii=False)
    kept_json = json.dumps(kept, ensure_ascii=False)
    stats = f'已生成{self_instance.n_gen} 可用{self_instance.n_keep}'
    return few_shot_json, kept_json, stats