import json
import logging
import os
from abc import ABC
from dataclasses import dataclass
from typing import Optional

import torch
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
)

from ts.torch_handler.base_handler import BaseHandler

logger = logging.getLogger(__name__)

MAX_TOKEN_LENGTH_ERR = {
    "code": 422,
    "type": "MaxTokenLengthError",
    "message": "Max token length exceeded",
}


class EngCopHandler(BaseHandler, ABC):
@dataclass
class GenerationConfig:
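        """Generation settings forwarded to ``model.generate()``.

        Field names mirror the Hugging Face ``generate()`` keyword arguments;
        ``initialize()`` overrides a subset of them.
        """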
max_length: int = 20
max_new_tokens: Optional[int] = None
min_length: int = 0
min_new_tokens: Optional[int] = None
early_stopping: bool = True
do_sample: bool = False
num_beams: int = 1
num_beam_groups: int = 1
top_k: int = 50
top_p: float = 0.95
temperature: float = 1.0
diversity_penalty: float = 0.0

    def __init__(self):
        super().__init__()
self.initialized = False

    def initialize(self, ctx):
"""In this initialize function, the HF large model is loaded and
partitioned using DeepSpeed.
Args:
ctx (context): It is a JSON Object containing information
pertaining to the model artifacts parameters.
"""
logger.info("Start initialize")
self.manifest = ctx.manifest
properties = ctx.system_properties
model_dir = properties.get("model_dir")
        serialized_file = self.manifest["model"]["serializedFile"]
        # Resolved for reference only; the model weights are loaded from
        # model_dir via from_pretrained() below.
        model_pt_path = os.path.join(model_dir, serialized_file)
        setup_config_path = os.path.join(model_dir, "setup_config.json")
        if os.path.isfile(setup_config_path):
            with open(setup_config_path) as setup_config_file:
                self.setup_config = json.load(setup_config_file)
        torch.manual_seed(42)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info("Device: %s", self.device)
self.model = AutoModelForSeq2SeqLM.from_pretrained(model_dir)
self.model.to(self.device)
self.model.eval()
self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
self.config = EngCopHandler.GenerationConfig(
max_new_tokens=128,
min_new_tokens=1,
num_beams=5,
)
self.initialized = True
logger.info("Init done")

    def preprocess(self, requests):
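        """Extract the raw text payload from each request in the batch."""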
preprocessed_data = []
for data in requests:
data_item = data.get("data")
if data_item is None:
data_item = data.get("body")
if isinstance(data_item, (bytes, bytearray)):
data_item = data_item.decode("utf-8")
preprocessed_data.append(data_item)
logger.info("preprocessed_data %s: ", preprocessed_data)
return preprocessed_data

    def inference(self, data):
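        """Translate a batch, skipping inputs that exceed the model's
        maximum input length.

        Returns one entry per input: the translated (and de-Greekified)
        string, or None if the input was skipped.
        """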
indices = {}
batch = []
        for i, item in enumerate(data):
            tokens = self.tokenizer(item, return_tensors="pt")
            # Skip inputs longer than the model's context window; they map to
            # None in the output and are reported as errors by handle().
            if tokens.input_ids.shape[1] > self.tokenizer.model_max_length:
                logger.info("Skipping over-length input at index %s", i)
                continue
            indices[i] = len(batch)
            batch.append(data[i])
logger.info("inference batch: %s", batch)
result = self.batch_translate(batch)
return [
degreekify(result[indices[i]]) if i in indices else None
for i in range(len(data))
]

    def postprocess(self, output):
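        """Pass-through: inference() already returns final translations."""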
return output

    def handle(self, requests, context):
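        """TorchServe entry point: produce one response object per request."""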
logger.info("requests %s: ", requests)
preprocessed = self.preprocess(requests)
inference_data = self.inference(preprocessed)
postprocessed = self.postprocess(inference_data)
logger.info("inference result: %s", postprocessed)
        # None marks an input skipped for exceeding the max length; an empty
        # string is still a valid translation, so test against None here.
        responses = [
            {"code": 200, "translation": translation}
            if translation is not None
            else MAX_TOKEN_LENGTH_ERR
            for translation in postprocessed
        ]
return responses

    def batch_translate(self, input_sentences, output_confidence=False):
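        """Run generation over a batch of sentences and decode the outputs."""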
if len(input_sentences) == 0:
return []
inputs = self.tokenizer(input_sentences, return_tensors="pt", padding=True).to(
self.device
)
        outputs = self.model.generate(
            **inputs,
            # When both are set, max_new_tokens takes precedence over
            # max_length, so generation is bounded by max_new_tokens here.
            max_length=self.config.max_length,
            max_new_tokens=self.config.max_new_tokens,
            min_length=self.config.min_length,
            min_new_tokens=self.config.min_new_tokens,
            early_stopping=self.config.early_stopping,
            do_sample=self.config.do_sample,
            num_beams=self.config.num_beams,
            num_beam_groups=self.config.num_beam_groups,
            top_k=self.config.top_k,
            top_p=self.config.top_p,
            temperature=self.config.temperature,
            diversity_penalty=self.config.diversity_penalty,
            output_scores=output_confidence,
            # A dict result is required so outputs.sequences exists below.
            return_dict_in_generate=True,
        )
translated_text = self.tokenizer.batch_decode(
outputs.sequences, skip_special_tokens=True
)
return translated_text
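

# Lowercase Greek letters map to their Coptic counterparts; the ASCII keys
# (s, f, k, h, j, c, t) stand in for the Demotic-derived letters that have
# no Greek equivalent (shai, fai, khai, hori, janja, gangia, ti).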
GREEK_TO_COPTIC = {
"α": "ⲁ",
"β": "ⲃ",
"γ": "ⲅ",
"δ": "ⲇ",
"ε": "ⲉ",
"ϛ": "ⲋ",
"ζ": "ⲍ",
"η": "ⲏ",
"θ": "ⲑ",
"ι": "ⲓ",
"κ": "ⲕ",
"λ": "ⲗ",
"μ": "ⲙ",
"ν": "ⲛ",
"ξ": "ⲝ",
"ο": "ⲟ",
"π": "ⲡ",
"ρ": "ⲣ",
"σ": "ⲥ",
"τ": "ⲧ",
"υ": "ⲩ",
"φ": "ⲫ",
"χ": "ⲭ",
"ψ": "ⲯ",
"ω": "ⲱ",
"s": "ϣ",
"f": "ϥ",
"k": "ϧ",
"h": "ϩ",
"j": "ϫ",
"c": "ϭ",
"t": "ϯ",
}


def degreekify(greek_text):
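    """Lowercase the text and map Greek (and stand-in ASCII) characters to
    Coptic; characters without a mapping pass through lowercased.
    """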
chars = []
for c in greek_text:
l_c = c.lower()
chars.append(GREEK_TO_COPTIC.get(l_c, l_c))
return "".join(chars)