对于以下代码,spark2.2.0运行正常:
import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import from_json
def func(value, score):
    """Merge parallel ``value``/``score`` lists into a JSON array string.

    Scores belonging to the same value are summed; values keep their
    first-seen order (Python 3.7+ dict insertion order, matching the
    original implementation).

    Args:
        value: sequence of value keys (e.g. artist names).
        score: sequence of numeric scores, parallel to ``value``.
            If the lists differ in length, extra elements are ignored
            (same effective behavior as the original index loop for
            a longer ``score`` list).

    Returns:
        A JSON string of the form
        ``[{"value": ..., "score": ...}, ...]`` with non-ASCII
        characters emitted verbatim (``ensure_ascii=False``).
    """
    totals = {}
    # zip replaces the non-idiomatic range(len(...)) index loop;
    # dict.get(v, 0) replaces the manual "if key present" branch.
    for v, s in zip(value, score):
        totals[v] = totals.get(v, 0) + s
    return json.dumps(
        [{'value': v, 'score': s} for v, s in totals.items()],
        ensure_ascii=False,
    )
x = [{'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'xxx', 'score' : 0.8, 'ts' : '1508737410941'}, {'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'yyy', 'score' : 0.9, 'ts' : '1508737410941'}, {'user' : '86209203000685', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'zzz', 'score' : 0.8, 'ts' : '1508717416320'}]
df = spark.createDataFrame(x)
df = df.groupBy(df['user'], df['domain'], df['subdomain']).agg(f.collect_list(df['value']).alias('value'), f.collect_list(df['score']).alias('score'))
df = df.select(df['user'], df['domain'], df['subdomain'], f.UserDefinedFunction(func, StringType())(df['value'], df['score']).alias('values'))
df.collect()
schema = ArrayType(StructType([StructField('value', StringType()), StructField('score', DoubleType())]))
df = df.select(df['user'], df['domain'], df['subdomain'], from_json(df['values'], schema).alias('values'))
df.collect()
但是spark2.1.0运行报错:java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
这个问题比较坑:Spark 2.1.0 中 from_json 的 schema 参数只接受 StructType,对顶层 ArrayType 的支持是 2.2.0(SPARK-19595)才加入的,所以同样的代码在 2.1.0 上会抛出上述 ClassCastException。
没有评论:
发表评论