OzTianlu committed on
Commit dc694a3 · verified · 1 Parent(s): c065d33

Upload modeling_hf.py

Files changed (1)
  1. 0_CollinsSTWrapper/modeling_hf.py +212 -0
0_CollinsSTWrapper/modeling_hf.py ADDED
@@ -0,0 +1,212 @@
"""
Collins-RoPE minimal embedding model (native HuggingFace implementation).
Architecture: Hash Embedding (2-universal + sign hash) -> RoPE -> Transformer Encoder -> Mean Pooling
Target parameter count: ~2M
"""

import math
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PretrainedConfig, PreTrainedModel
from transformers.modeling_outputs import BaseModelOutputWithPooling


class CollinsConfig(PretrainedConfig):
    model_type = "collins"

    def __init__(
        self,
        vocab_size: int = 30522,
        num_buckets: int = 2048,
        hidden_size: int = 256,
        num_hidden_layers: int = 3,
        num_attention_heads: int = 8,
        intermediate_size: int = 1024,
        hidden_dropout_prob: float = 0.1,
        attention_probs_dropout_prob: float = 0.1,
        max_position_embeddings: int = 512,
        # Fixed seed for the 2-universal hashes (keeps hashing identical after load)
        hash_seed: int = 42,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.num_buckets = num_buckets
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.hash_seed = hash_seed


class CollinsHashEmbedding(nn.Module):
    """
    Compressed embedding: 2-universal bucket hash + sign hash.
    Hash parameters are derived deterministically from config.hash_seed,
    so they stay identical across save/load.
    """

    def __init__(self, config: CollinsConfig):
        super().__init__()
        self.num_buckets = config.num_buckets
        self.hidden_size = config.hidden_size

        self.hash_table = nn.Parameter(
            torch.randn(config.num_buckets, config.hidden_size)
            / math.sqrt(config.hidden_size)
        )

        prime = 2147483647  # Mersenne prime 2^31 - 1
        rng = torch.Generator()
        rng.manual_seed(config.hash_seed)
        a1 = torch.randint(1, prime, (1,), generator=rng, dtype=torch.long)
        b1 = torch.randint(0, prime, (1,), generator=rng, dtype=torch.long)
        a2 = torch.randint(1, prime, (1,), generator=rng, dtype=torch.long)
        b2 = torch.randint(0, prime, (1,), generator=rng, dtype=torch.long)

        self.register_buffer("prime", torch.tensor(prime, dtype=torch.long))
        self.register_buffer("a1", a1)
        self.register_buffer("b1", b1)
        self.register_buffer("a2", a2)
        self.register_buffer("b2", b2)

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        x = input_ids.long()
        # Bucket index via the 2-universal hash ((a1*x + b1) mod p) mod num_buckets
        bucket_idx = ((x * self.a1 + self.b1) % self.prime) % self.num_buckets
        # Independent sign hash maps each token id to +1 or -1
        sign = ((x * self.a2 + self.b2) % self.prime) % 2
        sign = (sign * 2 - 1).float()
        return self.hash_table[bucket_idx] * sign.unsqueeze(-1)


class CollinsModel(PreTrainedModel):
    """
    Collins-RoPE encoder; outputs last_hidden_state and pooler_output.
    Reuses BertEncoder from transformers.models.bert, with hash embeddings
    + RoPE standing in for BertEmbeddings.
    """

    config_class = CollinsConfig
    base_model_prefix = "collins"
    supports_gradient_checkpointing = True

    def __init__(self, config: CollinsConfig):
        super().__init__(config)
        self.config = config

        self.embeddings = CollinsHashEmbedding(config)

        # Reuse the HF BertEncoder directly (multi-head attention + FFN + LayerNorm)
        from transformers.models.bert.modeling_bert import BertEncoder, BertConfig

        bert_cfg = BertConfig(
            hidden_size=config.hidden_size,
            num_hidden_layers=config.num_hidden_layers,
            num_attention_heads=config.num_attention_heads,
            intermediate_size=config.intermediate_size,
            hidden_dropout_prob=config.hidden_dropout_prob,
            attention_probs_dropout_prob=config.attention_probs_dropout_prob,
            max_position_embeddings=config.max_position_embeddings,
            # BertEmbeddings (absolute positions) is bypassed entirely; this setting
            # adds a learned relative position bias inside attention, on top of the
            # RoPE applied to the inputs below.
            position_embedding_type="relative_key_query",
        )
        bert_cfg._attn_implementation = "eager"
        self.encoder = BertEncoder(bert_cfg)

        # RoPE frequency buffers (no learnable parameters)
        dim = config.hidden_size
        inv_freq = 1.0 / (
            10000 ** (torch.arange(0, dim, 2).float() / dim)
        )
        t = torch.arange(config.max_position_embeddings).float()
        freqs = torch.einsum("i,j->ij", t, inv_freq)
        self.register_buffer("rope_cos", freqs.cos())
        self.register_buffer("rope_sin", freqs.sin())

        self.post_init()

    def _apply_rope(self, x: torch.Tensor) -> torch.Tensor:
        # Rotate each (even, odd) dimension pair by a position-dependent angle.
        seq_len = x.shape[1]
        cos = self.rope_cos[:seq_len].unsqueeze(0)
        sin = self.rope_sin[:seq_len].unsqueeze(0)
        x1, x2 = x[..., 0::2], x[..., 1::2]
        return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1)

    def get_extended_attention_mask(self, attention_mask: torch.Tensor) -> torch.Tensor:
        # BertEncoder expects an additive mask of shape [B, 1, 1, L]:
        # 0 = keep, large negative = ignore.
        extended = attention_mask[:, None, None, :]
        extended = (1.0 - extended.float()) * torch.finfo(torch.float32).min
        return extended

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        return_dict: bool = True,
    ):
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)

        x = self.embeddings(input_ids)  # [B, L, D]
        x = self._apply_rope(x)         # [B, L, D]

        ext_mask = self.get_extended_attention_mask(attention_mask)
        encoder_out = self.encoder(x, attention_mask=ext_mask)
        hidden_states = encoder_out.last_hidden_state  # [B, L, D]

        # Mean pooling over non-padding tokens, then L2-normalize
        mask = attention_mask.unsqueeze(-1).float()
        pooled = (hidden_states * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
        pooled = F.normalize(pooled, p=2, dim=-1)

        if not return_dict:
            return (hidden_states, pooled)

        return BaseModelOutputWithPooling(
            last_hidden_state=hidden_states,
            pooler_output=pooled,
        )


class CollinsSTWrapper(nn.Module):
    """
    Compatibility wrapper for sentence-transformers 5.x.
    Holds the tokenizer, implements the tokenize() interface, and injects
    sentence_embedding into the feature dict.
    """

    def __init__(self, collins_model: CollinsModel, tokenizer_name_or_path: str = "bert-base-uncased", max_seq_length: int = 128):
        super().__init__()
        from transformers import AutoTokenizer
        self.collins_model = collins_model
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path)
        self.max_seq_length = max_seq_length

    def tokenize(self, texts: list[str], padding: str | bool = True) -> dict:
        return self.tokenizer(
            texts,
            padding=padding,
            truncation=True,
            max_length=self.max_seq_length,
            return_tensors="pt",
        )

    def forward(self, features: dict) -> dict:
        input_ids = features["input_ids"]
        attention_mask = features.get("attention_mask", None)
        outputs = self.collins_model(input_ids, attention_mask)
        features["sentence_embedding"] = outputs.pooler_output
        return features

    def save(self, output_path: str):
        self.collins_model.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)

    @staticmethod
    def load(input_path: str) -> "CollinsSTWrapper":
        model = CollinsModel.from_pretrained(input_path)
        return CollinsSTWrapper(model, tokenizer_name_or_path=input_path)
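
A minimal usage sketch, assuming modeling_hf.py is importable from the working directory and the bert-base-uncased tokenizer is available:

import torch
from modeling_hf import CollinsConfig, CollinsModel, CollinsSTWrapper  # assumes the file above is on the Python path

config = CollinsConfig()        # defaults: 2048 buckets, hidden_size 256, 3 layers
model = CollinsModel(config)    # randomly initialized; use CollinsSTWrapper.load(path) for saved weights
model.eval()
wrapper = CollinsSTWrapper(model, tokenizer_name_or_path="bert-base-uncased")

features = wrapper.tokenize(["hello world", "a second sentence"])
with torch.no_grad():
    out = wrapper(features)
print(out["sentence_embedding"].shape)  # torch.Size([2, 256]); embeddings are L2-normalized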