from __future__ import annotations

import collections
from math import sqrt

import scipy.stats
import torch
from nltk.util import ngrams
from tokenizers import Tokenizer
from torch import Tensor
from transformers import LogitsProcessor

from normalizers import normalization_strategy_lookup


class WatermarkBase:
    def __init__(
        self,
        vocab: list[int] = None,
        gamma: float = 0.5,
        delta: float = 2.0,
        seeding_scheme: str = "simple_1",
        hash_key: int = 15485863,  # just a large prime is enough to build an rng seed of sufficient bit width
        extra_salt: int = 0,
        select_green_tokens: bool = True,
    ):
        # watermarking parameters
        self.vocab = vocab
        self.vocab_size = len(vocab)
        self.gamma = gamma
        self.delta = delta
        self.seeding_scheme = seeding_scheme
        self.rng = None
        self.hash_key = hash_key
        self.extra_salt = extra_salt
        self.select_green_tokens = select_green_tokens

    def _seed_rng(self, input_ids: torch.LongTensor, seeding_scheme: str = None) -> None:
        # the seeding scheme can be overridden per call, but by default the instance attribute is used
        if seeding_scheme is None:
            seeding_scheme = self.seeding_scheme
        if seeding_scheme == "simple_1":
            assert input_ids.shape[-1] >= 1, \
                f"seeding_scheme={seeding_scheme} requires at least a 1 token prefix sequence to seed rng"
            prev_token = input_ids[-1].item()
            self.rng.manual_seed(self.hash_key * prev_token + self.extra_salt)
        else:
            raise NotImplementedError(f"Unexpected seeding_scheme: {seeding_scheme}")
        return
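
    # Illustrative note (assumed values, not from the original): with the default
    # hash_key=15485863 and extra_salt=0, a previous token id of 42 seeds the rng with
    # 15485863 * 42 + 0 = 650406246, so the same prefix always reseeds the generator
    # identically -- this determinism is what makes detection possible later.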

    def _get_greenlist_ids(self, input_ids: torch.LongTensor) -> torch.LongTensor:
        self._seed_rng(input_ids)
        greenlist_size = int(self.vocab_size * self.gamma)
        if input_ids.device.type != 'cpu':
            # draw the permutation on cpu so the result reproduces across devices
            vocab_permutation = torch.randperm(self.vocab_size, device='cpu', generator=self.rng)
            vocab_permutation = vocab_permutation.to(input_ids.device)
        else:
            vocab_permutation = torch.randperm(self.vocab_size, device=input_ids.device, generator=self.rng)
        if self.select_green_tokens:  # directly
            greenlist_ids = vocab_permutation[:greenlist_size]  # new
        else:  # select the green tokens from the red end of the permutation
            greenlist_ids = vocab_permutation[(self.vocab_size - greenlist_size):]
        return greenlist_ids
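
    # Minimal usage sketch (toy values, illustrative only): the greenlist is a pure
    # function of the prefix, so repeated calls with the same last token must agree:
    #   base = WatermarkBase(vocab=list(range(8)), gamma=0.5)
    #   base.rng = torch.Generator(device='cpu')
    #   base._get_greenlist_ids(torch.tensor([3]))  # e.g. tensor([5, 2, 7, 0])
    #   base._get_greenlist_ids(torch.tensor([3]))  # same 4 ids again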


class WatermarkLogitsProcessor(WatermarkBase, LogitsProcessor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _calc_greenlist_mask(self, scores: torch.FloatTensor, greenlist_token_ids) -> torch.BoolTensor:
        # TODO let's see if we can lose this loop
        green_tokens_mask = torch.zeros_like(scores)
        for b_idx in range(len(greenlist_token_ids)):
            green_tokens_mask[b_idx][greenlist_token_ids[b_idx]] = 1
        final_mask = green_tokens_mask.bool()
        return final_mask

    def _bias_greenlist_logits(self, scores: torch.Tensor, greenlist_mask: torch.Tensor,
                               greenlist_bias: float) -> torch.Tensor:
        scores[greenlist_mask] = scores[greenlist_mask] + greenlist_bias
        return scores
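
    # Rough intuition (assumed numbers, not from the original): adding delta=2.0 to a green
    # token's logit multiplies its unnormalized probability by e^2 ≈ 7.39, softly biasing
    # sampling toward the greenlist without ever forbidding red tokens.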

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        if self.rng is None:
            # self.rng = torch.Generator(device=input_ids.device)
            self.rng = torch.Generator(device='cpu')
        # NOTE: ideally this batch loop would go away, but for now the seeding and
        # partitioning operations are not vectorized, so each sequence in the batch
        # has to be processed separately.
        batched_greenlist_ids = [None for _ in range(input_ids.shape[0])]
        for b_idx in range(input_ids.shape[0]):
            greenlist_ids = self._get_greenlist_ids(input_ids[b_idx])
            batched_greenlist_ids[b_idx] = greenlist_ids
        green_tokens_mask = self._calc_greenlist_mask(scores=scores, greenlist_token_ids=batched_greenlist_ids)
        scores = self._bias_greenlist_logits(scores=scores, greenlist_mask=green_tokens_mask, greenlist_bias=self.delta)
        return scores
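
    # Shape note (standard transformers LogitsProcessor contract): input_ids is
    # (batch, seq_len) and scores is (batch, vocab_size); the returned scores keep the
    # same shape, with +delta applied on each row's greenlist entries.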


class WatermarkDetector(WatermarkBase):
    def __init__(
        self,
        *args,
        device: torch.device = None,
        tokenizer: Tokenizer = None,
        z_threshold: float = 4.0,
        normalizers: list[str] = ["unicode"],  # or also: ["unicode", "homoglyphs", "truecase"]
        ignore_repeated_bigrams: bool = False,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        assert device, "Must pass device"
        assert tokenizer, "Need an instance of the generating tokenizer to perform detection"
        self.tokenizer = tokenizer
        self.device = device
        self.z_threshold = z_threshold
        # self.rng = torch.Generator(device=self.device)
        self.rng = torch.Generator(device='cpu')
        if self.seeding_scheme == "simple_1":
            self.min_prefix_len = 1
        else:
            raise NotImplementedError(f"Unexpected seeding_scheme: {self.seeding_scheme}")
        self.normalizers = []
        for normalization_strategy in normalizers:
            self.normalizers.append(normalization_strategy_lookup(normalization_strategy))
        self.ignore_repeated_bigrams = ignore_repeated_bigrams
        if self.ignore_repeated_bigrams:
            assert self.seeding_scheme == "simple_1", "No repeated bigram credit variant assumes the single token seeding scheme."

    def _compute_z_score(self, observed_count, T):
        # observed_count is the number of green tokens, T is the total number of tokens scored
        expected_count = self.gamma
        numer = observed_count - expected_count * T
        denom = sqrt(T * expected_count * (1 - expected_count))
        z = numer / denom
        return z
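
    # Worked example (assumed numbers): with gamma=0.5 and T=200 scored tokens, an
    # unwatermarked text is expected to contain ~100 green tokens; observing 130 gives
    # z = (130 - 0.5 * 200) / sqrt(200 * 0.5 * 0.5) = 30 / sqrt(50) ≈ 4.24,
    # which clears the default z_threshold of 4.0.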

    def _compute_p_value(self, z):
        p_value = scipy.stats.norm.sf(z)
        return p_value
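
    # For reference: scipy.stats.norm.sf(4.0) ≈ 3.17e-5, so a z-score above the default
    # threshold is very unlikely under the no-watermark null hypothesis.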

    def _score_sequence(
        self,
        input_ids: Tensor,
        return_num_tokens_scored: bool = True,
        return_num_green_tokens: bool = True,
        return_green_fraction: bool = True,
        return_green_token_mask: bool = False,
        return_z_score: bool = True,
        return_p_value: bool = True,
    ):
        if self.ignore_repeated_bigrams:
            # A variant that counts each unique bigram's green/red hit only once.
            # The new total number of tokens scored (T) becomes the number of unique bigrams.
            # We iterate over all unique token bigrams in the input, compute the greenlist
            # induced by the first token in each bigram, and then check whether the second
            # token falls in that greenlist.
            assert not return_green_token_mask, "Can't return the green/red mask when ignoring repeats."
            bigram_table = {}
            token_bigram_generator = ngrams(input_ids.cpu().tolist(), 2)
            freq = collections.Counter(token_bigram_generator)
            num_tokens_scored = len(freq.keys())
            for idx, bigram in enumerate(freq.keys()):
                prefix = torch.tensor([bigram[0]],
                                      device=self.device)  # expects a 1-d prefix tensor on the randperm device
                greenlist_ids = self._get_greenlist_ids(prefix)
                bigram_table[bigram] = bigram[1] in greenlist_ids
            green_token_count = sum(bigram_table.values())
        else:
            num_tokens_scored = len(input_ids) - self.min_prefix_len
            if num_tokens_scored < 1:
                raise ValueError((f"Must have at least 1 token to score after "
                                  f"the first min_prefix_len={self.min_prefix_len} tokens required by the seeding scheme."))
            # Standard approach:
            # since we generally need at least 1 token (for the simplest scheme), we start
            # iterating over the token sequence at the minimum prefix length the seeding
            # scheme requires, and at each step compute the greenlist induced by the
            # current prefix and check whether the current token falls in that greenlist.
            green_token_count, green_token_mask = 0, []
            for idx in range(self.min_prefix_len, len(input_ids)):
                curr_token = input_ids[idx]
                greenlist_ids = self._get_greenlist_ids(input_ids[:idx])
                if curr_token in greenlist_ids:
                    green_token_count += 1
                    green_token_mask.append(True)
                else:
                    green_token_mask.append(False)

        score_dict = dict()
        if return_num_tokens_scored:
            score_dict.update(dict(num_tokens_scored=num_tokens_scored))
        if return_num_green_tokens:
            score_dict.update(dict(num_green_tokens=green_token_count))
        if return_green_fraction:
            score_dict.update(dict(green_fraction=(green_token_count / num_tokens_scored)))
        if return_z_score:
            score_dict.update(dict(z_score=self._compute_z_score(green_token_count, num_tokens_scored)))
        if return_p_value:
            z_score = score_dict.get("z_score")
            if z_score is None:
                z_score = self._compute_z_score(green_token_count, num_tokens_scored)
            score_dict.update(dict(p_value=self._compute_p_value(z_score)))
        if return_green_token_mask:
            score_dict.update(dict(green_token_mask=green_token_mask))
        return score_dict

    def detect(
        self,
        text: str = None,
        tokenized_text: list[int] = None,
        return_prediction: bool = True,
        return_scores: bool = True,
        z_threshold: float = None,
        **kwargs,
    ) -> dict:
        assert (text is not None) ^ (tokenized_text is not None), "Must pass either the raw or tokenized string"
        if return_prediction:
            kwargs["return_p_value"] = True  # the "confidence" of a positive detection is reported as 1 - p
        # run the optional normalizers on the text
        for normalizer in self.normalizers:
            text = normalizer(text)
        if len(self.normalizers) > 0:
            print(f"Text after normalization:\n\n{text}\n")
        if tokenized_text is None:
            assert self.tokenizer is not None, (
                "Watermark detection on raw string "
                "requires an instance of the tokenizer "
                "that was used at generation time."
            )
            tokenized_text = self.tokenizer(text, return_tensors="pt", add_special_tokens=False)["input_ids"][0].to(
                self.device)
            if tokenized_text[0] == self.tokenizer.bos_token_id:
                tokenized_text = tokenized_text[1:]
        else:
            # try to remove the bos_tok at the beginning if it's there
            if (self.tokenizer is not None) and (tokenized_text[0] == self.tokenizer.bos_token_id):
                tokenized_text = tokenized_text[1:]
        # call the scoring method
        output_dict = {}
        score_dict = self._score_sequence(tokenized_text, **kwargs)
        if return_scores:
            output_dict.update(score_dict)
        # if return_prediction is passed, perform the hypothesis test and return the outcome
        if return_prediction:
            z_threshold = z_threshold if z_threshold else self.z_threshold
            assert z_threshold is not None, "Need a threshold in order to decide outcome of detection test"
            output_dict["prediction"] = score_dict["z_score"] > z_threshold
            if output_dict["prediction"]:
                output_dict["confidence"] = 1 - score_dict["p_value"]
        return output_dict
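
# Example detect() output (illustrative values for a watermarked passage of ~200 tokens,
# not produced by an actual run):
# {'num_tokens_scored': 200, 'num_green_tokens': 130, 'green_fraction': 0.65,
#  'z_score': 4.24, 'p_value': 1.1e-05, 'prediction': True, 'confidence': 0.99999}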


if __name__ == "__main__":
    from transformers import AutoTokenizer, AutoModelForCausalLM, LogitsProcessorList

    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-0.5B-Chat")
    model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen1.5-0.5B-Chat")
    watermark_processor = WatermarkLogitsProcessor(vocab=list(tokenizer.get_vocab().values()),
                                                   gamma=0.5,
                                                   delta=2,
                                                   seeding_scheme="simple_1",
                                                   extra_salt=0,
                                                   select_green_tokens=True)
    messages = [
        # {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "讲一段明代的历史"}  # "Tell me some Ming-dynasty history"
    ]
    tokenized_input = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    tokd_input = tokenizer([tokenized_input], return_tensors="pt", truncation=True, add_special_tokens=False,
                           max_length=2000).to(model.device)
    logits_processor = LogitsProcessorList([watermark_processor])
    output = model.generate(
        tokd_input.input_ids,
        max_new_tokens=500,
        logits_processor=logits_processor,
        do_sample=True,
        temperature=0.7,
    )
    print(tokenizer.decode(output[0]))

    watermark_detector = WatermarkDetector(vocab=list(tokenizer.get_vocab().values()),
                                           gamma=0.5,
                                           seeding_scheme="simple_1",
                                           extra_salt=0,
                                           device=torch.device("cpu"),
                                           tokenizer=tokenizer,
                                           z_threshold=4,
                                           # normalizers='',
                                           ignore_repeated_bigrams=False,
                                           select_green_tokens=True)
    print(watermark_detector.detect(tokenizer.decode(output[0]), return_prediction=True, return_scores=True,
                                    z_threshold=4))
    # Commented-out examples for trying the detector on other (human- or otherwise-sourced) Chinese texts:
    # print(watermark_detector.detect("抱歉,作为人工智能语言模型,我无法提供有关希腊历史的信息。我的目的是为用户提供有用的和有用的回答,而不仅仅是提供错误的信息。请告诉我您想要了解的是什么内容。 ", return_prediction=True, return_scores=True, z_threshold=4))
    #
    # print(watermark_detector.detect("明朝是中国历史上一个重要的朝代,从1368年至1542年,中国经历了从宋朝的衰败到明朝的兴盛,这个时期的朝代特征鲜明,政治、经济和社会都取得了显著的进步。 政治方面:明朝的君主制度比较完善,以皇权为核心,实行中央集权。此外,明朝还实行了科举考试制度,选拔了许多优秀人才,使得社会风气更加开放。 经济上:明朝的经济实力非常强大,尤其是手工业和农业发展迅速。明朝的海上贸易也非常发达,被誉为“海路不穷”。另外,明朝还通过派遣郑和等船队出使西洋,传播中国文化,扩大对外交流。 社会上:明朝的社会结构相对稳定,人们生活水平不断提高。然而,这个时期也存在一些问题,如农民起义、土地兼并等。 文化方面:明朝的文化非常丰富多样,有诗词歌赋、绘画、建筑等众多艺术形式。此外,明人的文学创作也非常出色,如《西游记》、《红楼梦》等经典作品。 总之,明朝是中国历史上的一个重要阶段,它的繁荣与衰败、创新与发展深深地影响了后世的人民和社会的发展。 ", return_prediction=True, return_scores=True, z_threshold=4))