Spaces:
Runtime error
Runtime error
File size: 3,371 Bytes
8c45b62 2b377c2 77eadec dddf4f4 2b377c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
import streamlit as st
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import re
import string
import os
# JAVA_HOME must name the JDK root directory, not its "bin" sub-directory:
# Spark's launcher scripts execute "$JAVA_HOME/bin/java", so the previous value
# ("Java/jdk-1.8/bin") made them look for ".../bin/bin/java".
# NOTE(review): the path is relative to the process working directory - confirm
# the bundled JDK lives at ./Java/jdk-1.8 in the deployed app.
os.environ["JAVA_HOME"] = "Java/jdk-1.8"
# Load Spark session: a local-mode session using all available cores; an
# already-running session is reused by getOrCreate().
spark = (
    SparkSession.builder
    .appName("HateSpeechDetection")
    .master('local[*]')
    .getOrCreate()
)
# Define the TextTransformer class (as in your code).
# It is declared BEFORE the saved pipeline is loaded: a persisted pipeline that
# contains a Python-defined stage looks the stage class up by name while
# loading, so the class has to exist in this module at that point.
class TextTransformer(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    """Pipeline stage that normalises a text column.

    For every row the value of ``input_col`` has its digits removed, each
    ASCII punctuation character replaced by a space, surrounding whitespace
    stripped and the result lower-cased; the cleaned text is written to
    ``output_col``. Null values are passed through as null.
    """

    input_col = Param(
        Params._dummy(),
        "input_col",
        "input column name.",
        typeConverter=TypeConverters.toString,
    )
    output_col = Param(
        Params._dummy(),
        "output_col",
        "output column name.",
        typeConverter=TypeConverters.toString,
    )

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        """Create the transformer.

        Args:
            input_col: Name of the column holding the raw text.
            output_col: Name of the column that receives the cleaned text.
        """
        super(TextTransformer, self).__init__()
        # @keyword_only only records the keyword arguments the caller actually
        # passed, so the signature defaults never reach set_params(). Register
        # the same values as param defaults so that TextTransformer() really
        # uses "input"/"output" instead of None.
        self._setDefault(input_col="input", output_col="output")
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        """Set only the params that were explicitly passed as keywords."""
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        """Return the configured (or default) input column name."""
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        """Return the configured (or default) output column name."""
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with the cleaned text added as the output column."""
        # Built once per transform call and captured by the UDF closure,
        # instead of being rebuilt for every row.
        punctuation_to_space = str.maketrans(
            string.punctuation, ' ' * len(string.punctuation)
        )

        def preprocess_text(text):
            # Keep nulls as nulls; str(None) would otherwise produce the
            # literal word "none" as if the user had typed it.
            if text is None:
                return None
            without_digits = re.sub(r'\d+', '', str(text))
            return without_digits.translate(punctuation_to_space).strip().lower()

        input_col = self.get_input_col()
        output_col = self.get_output_col()
        # Wrap the Python cleaner as a string-returning UDF and apply it to
        # the input column.
        transform_udf = udf(preprocess_text, StringType())
        new_df = df.withColumn(output_col, transform_udf(input_col))
        return new_df


# Load the pre-trained model (after TextTransformer exists, see note above)
loaded_model = PipelineModel.load('LogisticRegression')
# Create a Streamlit app
def main():
    """Render the classification page and show a prediction for the entered text.

    Draws a title, a text area and a "Predict" button. When the button is
    pressed with non-blank text, the text is placed in a one-row DataFrame
    (column ``free_text``), passed through the module-level ``loaded_model``
    and the value of its ``prediction`` column is displayed as a label.
    Empty or whitespace-only input produces a warning instead.
    """
    st.title("Text Classification App")
    # User input text
    user_input = st.text_area("Enter text here:")
    if not st.button("Predict"):
        return
    # A whitespace-only string is truthy, so check the stripped value to
    # avoid sending blank text to the model.
    if not (user_input and user_input.strip()):
        st.warning("Please enter some text.")
        return
    # Create a DataFrame with a single column 'free_text' containing the input text
    data = [(user_input,)]
    columns = ['free_text']
    input_df = spark.createDataFrame(data, columns)
    # Use the loaded model to make predictions
    predictions = loaded_model.transform(input_df)
    # Extract the prediction result (exactly one row was submitted)
    result = predictions.select("prediction").collect()[0]["prediction"]
    # Map the prediction result to corresponding labels
    labels = {0: "CLEAN", 1: "OFFENSIVE", 2: "HATE"}
    predicted_class = labels.get(result, "UNKNOWN")
    # Display the result
    st.success(f"Predicted class: {predicted_class}")
# Script entry point: build the page when the file is executed as a script.
if __name__ == "__main__":
    main()