Spark integration with a UDF seems to fail

Discussion #3, opened by pradeepbill

Hi there, please see the code snippets and error below. Any idea how to fix this?

Thanks
Pradeep

Create a sample DataFrame:

```python
from pyspark.sql.types import StringType, DoubleType

x = '{ "name":"John", "age":30, "city":"Old York"}'
y = '{ "name":"John", "age":30, "city":"New York"}'
z = '{ "name":"John", "age":30, "city":" York"}'

df = spark.createDataFrame([x, y, z], StringType()).toDF("json_cols")
display(df)
```

Create the UDF:

```python
# General imports
import logging
import time
from datetime import date, timedelta
import os
os.environ["GIT_PYTHON_REFRESH"] = "quiet"
import numpy as np
import pandas as pd
import math
from typing import Iterator

# Dimensionality
from pyspark.ml.feature import PCA
from pyspark.ml.functions import vector_to_array

# Databricks and Spark imports
import pyspark
import pyspark.sql.functions as F
import mlflow
import mlflow.pyfunc
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import *
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, Normalizer, FeatureHasher, ElementwiseProduct
from dataclasses import asdict
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Summarizer

# Imports for newton embeds
import datasets
import torch
from transformers import AutoModel, AutoTokenizer
from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim
from transformers import LongformerTokenizer, LongformerModel
from transformers import AutoTokenizer, AutoModel
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import col

# Load the model once on the driver and broadcast it to the executors
# (`sc` is the SparkContext that Databricks notebooks predefine)
model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
broadcast_model = sc.broadcast(model)

@F.pandas_udf(returnType=ArrayType(DoubleType()))
def nomicEmbed(c: pd.Series) -> pd.Series:
    # model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    # Reading .value makes the executor unpickle the broadcast model
    embeddings = broadcast_model.value.encode(c, batch_size=8)
    return pd.Series(embeddings.tolist())
```

Run it:

```python
# Apply the UDF and select the embeddings as a new column in the DataFrame
embed_df = df.limit(100).withColumn("embeddings", nomicEmbed(col("json_cols")))

# Display the result
display(embed_df)
```

Error:

If I instantiate the model inside the UDF, it seems to work, but that's not scalable. The code above fails with this error when I instantiate the model outside the UDF:

```
  File "", line 19, in nomicEmbed
  File "/databricks/spark/python/pyspark/broadcast.py", line 288, in value
    self._value = self.load_from_path(self._path)
  File "/databricks/spark/python/pyspark/broadcast.py", line 235, in load_from_path
    return self.load(f)
  File "/databricks/spark/python/pyspark/broadcast.py", line 274, in load
    gc.enable()
ModuleNotFoundError: No module named 'transformers_modules'
```
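A likely explanation, not confirmed anywhere in this thread: `sc.broadcast` pickles the model on the driver, and each executor unpickles it lazily the first time `.value` is read (the `load_from_path` frames in the traceback). With `trust_remote_code=True`, `transformers` defines the model classes from code it downloads into a `transformers_modules` package in the driver's Hugging Face cache, and the executors' Python workers apparently cannot import that package, so unpickling fails. The stdlib-only sketch below reproduces the same failure in a single process; `fake_remote_modules` and `RemoteModel` are hypothetical stand-ins, not part of any library.

```python
import pickle
import sys
import types

MODULE_NAME = "fake_remote_modules"  # hypothetical stand-in for transformers_modules


def make_dynamic_module(name: str) -> types.ModuleType:
    """Create a module at runtime (no file on sys.path) that holds one model class."""
    module = types.ModuleType(name)
    source = (
        "class RemoteModel:\n"
        "    def encode(self, texts):\n"
        "        return [[float(len(t))] for t in texts]\n"
    )
    # Classes defined here get __module__ == name, like remote-code model classes.
    exec(source, module.__dict__)
    sys.modules[name] = module
    return module


def broadcast_round_trip() -> str:
    module = make_dynamic_module(MODULE_NAME)
    # "Driver": pickling works because the module is registered in sys.modules.
    payload = pickle.dumps(module.RemoteModel())

    # "Executor": a fresh worker never created the module, so it is not importable.
    del sys.modules[MODULE_NAME]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "unpickled fine"


if __name__ == "__main__":
    print(broadcast_round_trip())
```

Running it prints `No module named 'fake_remote_modules'`, the same shape as the error above.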

Nomic AI org

I haven't used Spark in a while, but this seems like an issue with downloading the right packages. Are you sure `transformers` is installed?

Hi Zach, I think I have the right packages, because it works fine if I instantiate the model inside the UDF like this. But I can't do that for many records in the DataFrame: it runs out of memory very quickly, since the model is instantiated again on every UDF call.

```python
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def nomicEmbed(c: pd.Series) -> pd.Series:
    # The model is created inside the UDF, so it is loaded again on every call
    model = SentenceTransformer("nomic-ai/nomic-embed-text-v1-ablated", trust_remote_code=True)
    embeddings = model.encode(c, batch_size=8)
    return pd.Series(embeddings.tolist())
```

If it helps: if I use this model instead, it works fine:

```python
model = SentenceTransformer('thenlper/gte-large')
```
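A plausible reason for the difference, not verified in the thread: `thenlper/gte-large` loads without `trust_remote_code`, so every class it needs is defined in installed packages such as `transformers`, whereas the Nomic model's classes come from code downloaded at load time. The stdlib-only helper below (the name `modules_needed_to_unpickle` is hypothetical, not a library function) lists the top-level modules that a pickled object will import when it is loaded in another process, which is how PySpark ships a broadcast variable. It pickles the whole object, so on a large model it costs time and memory.

```python
import pickle
import pickletools
from typing import Any, Set


def modules_needed_to_unpickle(obj: Any) -> Set[str]:
    """Top-level module names that unpickling `obj` in another process will import.

    Pickles with protocol 2 and fix_imports=False so that every class or
    function reference appears as a GLOBAL opcode whose argument is
    "module qualname" (newer protocols use STACK_GLOBAL instead).
    """
    payload = pickle.dumps(obj, protocol=2, fix_imports=False)
    roots: Set[str] = set()
    for opcode, arg, _pos in pickletools.genops(payload):
        if opcode.name == "GLOBAL":
            module_name = arg.split(" ", 1)[0]
            roots.add(module_name.split(".", 1)[0])
    return roots


if __name__ == "__main__":
    print(modules_needed_to_unpickle([1, "a", 2.0]))       # set()
    print(modules_needed_to_unpickle(pickletools.genops))  # {'pickletools'}
```

Called on the driver with the Nomic model, the result would be expected to include `transformers_modules`; any name in the set that the executors cannot import should fail with the `ModuleNotFoundError` from the traceback, while the set for `gte-large` should contain only installed packages.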

Nomic AI org

Can you post the full error log from above?
