# -*- coding: utf-8 -*-
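# Self-Instruct style bootstrapping of Chinese instruction-tuning data: a pool of
# human-written seed tasks provides few-shot context for an OpenAI completion model,
# and the generated instructions are parsed, filtered and fed back into the pool
# (see the SELF class below).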
import re
import numpy as np
import os
import json
import random
from self.prompt import self_prompt, gen_few_shot_prompt
from rouge_score import rouge_scorer
from langchain.prompts import PromptTemplate
from functools import partial
from langchain.chains.llm import LLMChain
from langchain.llms import OpenAI
from multiprocessing import Pool
def is_all_chinese(strs):
for _char in strs:
if not '\u4e00' <= _char <= '\u9fa5':
return False
return True
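# Character-level tokenizer so that Rouge-L from rouge_score can be computed on
# Chinese text without word segmentation.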
class ChineseTokenizer():
def tokenize(self, text):
tokens = [i.strip() for i in text if i.strip()]
return tokens
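# Instruction self-generation pipeline:
#   generate()        sample few-shot examples and query the LLM for new instructions
#   decode_response() parse the raw completion into {instruction, input, output} dicts
#   sim_filter()      drop instructions too similar (Rouge-L) to the existing pool
#   step()            run one full round and add the kept instructions to the machine pool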
class SELF(object):
    n_instance = 3  # not enough instructions, so pad with samples: include up to n_instance instances per seed instruction
    prefix = "{id}. 指令:"  # numbered prefix for each instruction ("指令" = instruction)
    blacklist = ['图片', '图像', '文件', '作图', '绘画', '视频', '音频', '音乐', '流程图']  # tasks a text-only model cannot perform (images, files, drawing, video, audio, music, flowcharts)
    def __init__(self, seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
        self.llm = OpenAI(openai_api_key=openai_key, temperature=1,
                          stop=[f'\n{n_instruct}', f'{n_instruct}', f'{n_instruct}.'],  # stop once enough instructions have been generated
                          logit_bias={'50259': -100},  # do not generate the trailing stop marker '#'
                          max_tokens=-1
                          )  # defaults to text-davinci-003
self.n_human, self.n_machine, self.n_instruct = n_human, n_machine, n_instruct
self.n_gen, self.n_keep = 0, 0
self.human_instruction_data = []
self.machine_instruction_data = []
        self.scorer = None  # Rouge-L scorer, used for text-similarity computation
        self.all_instruction_tokens = []  # tokens of every instruction, used to deduplicate new instructions
        self.all_instruction = []  # every instruction, used to deduplicate new instructions
self.sample_few_shot = None
self.load_seed_task(seed_file)
self.init(prompt)
def load_seed_task(self, seed_file):
instruction_data = []
with open(seed_file, 'r', encoding='UTF8') as f:
for i in f.readlines():
tmp = json.loads(i)
for j in range(min(len(tmp['instances']), SELF.n_instance)):
instruction_data.append({'instruction': tmp['instruction'],
'input': tmp['instances'][j]['input'],
'output': tmp['instances'][j]['output']})
self.human_instruction_data = instruction_data
def init(self, prompt):
if not prompt:
prompt = self_prompt
self.chain = LLMChain(llm=self.llm, prompt=PromptTemplate.from_template(prompt))
self.scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=False, tokenizer=ChineseTokenizer())
        # keep plain instruction strings so that sim_filter can append to and index this list consistently
        self.all_instruction = [i['instruction'] for i in
                                self.human_instruction_data + self.machine_instruction_data]
        self.all_instruction_tokens = [self.scorer._tokenizer.tokenize(i) for i in self.all_instruction]
@property
def first_id(self):
        # id of the first machine-generated instruction; everything before it is the few-shot examples
return int(self.n_human + min(self.n_machine, len(self.machine_instruction_data)) + 1)
def generate(self):
"""
新指令生成
1. 采样few-shot[n_human + n_machine]
2. 生成指令
3. 解析模型结果得到新的指令样本
"""
# sample
seed_sample = random.sample(self.human_instruction_data, self.n_human)
machine_sample = random.sample(self.machine_instruction_data,
min(self.n_machine, len(self.machine_instruction_data)))
self.sample_few_shot = seed_sample + machine_sample
# build few-shot
few_shot = gen_few_shot_prompt(self.sample_few_shot)
        few_shot += SELF.prefix.format(id=self.first_id)  # numbered prefix for the instruction to be generated
# generate
result = self.chain({'few_shot': few_shot, 'n_instruct': self.n_instruct})
return result
def decode_response(self, response):
if response is None:
return []
if '###' not in response['text']:
return []
raw_instruct = SELF.prefix.format(id=self.first_id) + response['text']
raw_instruct = raw_instruct.split('###')
instruction_data = []
        for id, inst in enumerate(raw_instruct):
            # the last instruction is usually truncated when generation stops for length, so drop it
            if id == len(raw_instruct) - 1 and response.get('finish_reason') == 'length':
                continue
            splitted_data = re.split(rf"{id + self.first_id}\.\s+(指令|输入|输出):", inst)
            if len(splitted_data) != 7:
                continue  # a generated section is missing or the format is wrong
else:
inst = splitted_data[2].strip()
input = splitted_data[4].strip()
input = "" if input.lower() == '<无输入>' else input
output = splitted_data[6].strip()
print({'instruction': inst, 'input': input, 'output': output})
                # filter out instructions that are too short or too long
if len(inst) <= 3 or len(inst) >= 100:
continue
                # filter out instructions the model is unlikely to be able to execute
if any((i in inst for i in SELF.blacklist)):
continue
                # skip instructions that do not start with Chinese characters
if not is_all_chinese(inst[:3]):
continue
instruction_data.append({'instruction': inst, 'input': input, 'output': output})
return instruction_data
def sim_filter(self, instruction_data):
        # filter out instructions too similar to the existing pool to keep diversity,
        # using Rouge-L (longest common subsequence) as the similarity measure
keep_instruction = []
for inst in instruction_data:
inst_tokens = self.scorer._tokenizer.tokenize(inst['instruction'])
            with Pool(os.cpu_count()) as p:
                # score the candidate against every instruction already in the pool
                rouge_scores = p.map(partial(rouge_scorer._score_lcs, inst_tokens), self.all_instruction_tokens)
            rouge_l = [score.fmeasure for score in rouge_scores]
print(rouge_scores)
print(rouge_l)
top10_sim_inst = {
self.all_instruction[i]: rouge_l[i] for i in np.argsort(rouge_l)[-10:][::-1]
}
print(top10_sim_inst)
if max(rouge_l) > 0.7:
continue
inst['most_similar_instructions'] = top10_sim_inst
inst['avg_similarity_score'] = float(np.mean(rouge_l))
self.all_instruction.append(inst['instruction'])
self.all_instruction_tokens.append(inst_tokens)
keep_instruction.append(inst)
return keep_instruction
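    # one full round: generate -> parse -> similarity-filter -> extend the machine instruction pool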
def step(self):
response = self.generate()
new_instruct_data = self.decode_response(response)
keep_instruct_data = self.sim_filter(new_instruct_data)
self.n_gen += len(new_instruct_data)
self.n_keep += len(keep_instruct_data)
self.machine_instruction_data += keep_instruct_data
return keep_instruct_data # for gradio output only
def dump_file(self, output_file):
with open(output_file, 'w', encoding='UTF8') as f:
for i in self.machine_instruction_data:
f.write(json.dumps(i, ensure_ascii=False) + '\n')
# Helper functions below are only used for the Gradio display
def init_instance(seed_file, openai_key, n_human, n_machine, n_instruct, prompt):
    # allow the user to supply a prompt that overrides the default instruction-generation prefix
if not prompt:
prompt = self_prompt
self_instance = SELF(seed_file.name, openai_key, n_human, n_machine, n_instruct, prompt)
return self_instance
def generate_instruction(self_instance):
keep_instruct_data = self_instance.step()
return (json.dumps(self_instance.sample_few_shot, ensure_ascii=False),
json.dumps(keep_instruct_data, ensure_ascii=False),
f'已生成{self_instance.n_gen} 可用{self_instance.n_keep}')
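

# Minimal standalone usage sketch (not part of the Gradio app): the seed file name,
# API key and hyper-parameters below are placeholders; adjust them to your setup.
if __name__ == '__main__':
    self_instance = SELF(seed_file='seed_tasks.jsonl', openai_key='sk-...',
                         n_human=6, n_machine=2, n_instruct=20, prompt=None)
    for _ in range(5):  # each step() generates, parses and filters one batch of instructions
        self_instance.step()
    self_instance.dump_file('machine_instruction.jsonl')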