pyspark - 从 PySpark 读取 Elasticsearch 索引

谁能告诉我为什么这个 PySpark 的测试脚本出错了? (python 3.6.8,hadoop 3.3.1,火花 3.2.1,elasticsearch-hadoop 7.14)

from pyspark.sql import SparkSession, SQLContext

myspark = SparkSession.builder \
  .appName("My test.") \
  .master("spark://xx.xx.xx:7077") \
  .config("es.nodes", "xx.xx.xx.xx") \
  .config("es.port", "9200") \
  .config("es.net.http.auth.user", "xxxx") \
  .config("es.net.http.auth.pass", "xxxx") \
  .getOrCreate()

mycontext = SQLContext(myspark)
myquery = '{ "query": { "match_all": {} }}'

myreader = mycontext.read.format("org.elasticsearch.spark.sql") \
  .option("es.nodes", "xx.xx.xx.xx") \
  .option("es.port", "9200") \
  .option("es.net.http.auth.user", "xxxx") \
  .option("es.net.http.auth.pass", "xxxx") \
  .option("es.query", myquery)

myframe = myreader.load("myindex")

我在 .load() 上遇到的错误是:

py4j.protocol.Py4JJavaError: An error occurred while calling 039.load.
: java.lang.NoClassDefFoundError: scala/Product$class
     at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
     at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
     ...

我还有一个使用较旧的 SparkConf()SparkContext().newAPIHadoopRDD() 的测试片段,它可以很好地连接到相同的 spark 主服务器和弹性集群。这样就排除了我的类路径或防火墙或身份验证的许多潜在问题。

回答1

为了使用 spark 3.2.1,您需要 8.2.0 的 elasticsearch-hadoop 版本。

https://www.elastic.co/guide/en/elasticsearch/hadoop/8.2/eshadoop-8.2.0.html

相似文章