File size: 3,228 Bytes
b78ad07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7381c93
 
 
 
 
 
b78ad07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7381c93
b78ad07
 
 
 
 
 
 
 
 
 
 
7381c93
b78ad07
 
7381c93
 
 
b78ad07
 
 
 
 
 
7381c93
 
 
 
 
 
 
b78ad07
 
 
7381c93
b78ad07
 
 
 
 
 
 
 
7381c93
 
 
b78ad07
 
7381c93
b78ad07
 
7381c93
b78ad07
 
 
 
7381c93
b78ad07
7381c93
b78ad07
7381c93
b78ad07
7381c93
 
b78ad07
 
 
 
 
 
 
 
 
 
7381c93
b78ad07
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Generate a response."""
# pylint:disable=line-too-long, too-many-argument
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from logzero import logger

# model_name = "microsoft/DialoGPT-large"
# model_name = "microsoft/DialoGPT-small"
# pylint: disable=invalid-name
model_name = "microsoft/DialoGPT-medium"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)


def _convbot(
    text: str,
    max_length: int = 1000,
    do_sample: bool = True,
    top_p: float = 0.95,
    top_k: int = 0,
    temperature: float = 0.75,
) -> str:
    """Generate a reponse.

    Args
        n_retires: retry if response is  "" or the same as previouse resp.

    Returns
        reply
    """
    try:
        chat_history_ids = _convbot.chat_history_ids
    except AttributeError:
        chat_history_ids = ""

    try:
        chat_history_ids = _convbot.chat_history_ids
    except AttributeError:
        chat_history_ids = ""

    input_ids = tokenizer.encode(text + tokenizer.eos_token, return_tensors="pt")
    if chat_history_ids:
        bot_input_ids = torch.cat([chat_history_ids, input_ids], dim=-1)
    else:
        bot_input_ids = input_ids

    # generate a bot response
    chat_history_ids = model.generate(
        bot_input_ids,
        max_length=max_length,
        do_sample=do_sample,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        pad_token_id=tokenizer.eos_token_id,
    )

    output = tokenizer.decode(
        chat_history_ids[:, bot_input_ids.shape[-1]:][0], skip_special_tokens=True
    )
    _convbot.chat_history_ids = chat_history_ids

    return output


def convbot(
    text: str,
    n_retries: int = 3,
    max_length: int = 1000,
    do_sample: bool = True,
    top_p: float = 0.95,
    top_k: int = 0,
    temperature: float = 0.75,
) -> str:
    """Generate a response."""
    try:
        n_retries = int(n_retries)
    except Exception as e:
        logger.error(e)
        raise
    try:
        prev_resp = convbot.prev_resp
    except AttributeError:
        prev_resp = ""

    resp = _convbot(text, max_length, do_sample, top_p, top_k, temperature)

    # retry n_retries if resp is empty
    if not resp.strip():
        idx = 0
        while idx < n_retries:
            idx += 1
            _convbot.chat_history_ids = ""
            resp = _convbot(text, max_length, do_sample, top_p, top_k, temperature)
            if resp.strip():
                break
        else:
            logger.warning("bot acting up (empty response), something has gone awry")

    # check repeated responses
    if resp.strip() == prev_resp:
        idx = 0
        while idx < n_retries:
            idx += 1
            resp = _convbot(text, max_length, do_sample, top_p, top_k, temperature)
            if resp.strip() != prev_resp:
                break
        else:
            logger.warning("bot acting up (repeating), something has gone awry")

    convbot.prev_resp = resp

    return resp


def main():
    print("Bot: Talk to me")
    while 1:
        text = input("You: ")
        resp = _convbot(text)
        print("Bot: ", resp)


if __name__ == "__main__":
    main()