defmodule Medicode.ClassificationServer do
  @moduledoc """
  GenServer responsible for classifying transcription text.
  """
  use GenServer

  alias AudioTagger.KeywordFinder
  alias Medicode.Coding
  alias Medicode.Coding.CodeVectorMatch
  alias Medicode.Transcriptions

  @registry :transcription_registry

  def start_link(%{chunk: chunk, name: name}) do
    GenServer.start_link(__MODULE__, {:chunk, chunk}, name: via_tuple(name))
  end

  @doc """
  This function will be called by the supervisor to retrieve the specification
  of the child process. The child process is configured to restart only if it
  terminates abnormally.
  """
  def child_spec(process_name) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [process_name]},
      restart: :transient
    }
  end

  @impl GenServer
  def init({:chunk, _chunk} = init_arg) do
    # Return quickly from init/1 and defer the classification work to
    # handle_continue/2 so the caller is not blocked.
    {:ok, init_arg, {:continue, :start}}
  end

  @impl GenServer
  def handle_continue(:start, {:chunk, chunk} = state) do
    Phoenix.PubSub.broadcast(
      :medicode_pubsub,
      "transcriptions:#{chunk.transcription_id}",
      {:classification_started, chunk.id}
    )

    classify_chunk(chunk)
    find_keywords(chunk)

    # The work is done, so stop normally. Because the child spec uses
    # restart: :transient, a normal exit will not trigger a restart.
    {:stop, :normal, state}
  end

  @impl GenServer
  def terminate(reason, {:chunk, chunk} = _state) do
    # Let subscribers know that classification for this chunk has finished.
    Phoenix.PubSub.broadcast(
      :medicode_pubsub,
      "transcriptions:#{chunk.transcription_id}",
      {:classification_finished, chunk.id}
    )

    reason
  end

  defp via_tuple(name), do: {:via, Registry, {@registry, name}}

  defp classify_chunk(chunk) do
    attrs =
      chunk
      |> Coding.process_chunk()
      |> Enum.map(fn %CodeVectorMatch{
                       code: code,
                       cosine_similarity: cosine_similarity,
                       weighting: weighting
                     } ->
        code_vector = Coding.get_code_vector_by_code!(code)
        weighting_as_string = Enum.map(weighting, &Atom.to_string/1)

        %{
          transcription_chunk_id: chunk.id,
          code_vector_id: code_vector.id,
          cosine_similarity: cosine_similarity,
          weighting: weighting_as_string,
          inserted_at: {:placeholder, :timestamp},
          updated_at: {:placeholder, :timestamp}
        }
      end)

    Transcriptions.replace_all_code_vectors_for_chunk(chunk, attrs)
  end

  defp find_keywords(chunk) do
    # First, run token classification over the chunk text to extract entities.
    %{entities: entities} =
      Nx.Serving.batched_run(Medicode.TokenClassificationServing, chunk.text)

    phrases = KeywordFinder.cleanup_phrases(entities)

    # Then, we use one of two processes to determine which to show as keywords.
    chunk.text
    |> determine_keywords(phrases)
    |> Enum.map(fn %{label: label, score: score} ->
      # TODO: Replace loop with an insert_all call and check for conflicts
      # so that duplicate keywords are ignored.
      Transcriptions.create_keyword_for_chunk(%{
        transcription_chunk_id: chunk.id,
        keyword: label,
        score: score
      })
    end)
  end

  # This clause handles cases where there is transcribed text, but no phrases were found.
  defp determine_keywords(_text, []), do: []

  defp determine_keywords(text, phrases) do
    # 1. A slower process that looks to classify the text by the extracted phrases.
    # serving = KeywordFinder.prepare_zero_shot_classification_serving(phrases)
    # %{predictions: predictions} = Nx.Serving.run(serving, text)

    # 2. A fast process finding the phrase closest in vector space to the whole text.
    KeywordFinder.find_most_similar_label(text, phrases, 2)
  end
end
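
# A minimal usage sketch (not part of the module above), assuming the
# application supervision tree starts a Registry named :transcription_registry
# and a DynamicSupervisor named Medicode.ClassificationSupervisor; both the
# supervisor name and the Medicode.Classification module below are illustrative
# assumptions, not part of the original code.
defmodule Medicode.Classification do
  @doc """
  Starts a ClassificationServer for the given chunk. DynamicSupervisor calls
  Medicode.ClassificationServer.child_spec/1 with the map below, which in turn
  invokes start_link/1 and registers the process via the Registry.
  """
  def classify_chunk_async(chunk) do
    DynamicSupervisor.start_child(
      Medicode.ClassificationSupervisor,
      {Medicode.ClassificationServer, %{chunk: chunk, name: "classification-#{chunk.id}"}}
    )
  end
end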