Friday, October 27, 2017

A problem using from_json in Spark 2.1.0

The following code runs fine on Spark 2.2.0:
import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import from_json

def func(value, score):
  values = {}
  for i in range(len(value)):
    if value[i] in values:
      values[value[i]] = values[value[i]] + score[i]
    else:
      values[value[i]] = score[i]
  res = []
  for k, v in values.items():
    res.append({'value': k, 'score': v})
  return json.dumps(res, ensure_ascii=False)

# assumes an existing SparkSession named `spark` (e.g. the pyspark shell)
x = [
  {'user': '86209203000295', 'domain': 'music', 'subdomain': 'artist', 'value': 'xxx', 'score': 0.8, 'ts': '1508737410941'},
  {'user': '86209203000295', 'domain': 'music', 'subdomain': 'artist', 'value': 'yyy', 'score': 0.9, 'ts': '1508737410941'},
  {'user': '86209203000685', 'domain': 'music', 'subdomain': 'artist', 'value': 'zzz', 'score': 0.8, 'ts': '1508717416320'},
]
df = spark.createDataFrame(x)
df = df.groupBy(df['user'], df['domain'], df['subdomain']) \
       .agg(f.collect_list(df['value']).alias('value'),
            f.collect_list(df['score']).alias('score'))
df = df.select(df['user'], df['domain'], df['subdomain'],
               f.udf(func, StringType())(df['value'], df['score']).alias('values'))
df.collect()
schema = ArrayType(StructType([StructField('value', StringType()), StructField('score', DoubleType())]))
df = df.select(df['user'], df['domain'], df['subdomain'], from_json(df['values'], schema).alias('values'))
df.collect()
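The `func` UDF above merges duplicate values within each group by summing their scores and serializes the result as a JSON array string. Its behavior can be checked outside Spark; this standalone sketch repeats the same function body with made-up inputs:

```python
import json

# same logic as the func UDF above, run outside Spark
def func(value, score):
  values = {}
  for i in range(len(value)):
    if value[i] in values:
      values[value[i]] = values[value[i]] + score[i]
    else:
      values[value[i]] = score[i]
  res = []
  for k, v in values.items():
    res.append({'value': k, 'score': v})
  return json.dumps(res, ensure_ascii=False)

# 'a' appears twice, so its scores are summed: 0.25 + 0.5 = 0.75
out = sorted(json.loads(func(['a', 'a', 'b'], [0.25, 0.5, 0.9])),
             key=lambda d: d['value'])
assert out == [{'value': 'a', 'score': 0.75}, {'value': 'b', 'score': 0.9}]
```

This JSON string is what `from_json` then has to parse back, which is why the schema is an ArrayType of structs.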

But the same code fails on Spark 2.1.0 with: java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType

This is a nasty gotcha: in 2.1.0, from_json only accepts a StructType schema, so passing the ArrayType schema needed for a top-level JSON array triggers the cast exception above.
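A possible workaround on 2.1.0 (my own suggestion, not from the original post) is to wrap the JSON array in an object so the top-level schema can be a StructType: concatenate literal braces around the column, e.g. `f.concat(f.lit('{"data": '), df['values'], f.lit('}'))`, parse with `StructType([StructField('data', schema)])`, and then select the `data` field. The field name `data` is arbitrary. The wrapping itself is plain string manipulation:

```python
import json

def wrap_array(json_array_str):
    # Wrap a top-level JSON array in an object so it can be parsed with
    # a StructType schema (Spark 2.1.0's from_json rejects ArrayType).
    return '{"data": ' + json_array_str + '}'

payload = '[{"value": "xxx", "score": 0.8}, {"value": "yyy", "score": 0.9}]'
parsed = json.loads(wrap_array(payload))
assert parsed['data'][1]['value'] == 'yyy'
```

In the Spark job, the wrapping would instead be done with column expressions as sketched above, so no extra UDF is needed.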
