File size: 3,371 Bytes
8c45b62
2b377c2
 
 
 
 
 
 
 
 
 
 
 
 
77eadec
 
dddf4f4
2b377c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import streamlit as st
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql import DataFrame
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

import re
import string
import os

# Tell PySpark which Java installation to use; set before the session below
# is created.
# NOTE(review): JAVA_HOME conventionally names the JDK root rather than its
# bin/ directory, and this path is relative to the working directory —
# confirm it resolves correctly in the deployment environment.
os.environ["JAVA_HOME"] = "Java/jdk-1.8/bin"

# Create (or reuse) a Spark session running in local mode.
spark = (
    SparkSession.builder
    .appName("HateSpeechDetection")
    .master('local[*]')
    .getOrCreate()
)

# Restore the fitted pipeline saved under the 'LogisticRegression' path.
# NOTE(review): TextTransformer is defined further down in this file; if the
# saved pipeline contains such a stage, verify that loading works before the
# class exists.
loaded_model = PipelineModel.load('LogisticRegression')

# Custom text-cleaning pipeline stage.
class TextTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Spark ML transformer that writes a cleaned copy of a text column.

    For every row the value of ``input_col`` is converted with ``str()``,
    runs of digits are deleted, each character of ``string.punctuation`` is
    replaced by a space, surrounding whitespace is stripped and the result
    is lower-cased. The cleaned text is stored in ``output_col``.

    Params:
        input_col: name of the column to read (default ``"input"``).
        output_col: name of the column to write (default ``"output"``).
    """

    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        """Create the transformer; both arguments are keyword-only."""
        super(TextTransformer, self).__init__()
        # @keyword_only records only the kwargs that were passed explicitly,
        # so the signature defaults never reach set_params(). Register them
        # as the Param defaults to keep them in sync with the signature.
        self._setDefault(input_col="input", output_col="output")
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        """Set the explicitly passed params and return ``self`` for chaining."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def get_input_col(self):
        """Return the configured (or default) input column name."""
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        """Return the configured (or default) output column name."""
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with the cleaned text added as ``output_col``."""
        input_col = self.get_input_col()
        output_col = self.get_output_col()

        # Build the punctuation -> space table once, not once per row.
        punctuation_to_space = str.maketrans(
            string.punctuation, ' ' * len(string.punctuation)
        )

        # The UDF deliberately references only local values, never `self`.
        def preprocess_text(text) -> str:
            # NOTE: str() is applied to every value, so a null input is
            # turned into the text "none" rather than staying null.
            without_digits = re.sub(r'\d+', '', str(text))
            return without_digits.translate(punctuation_to_space).strip().lower()

        transform_udf = udf(preprocess_text, StringType())
        return df.withColumn(output_col, transform_udf(input_col))

# Streamlit page: read text from the user and show the model's prediction.
def main():
    """Render the classification page and handle the "Predict" button.

    When the button is pressed with non-blank text, the text is placed in a
    one-row DataFrame with a single ``free_text`` column, run through
    ``loaded_model`` and the value of its ``prediction`` column is shown as
    CLEAN, OFFENSIVE, HATE, or UNKNOWN for any other value. Empty or
    whitespace-only input produces a warning instead of a prediction.
    """
    st.title("Text Classification App")

    # User input text
    user_input = st.text_area("Enter text here:")

    if not st.button("Predict"):
        return

    # Treat whitespace-only input like empty input: there is nothing to
    # classify, so do not send it to the model.
    if not user_input or not user_input.strip():
        st.warning("Please enter some text.")
        return

    # One-row DataFrame with a single 'free_text' column holding the
    # text exactly as the user typed it.
    input_df = spark.createDataFrame([(user_input,)], ['free_text'])

    # Use the loaded model to make predictions
    predictions = loaded_model.transform(input_df)

    # Extract the prediction for the single input row
    result = predictions.select("prediction").collect()[0]["prediction"]

    # Map the prediction result to corresponding labels. The keys are ints;
    # a float such as 1.0 still matches because it compares and hashes equal
    # to 1 (presumably the prediction column is a double — verify).
    labels = {0: "CLEAN", 1: "OFFENSIVE", 2: "HATE"}
    predicted_class = labels.get(result, "UNKNOWN")

    # Display the result
    st.success(f"Predicted class: {predicted_class}")

# Run the app only when this file is executed as the main script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()