### credit: https://github.com/dunky11/voicesmith
from typing import Callable, Dict, Tuple

import torch
import torch.nn.functional as F
from coqpit import Coqpit
from torch import nn

from TTS.tts.layers.delightful_tts.conformer import Conformer
from TTS.tts.layers.delightful_tts.encoders import (
    PhonemeLevelProsodyEncoder,
    UtteranceLevelProsodyEncoder,
    get_mask_from_lengths,
)
from TTS.tts.layers.delightful_tts.energy_adaptor import EnergyAdaptor
from TTS.tts.layers.delightful_tts.networks import EmbeddingPadded, positional_encoding
from TTS.tts.layers.delightful_tts.phoneme_prosody_predictor import PhonemeProsodyPredictor
from TTS.tts.layers.delightful_tts.pitch_adaptor import PitchAdaptor
from TTS.tts.layers.delightful_tts.variance_predictor import VariancePredictor
from TTS.tts.layers.generic.aligner import AlignmentNetwork
from TTS.tts.utils.helpers import generate_path, maximum_path, sequence_mask


class AcousticModel(torch.nn.Module):
    def __init__(
        self,
        args: "ModelArgs",
        tokenizer: "TTSTokenizer" = None,
        speaker_manager: "SpeakerManager" = None,
    ):
        super().__init__()
        self.args = args
        self.tokenizer = tokenizer
        self.speaker_manager = speaker_manager

        self.init_multispeaker(args)
        # self.set_embedding_dims()

        self.length_scale = (
            float(self.args.length_scale) if isinstance(self.args.length_scale, int) else self.args.length_scale
        )

        self.emb_dim = args.n_hidden_conformer_encoder
        self.encoder = Conformer(
            dim=self.args.n_hidden_conformer_encoder,
            n_layers=self.args.n_layers_conformer_encoder,
            n_heads=self.args.n_heads_conformer_encoder,
            speaker_embedding_dim=self.embedded_speaker_dim,
            p_dropout=self.args.dropout_conformer_encoder,
            kernel_size_conv_mod=self.args.kernel_size_conv_mod_conformer_encoder,
            lrelu_slope=self.args.lrelu_slope,
        )
        self.pitch_adaptor = PitchAdaptor(
            n_input=self.args.n_hidden_conformer_encoder,
            n_hidden=self.args.n_hidden_variance_adaptor,
            n_out=1,
            kernel_size=self.args.kernel_size_variance_adaptor,
            emb_kernel_size=self.args.emb_kernel_size_variance_adaptor,
            p_dropout=self.args.dropout_variance_adaptor,
            lrelu_slope=self.args.lrelu_slope,
        )
        self.energy_adaptor = EnergyAdaptor(
            channels_in=self.args.n_hidden_conformer_encoder,
            channels_hidden=self.args.n_hidden_variance_adaptor,
            channels_out=1,
            kernel_size=self.args.kernel_size_variance_adaptor,
            emb_kernel_size=self.args.emb_kernel_size_variance_adaptor,
            dropout=self.args.dropout_variance_adaptor,
            lrelu_slope=self.args.lrelu_slope,
        )

        self.aligner = AlignmentNetwork(
            in_query_channels=self.args.out_channels,
            in_key_channels=self.args.n_hidden_conformer_encoder,
        )

        self.duration_predictor = VariancePredictor(
            channels_in=self.args.n_hidden_conformer_encoder,
            channels=self.args.n_hidden_variance_adaptor,
            channels_out=1,
            kernel_size=self.args.kernel_size_variance_adaptor,
            p_dropout=self.args.dropout_variance_adaptor,
            lrelu_slope=self.args.lrelu_slope,
        )

        self.utterance_prosody_encoder = UtteranceLevelProsodyEncoder(
            num_mels=self.args.num_mels,
            ref_enc_filters=self.args.ref_enc_filters_reference_encoder,
            ref_enc_size=self.args.ref_enc_size_reference_encoder,
            ref_enc_gru_size=self.args.ref_enc_gru_size_reference_encoder,
            ref_enc_strides=self.args.ref_enc_strides_reference_encoder,
            n_hidden=self.args.n_hidden_conformer_encoder,
            dropout=self.args.dropout_conformer_encoder,
            bottleneck_size_u=self.args.bottleneck_size_u_reference_encoder,
            token_num=self.args.token_num_reference_encoder,
        )

        self.utterance_prosody_predictor = PhonemeProsodyPredictor(
            hidden_size=self.args.n_hidden_conformer_encoder,
            kernel_size=self.args.predictor_kernel_size_reference_encoder,
            dropout=self.args.dropout_conformer_encoder,
            bottleneck_size=self.args.bottleneck_size_u_reference_encoder,
            lrelu_slope=self.args.lrelu_slope,
        )

        self.phoneme_prosody_encoder = PhonemeLevelProsodyEncoder(
            num_mels=self.args.num_mels,
            ref_enc_filters=self.args.ref_enc_filters_reference_encoder,
            ref_enc_size=self.args.ref_enc_size_reference_encoder,
            ref_enc_gru_size=self.args.ref_enc_gru_size_reference_encoder,
            ref_enc_strides=self.args.ref_enc_strides_reference_encoder,
            n_hidden=self.args.n_hidden_conformer_encoder,
            dropout=self.args.dropout_conformer_encoder,
            bottleneck_size_p=self.args.bottleneck_size_p_reference_encoder,
            n_heads=self.args.n_heads_conformer_encoder,
        )

        self.phoneme_prosody_predictor = PhonemeProsodyPredictor(
            hidden_size=self.args.n_hidden_conformer_encoder,
            kernel_size=self.args.predictor_kernel_size_reference_encoder,
            dropout=self.args.dropout_conformer_encoder,
            bottleneck_size=self.args.bottleneck_size_p_reference_encoder,
            lrelu_slope=self.args.lrelu_slope,
        )

        self.u_bottle_out = nn.Linear(
            self.args.bottleneck_size_u_reference_encoder,
            self.args.n_hidden_conformer_encoder,
        )

        self.u_norm = nn.InstanceNorm1d(self.args.bottleneck_size_u_reference_encoder)
        self.p_bottle_out = nn.Linear(
            self.args.bottleneck_size_p_reference_encoder,
            self.args.n_hidden_conformer_encoder,
        )
        self.p_norm = nn.InstanceNorm1d(
            self.args.bottleneck_size_p_reference_encoder,
        )
        self.decoder = Conformer(
            dim=self.args.n_hidden_conformer_decoder,
            n_layers=self.args.n_layers_conformer_decoder,
            n_heads=self.args.n_heads_conformer_decoder,
            speaker_embedding_dim=self.embedded_speaker_dim,
            p_dropout=self.args.dropout_conformer_decoder,
            kernel_size_conv_mod=self.args.kernel_size_conv_mod_conformer_decoder,
            lrelu_slope=self.args.lrelu_slope,
        )

        padding_idx = self.tokenizer.characters.pad_id
        self.src_word_emb = EmbeddingPadded(
            self.args.num_chars, self.args.n_hidden_conformer_encoder, padding_idx=padding_idx
        )
        self.to_mel = nn.Linear(
            self.args.n_hidden_conformer_decoder,
            self.args.num_mels,
        )

        self.energy_scaler = torch.nn.BatchNorm1d(1, affine=False, track_running_stats=True, momentum=None)
        self.energy_scaler.requires_grad_(False)

    def init_multispeaker(self, args: Coqpit):  # pylint: disable=unused-argument
        """Init for multi-speaker training."""
        self.embedded_speaker_dim = 0
        self.num_speakers = self.args.num_speakers
        self.audio_transform = None

        if self.speaker_manager:
            self.num_speakers = self.speaker_manager.num_speakers

        if self.args.use_speaker_embedding:
            self._init_speaker_embedding()

        if self.args.use_d_vector_file:
            self._init_d_vector()

    @staticmethod
    def _set_cond_input(aux_input: Dict):
        """Set the speaker conditioning input based on the multi-speaker mode."""
        sid, g, lid, durations = None, None, None, None
        if "speaker_ids" in aux_input and aux_input["speaker_ids"] is not None:
            sid = aux_input["speaker_ids"]
            if sid.ndim == 0:
                sid = sid.unsqueeze_(0)
        if "d_vectors" in aux_input and aux_input["d_vectors"] is not None:
            g = F.normalize(aux_input["d_vectors"])  # .unsqueeze_(-1)
            if g.ndim == 2:
                g = g  #  .unsqueeze_(0) # pylint: disable=self-assigning-variable

        if "durations" in aux_input and aux_input["durations"] is not None:
            durations = aux_input["durations"]

        return sid, g, lid, durations

    def get_aux_input(self, aux_input: Dict):
        sid, g, lid, _ = self._set_cond_input(aux_input)
        return {"speaker_ids": sid, "style_wav": None, "d_vectors": g, "language_ids": lid}

    def _set_speaker_input(self, aux_input: Dict):
        d_vectors = aux_input.get("d_vectors", None)
        speaker_ids = aux_input.get("speaker_ids", None)

        if d_vectors is not None and speaker_ids is not None:
            raise ValueError("[!] Cannot use d-vectors and speaker-ids together.")

        if speaker_ids is not None and not hasattr(self, "emb_g"):
            raise ValueError("[!] Cannot use speaker-ids without enabling speaker embedding.")

        g = speaker_ids if speaker_ids is not None else d_vectors
        return g

    # def set_embedding_dims(self):
    #     if self.embedded_speaker_dim > 0:
    #         self.embedding_dims = self.embedded_speaker_dim
    #     else:
    #         self.embedding_dims = 0

    def _init_speaker_embedding(self):
        # pylint: disable=attribute-defined-outside-init
        if self.num_speakers > 0:
            print(" > initialization of speaker-embedding layers.")
            self.embedded_speaker_dim = self.args.speaker_embedding_channels
            self.emb_g = nn.Embedding(self.num_speakers, self.embedded_speaker_dim)

    def _init_d_vector(self):
        # pylint: disable=attribute-defined-outside-init
        if hasattr(self, "emb_g"):
            raise ValueError("[!] Speaker embedding layer already initialized before d_vector settings.")
        self.embedded_speaker_dim = self.args.d_vector_dim

    @staticmethod
    def generate_attn(dr, x_mask, y_mask=None):
        """Generate an attention mask from the linear scale durations.

        Args:
            dr (Tensor): Linear scale durations.
            x_mask (Tensor): Mask for the input (character) sequence.
            y_mask (Tensor): Mask for the output (spectrogram) sequence. Compute it from the predicted durations
                if None. Defaults to None.

        Shapes
           - dr: :math:`(B, T_{en})`
           - x_mask: :math:`(B, T_{en})`
           - y_mask: :math:`(B, T_{de})`
        """
        # compute decode mask from the durations
        if y_mask is None:
            y_lengths = dr.sum(1).long()
            y_lengths[y_lengths < 1] = 1
            y_mask = torch.unsqueeze(sequence_mask(y_lengths, None), 1).to(dr.dtype)
        attn_mask = torch.unsqueeze(x_mask, -1) * torch.unsqueeze(y_mask, 2)
        attn = generate_path(dr, attn_mask.squeeze(1)).to(dr.dtype)
        return attn

    def _expand_encoder_with_durations(
        self,
        o_en: torch.FloatTensor,
        dr: torch.IntTensor,
        x_mask: torch.IntTensor,
        y_lengths: torch.IntTensor,
    ):
        y_mask = torch.unsqueeze(sequence_mask(y_lengths, None), 1).to(o_en.dtype)
        attn = self.generate_attn(dr, x_mask, y_mask)
        o_en_ex = torch.einsum("kmn, kjm -> kjn", [attn.float(), o_en])
        return y_mask, o_en_ex, attn.transpose(1, 2)

    def _forward_aligner(
        self,
        x: torch.FloatTensor,
        y: torch.FloatTensor,
        x_mask: torch.IntTensor,
        y_mask: torch.IntTensor,
        attn_priors: torch.FloatTensor,
    ) -> Tuple[torch.IntTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
        """Aligner forward pass.

        1. Compute a mask to apply to the attention map.
        2. Run the alignment network.
        3. Apply MAS to compute the hard alignment map.
        4. Compute the durations from the hard alignment map.

        Args:
            x (torch.FloatTensor): Input sequence.
            y (torch.FloatTensor): Output sequence.
            x_mask (torch.IntTensor): Input sequence mask.
            y_mask (torch.IntTensor): Output sequence mask.
            attn_priors (torch.FloatTensor): Prior for the aligner network map.

        Returns:
            Tuple[torch.IntTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
                Durations from the hard alignment map, soft alignment potentials, log scale alignment potentials,
                hard alignment map.

        Shapes:
            - x: :math:`[B, T_en, C_en]`
            - y: :math:`[B, T_de, C_de]`
            - x_mask: :math:`[B, 1, T_en]`
            - y_mask: :math:`[B, 1, T_de]`
            - attn_priors: :math:`[B, T_de, T_en]`

            - aligner_durations: :math:`[B, T_en]`
            - aligner_soft: :math:`[B, T_de, T_en]`
            - aligner_logprob: :math:`[B, 1, T_de, T_en]`
            - aligner_mas: :math:`[B, T_de, T_en]`
        """
        attn_mask = torch.unsqueeze(x_mask, -1) * torch.unsqueeze(y_mask, 2)  # [B, 1, T_en, T_de]
        aligner_soft, aligner_logprob = self.aligner(y.transpose(1, 2), x.transpose(1, 2), x_mask, attn_priors)
        aligner_mas = maximum_path(
            aligner_soft.squeeze(1).transpose(1, 2).contiguous(), attn_mask.squeeze(1).contiguous()
        )
        aligner_durations = torch.sum(aligner_mas, -1).int()
        aligner_soft = aligner_soft.squeeze(1)  # [B, T_max2, T_max]
        aligner_mas = aligner_mas.transpose(1, 2)  # [B, T_max, T_max2] -> [B, T_max2, T_max]
        return aligner_durations, aligner_soft, aligner_logprob, aligner_mas

    def average_utterance_prosody(  # pylint: disable=no-self-use
        self, u_prosody_pred: torch.Tensor, src_mask: torch.Tensor
    ) -> torch.Tensor:
        lengths = ((~src_mask) * 1.0).sum(1)
        u_prosody_pred = u_prosody_pred.sum(1, keepdim=True) / lengths.view(-1, 1, 1)
        return u_prosody_pred

    def forward(
        self,
        tokens: torch.Tensor,
        src_lens: torch.Tensor,
        mels: torch.Tensor,
        mel_lens: torch.Tensor,
        pitches: torch.Tensor,
        energies: torch.Tensor,
        attn_priors: torch.Tensor,
        use_ground_truth: bool = True,
        d_vectors: torch.Tensor = None,
        speaker_idx: torch.Tensor = None,
    ) -> Dict[str, torch.Tensor]:
        sid, g, lid, _ = self._set_cond_input(  # pylint: disable=unused-variable
            {"d_vectors": d_vectors, "speaker_ids": speaker_idx}
        )  # pylint: disable=unused-variable

        src_mask = get_mask_from_lengths(src_lens)  # [B, T_src]
        mel_mask = get_mask_from_lengths(mel_lens)  # [B, T_mel]

        # Token embeddings
        token_embeddings = self.src_word_emb(tokens)  # [B, T_src, C_hidden]
        token_embeddings = token_embeddings.masked_fill(src_mask.unsqueeze(-1), 0.0)

        # Alignment network and durations
        aligner_durations, aligner_soft, aligner_logprob, aligner_mas = self._forward_aligner(
            x=token_embeddings,
            y=mels.transpose(1, 2),
            x_mask=~src_mask[:, None],
            y_mask=~mel_mask[:, None],
            attn_priors=attn_priors,
        )
        dr = aligner_durations  # [B, T_en]

        # Embeddings
        speaker_embedding = None
        if d_vectors is not None:
            speaker_embedding = g
        elif speaker_idx is not None:
            speaker_embedding = F.normalize(self.emb_g(sid))

        pos_encoding = positional_encoding(
            self.emb_dim,
            max(token_embeddings.shape[1], max(mel_lens)),
            device=token_embeddings.device,
        )
        encoder_outputs = self.encoder(
            token_embeddings,
            src_mask,
            speaker_embedding=speaker_embedding,
            encoding=pos_encoding,
        )

        u_prosody_ref = self.u_norm(self.utterance_prosody_encoder(mels=mels, mel_lens=mel_lens))
        u_prosody_pred = self.u_norm(
            self.average_utterance_prosody(
                u_prosody_pred=self.utterance_prosody_predictor(x=encoder_outputs, mask=src_mask),
                src_mask=src_mask,
            )
        )

        if use_ground_truth:
            encoder_outputs = encoder_outputs + self.u_bottle_out(u_prosody_ref)
        else:
            encoder_outputs = encoder_outputs + self.u_bottle_out(u_prosody_pred)

        p_prosody_ref = self.p_norm(
            self.phoneme_prosody_encoder(
                x=encoder_outputs, src_mask=src_mask, mels=mels, mel_lens=mel_lens, encoding=pos_encoding
            )
        )
        p_prosody_pred = self.p_norm(self.phoneme_prosody_predictor(x=encoder_outputs, mask=src_mask))

        if use_ground_truth:
            encoder_outputs = encoder_outputs + self.p_bottle_out(p_prosody_ref)
        else:
            encoder_outputs = encoder_outputs + self.p_bottle_out(p_prosody_pred)

        encoder_outputs_res = encoder_outputs

        pitch_pred, avg_pitch_target, pitch_emb = self.pitch_adaptor.get_pitch_embedding_train(
            x=encoder_outputs,
            target=pitches,
            dr=dr,
            mask=src_mask,
        )

        energy_pred, avg_energy_target, energy_emb = self.energy_adaptor.get_energy_embedding_train(
            x=encoder_outputs,
            target=energies,
            dr=dr,
            mask=src_mask,
        )

        encoder_outputs = encoder_outputs.transpose(1, 2) + pitch_emb + energy_emb
        log_duration_prediction = self.duration_predictor(x=encoder_outputs_res.detach(), mask=src_mask)

        mel_pred_mask, encoder_outputs_ex, alignments = self._expand_encoder_with_durations(
            o_en=encoder_outputs, y_lengths=mel_lens, dr=dr, x_mask=~src_mask[:, None]
        )

        x = self.decoder(
            encoder_outputs_ex.transpose(1, 2),
            mel_mask,
            speaker_embedding=speaker_embedding,
            encoding=pos_encoding,
        )
        x = self.to_mel(x)

        dr = torch.log(dr + 1)

        dr_pred = torch.exp(log_duration_prediction) - 1
        alignments_dp = self.generate_attn(dr_pred, src_mask.unsqueeze(1), mel_pred_mask)  # [B, T_max, T_max2']

        return {
            "model_outputs": x,
            "pitch_pred": pitch_pred,
            "pitch_target": avg_pitch_target,
            "energy_pred": energy_pred,
            "energy_target": avg_energy_target,
            "u_prosody_pred": u_prosody_pred,
            "u_prosody_ref": u_prosody_ref,
            "p_prosody_pred": p_prosody_pred,
            "p_prosody_ref": p_prosody_ref,
            "alignments_dp": alignments_dp,
            "alignments": alignments,  # [B, T_de, T_en]
            "aligner_soft": aligner_soft,
            "aligner_mas": aligner_mas,
            "aligner_durations": aligner_durations,
            "aligner_logprob": aligner_logprob,
            "dr_log_pred": log_duration_prediction.squeeze(1),  # [B, T]
            "dr_log_target": dr.squeeze(1),  # [B, T]
            "spk_emb": speaker_embedding,
        }

    @torch.no_grad()
    def inference(
        self,
        tokens: torch.Tensor,
        speaker_idx: torch.Tensor,
        p_control: float = None,  # TODO # pylint: disable=unused-argument
        d_control: float = None,  # TODO # pylint: disable=unused-argument
        d_vectors: torch.Tensor = None,
        pitch_transform: Callable = None,
        energy_transform: Callable = None,
    ) -> torch.Tensor:
        src_mask = get_mask_from_lengths(torch.tensor([tokens.shape[1]], dtype=torch.int64, device=tokens.device))
        src_lens = torch.tensor(tokens.shape[1:2]).to(tokens.device)  # pylint: disable=unused-variable
        sid, g, lid, _ = self._set_cond_input(  # pylint: disable=unused-variable
            {"d_vectors": d_vectors, "speaker_ids": speaker_idx}
        )  # pylint: disable=unused-variable

        token_embeddings = self.src_word_emb(tokens)
        token_embeddings = token_embeddings.masked_fill(src_mask.unsqueeze(-1), 0.0)

        # Embeddings
        speaker_embedding = None
        if d_vectors is not None:
            speaker_embedding = g
        elif speaker_idx is not None:
            speaker_embedding = F.normalize(self.emb_g(sid))

        pos_encoding = positional_encoding(
            self.emb_dim,
            token_embeddings.shape[1],
            device=token_embeddings.device,
        )
        encoder_outputs = self.encoder(
            token_embeddings,
            src_mask,
            speaker_embedding=speaker_embedding,
            encoding=pos_encoding,
        )

        u_prosody_pred = self.u_norm(
            self.average_utterance_prosody(
                u_prosody_pred=self.utterance_prosody_predictor(x=encoder_outputs, mask=src_mask),
                src_mask=src_mask,
            )
        )
        encoder_outputs = encoder_outputs + self.u_bottle_out(u_prosody_pred).expand_as(encoder_outputs)

        p_prosody_pred = self.p_norm(
            self.phoneme_prosody_predictor(
                x=encoder_outputs,
                mask=src_mask,
            )
        )
        encoder_outputs = encoder_outputs + self.p_bottle_out(p_prosody_pred).expand_as(encoder_outputs)

        encoder_outputs_res = encoder_outputs

        pitch_emb_pred, pitch_pred = self.pitch_adaptor.get_pitch_embedding(
            x=encoder_outputs,
            mask=src_mask,
            pitch_transform=pitch_transform,
            pitch_mean=self.pitch_mean if hasattr(self, "pitch_mean") else None,
            pitch_std=self.pitch_std if hasattr(self, "pitch_std") else None,
        )

        energy_emb_pred, energy_pred = self.energy_adaptor.get_energy_embedding(
            x=encoder_outputs, mask=src_mask, energy_transform=energy_transform
        )
        encoder_outputs = encoder_outputs.transpose(1, 2) + pitch_emb_pred + energy_emb_pred

        log_duration_pred = self.duration_predictor(
            x=encoder_outputs_res.detach(), mask=src_mask
        )  # [B, C_hidden, T_src] -> [B, T_src]
        duration_pred = (torch.exp(log_duration_pred) - 1) * (~src_mask) * self.length_scale  # -> [B, T_src]
        duration_pred[duration_pred < 1] = 1.0  # -> [B, T_src]
        duration_pred = torch.round(duration_pred)  # -> [B, T_src]
        mel_lens = duration_pred.sum(1)  # -> [B,]

        _, encoder_outputs_ex, alignments = self._expand_encoder_with_durations(
            o_en=encoder_outputs, y_lengths=mel_lens, dr=duration_pred.squeeze(1), x_mask=~src_mask[:, None]
        )

        mel_mask = get_mask_from_lengths(
            torch.tensor([encoder_outputs_ex.shape[2]], dtype=torch.int64, device=encoder_outputs_ex.device)
        )

        if encoder_outputs_ex.shape[1] > pos_encoding.shape[1]:
            encoding = positional_encoding(self.emb_dim, encoder_outputs_ex.shape[2], device=tokens.device)

        # [B, C_hidden, T_src], [B, 1, T_src], [B, C_emb], [B, T_src, C_hidden] -> [B, C_hidden, T_src]
        x = self.decoder(
            encoder_outputs_ex.transpose(1, 2),
            mel_mask,
            speaker_embedding=speaker_embedding,
            encoding=encoding,
        )
        x = self.to_mel(x)
        outputs = {
            "model_outputs": x,
            "alignments": alignments,
            # "pitch": pitch_emb_pred,
            "durations": duration_pred,
            "pitch": pitch_pred,
            "energy": energy_pred,
            "spk_emb": speaker_embedding,
        }
        return outputs