2017年10月31日星期二
spark合并小文件
spark使用FileUtil.copyMerge来进行小文件合并:https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html
pyspark中dataframe union的一个问题
>>> a = [{'a': 1, 'b': 2}]
>>> x = spark.createDataFrame(a)
>>> b = sc.parallelize([(3, 4)])
>>> y = spark.createDataFrame(b, ['b', 'a'])
>>> x.collect()
[Row(a=1, b=2)]
>>> y.collect()
[Row(b=3, a=4)]
>>> z = x.union(y)
>>> z.collect()
[Row(a=1, b=2), Row(a=3, b=4)]
正确的结果应该是[Row(a=1, b=2), Row(a=4, b=3)],但实际输出的结果第二个Row的a和b反了。猜测DataFrame的union是按照顺序来的,并不是按照column的名称对应的。
Also as standard in SQL, this function resolves columns by position (not by name). Spark 2.3提供了unionByName可以解决问题,目前解决办法是把x与y的字段名排序要一样才行。
>>> x = spark.createDataFrame(a)
>>> b = sc.parallelize([(3, 4)])
>>> y = spark.createDataFrame(b, ['b', 'a'])
>>> x.collect()
[Row(a=1, b=2)]
>>> y.collect()
[Row(b=3, a=4)]
>>> z = x.union(y)
>>> z.collect()
[Row(a=1, b=2), Row(a=3, b=4)]
正确的结果应该是[Row(a=1, b=2), Row(a=4, b=3)],但实际输出的结果第二个Row的a和b反了。猜测DataFrame的union是按照顺序来的,并不是按照column的名称对应的。
Also as standard in SQL, this function resolves columns by position (not by name). Spark 2.3提供了unionByName可以解决问题,目前解决办法是把x与y的字段名排序要一样才行。
2017年10月30日星期一
spark读写mongodb的一个问题
从mongodb的某个collection中读取了df,做了一些操作后又overwrite写回该collection会有问题。因为在写的时候才action,猜测可能因为分布式的同时读写造成的问题。
问题确认:
将df cache,在回写之前先做一次action,让结果缓存到内存,然后再写mongo没有问题。
解决:
从一个collection读,写到另一个 collection
问题确认:
将df cache,在回写之前先做一次action,让结果缓存到内存,然后再写mongo没有问题。
解决:
从一个collection读,写到另一个 collection
mongodb启动问题
在配置中修改了dbPath,无法启动,需要将修改的dbPath的用户和组设置为mongodb:mongodb即可。
还有可能在日志里面发现socket permission的问题,删除/tmp/mongodb-27017.sock后再sudo service mongod start
还有可能在日志里面发现socket permission的问题,删除/tmp/mongodb-27017.sock后再sudo service mongod start
2017年10月29日星期日
npm install 速度慢
换为国内镜像:
npm install --registry=http://registry.npm.taobao.org
永久设置:
npm config set registry http://registry.npm.taobao.org
npm install --registry=http://registry.npm.taobao.org
永久设置:
npm config set registry http://registry.npm.taobao.org
ubuntu下E: Unable to locate package问题
刚安装了ubuntu 16.04系统,sudo apt install tmux时候遇到E: Unable to locate package tmux的问题,解决:
sudo apt-get update
sudo apt-get update
2017年10月27日星期五
tensorflow serving 运算特别慢
之前从源码编译的方式安装了tensorflow serving,但是部署到线上发现特别吃cpu,并且速度很慢。根据https://github.com/tensorflow/serving/issues/456,猜测应该是编译选项设置的问题。
后来换成了通过apt-get安装方式的二进制文件,问题解决。
后来换成了通过apt-get安装方式的二进制文件,问题解决。
spark 2.1.0 from_json使用中的问题
对于以下代码,spark2.2.0运行正常:
import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import from_json
def func(value, score):
values = {}
for i in range(len(value)):
if value[i] in values:
values[value[i]] = values[value[i]] + score[i]
else:
values[value[i]] = score[i]
res = []
for k, v in values.items():
res.append({'value': k, 'score': v})
return json.dumps(res, ensure_ascii=False)
x = [{'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'xxx', 'score' : 0.8, 'ts' : '1508737410941'}, {'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'yyy', 'score' : 0.9, 'ts' : '1508737410941'}, {'user' : '86209203000685', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'zzz', 'score' : 0.8, 'ts' : '1508717416320'}]
df = spark.createDataFrame(x)
df = df.groupBy(df['user'], df['domain'], df['subdomain']).agg(f.collect_list(df['value']).alias('value'), f.collect_list(df['score']).alias('score'))
df = df.select(df['user'], df['domain'], df['subdomain'], f.UserDefinedFunction(func, StringType())(df['value'], df['score']).alias('values'))
df.collect()
schema = ArrayType(StructType([StructField('value', StringType()), StructField('score', DoubleType())]))
df = df.select(df['user'], df['domain'], df['subdomain'], from_json(df['values'], schema).alias('values'))
df.collect()
但是spark2.1.0运行报错:java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
这个问题比较坑,2.1.0不支持ArrayType。
import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import from_json
def func(value, score):
values = {}
for i in range(len(value)):
if value[i] in values:
values[value[i]] = values[value[i]] + score[i]
else:
values[value[i]] = score[i]
res = []
for k, v in values.items():
res.append({'value': k, 'score': v})
return json.dumps(res, ensure_ascii=False)
x = [{'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'xxx', 'score' : 0.8, 'ts' : '1508737410941'}, {'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'yyy', 'score' : 0.9, 'ts' : '1508737410941'}, {'user' : '86209203000685', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'zzz', 'score' : 0.8, 'ts' : '1508717416320'}]
df = spark.createDataFrame(x)
df = df.groupBy(df['user'], df['domain'], df['subdomain']).agg(f.collect_list(df['value']).alias('value'), f.collect_list(df['score']).alias('score'))
df = df.select(df['user'], df['domain'], df['subdomain'], f.UserDefinedFunction(func, StringType())(df['value'], df['score']).alias('values'))
df.collect()
schema = ArrayType(StructType([StructField('value', StringType()), StructField('score', DoubleType())]))
df = df.select(df['user'], df['domain'], df['subdomain'], from_json(df['values'], schema).alias('values'))
df.collect()
但是spark2.1.0运行报错:java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
这个问题比较坑,2.1.0不支持ArrayType。
2017年10月13日星期五
intellij添加python包
File -> Project Sturcture... 中先看下左侧的Modules中有没有添加Python的module,没有则先添加python module。
然后再点左侧的Global Libraries,添加需要的python source包目录即可。
例如,我进行pyspark开发,将spark-2.2.0-bin-hadoop2.7/python添加到source中,这样项目中就可以跳转到pyspark的代码并且有代码提示了。
然后再点左侧的Global Libraries,添加需要的python source包目录即可。
例如,我进行pyspark开发,将spark-2.2.0-bin-hadoop2.7/python添加到source中,这样项目中就可以跳转到pyspark的代码并且有代码提示了。
2017年10月9日星期一
linux命令后台运行
http://www.cnblogs.com/lwm-1988/archive/2011/08/20/2147299.html
一、在Linux中,如果要让进程在后台运行,一般情况下,我们在命令后面加上&即可,实际上,这样是将命令放入到一个作业队列中了:
$ ./test.sh &
[1] 17208
$ jobs -l
[1]+ 17208 Running ./test.sh &
二、对于已经在前台执行的命令,也可以重新放到后台执行,首先按ctrl+z暂停已经运行的进程,然后使用bg命令将停止的作业放到后台运行:
$ ./test.sh
[1]+ Stopped ./test.sh
$ bg %1
[1]+ ./test.sh &
$ jobs -l
[1]+ 22794 Running ./test.sh &
三、但是如上方到后台执行的进程,其父进程还是当前终端shell的进程,而一旦父进程退出,则会发送hangup信号给所有子进程,子进程收到hangup以后也会退出。如果我们要在退出shell的时候继续运行进程,则需要使用nohup忽略hangup信号,或者setsid将将父进程设为init进程(进程号为1)
$ echo $$
21734
$ nohup ./test.sh &
[1] 29016
$ ps -ef | grep test
515 29710 21734 0 11:47 pts/12 00:00:00 /bin/sh ./test.sh
515 29713 21734 0 11:47 pts/12 00:00:00 grep test
$ setsid ./test.sh &
[1] 409
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 413 21734 0 11:49 pts/12 00:00:00 grep test
四、上面的试验演示了使用nohup/setsid加上&使进程在后台运行,同时不受当前shell退出的影响。那么对于已经在后台运行的进程,该怎么办呢?可以使用disown命令(效果与setid相同,但是disown后无法通过jobs命令查看了):
$ ./test.sh &
[1] 2539
$ jobs -l
[1]+ 2539 Running ./test.sh &
$ disown -h %1
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 2542 21734 0 11:52 pts/12 00:00:00 grep test
五、另外还有一种方法,即使将进程在一个subshell中执行,其实这和setsid异曲同工。方法很简单,将命令用括号() 括起来即可:
$ (./test.sh &)
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 12483 21734 0 11:59 pts/12 00:00:00 grep test
一、在Linux中,如果要让进程在后台运行,一般情况下,我们在命令后面加上&即可,实际上,这样是将命令放入到一个作业队列中了:
$ ./test.sh &
[1] 17208
$ jobs -l
[1]+ 17208 Running ./test.sh &
二、对于已经在前台执行的命令,也可以重新放到后台执行,首先按ctrl+z暂停已经运行的进程,然后使用bg命令将停止的作业放到后台运行:
$ ./test.sh
[1]+ Stopped ./test.sh
$ bg %1
[1]+ ./test.sh &
$ jobs -l
[1]+ 22794 Running ./test.sh &
三、但是如上方到后台执行的进程,其父进程还是当前终端shell的进程,而一旦父进程退出,则会发送hangup信号给所有子进程,子进程收到hangup以后也会退出。如果我们要在退出shell的时候继续运行进程,则需要使用nohup忽略hangup信号,或者setsid将将父进程设为init进程(进程号为1)
$ echo $$
21734
$ nohup ./test.sh &
[1] 29016
$ ps -ef | grep test
515 29710 21734 0 11:47 pts/12 00:00:00 /bin/sh ./test.sh
515 29713 21734 0 11:47 pts/12 00:00:00 grep test
$ setsid ./test.sh &
[1] 409
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 413 21734 0 11:49 pts/12 00:00:00 grep test
四、上面的试验演示了使用nohup/setsid加上&使进程在后台运行,同时不受当前shell退出的影响。那么对于已经在后台运行的进程,该怎么办呢?可以使用disown命令(效果与setid相同,但是disown后无法通过jobs命令查看了):
$ ./test.sh &
[1] 2539
$ jobs -l
[1]+ 2539 Running ./test.sh &
$ disown -h %1
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 2542 21734 0 11:52 pts/12 00:00:00 grep test
五、另外还有一种方法,即使将进程在一个subshell中执行,其实这和setsid异曲同工。方法很简单,将命令用括号() 括起来即可:
$ (./test.sh &)
$ ps -ef | grep test
515 410 1 0 11:49 ? 00:00:00 /bin/sh ./test.sh
515 12483 21734 0 11:59 pts/12 00:00:00 grep test
linux中&、jobs、fg、bg等命令的使用方法
http://blog.sina.com.cn/s/blog_673ee2b50100iywr.html
一. & 最经常被用到
这个用在一个命令的最后,可以把这个命令放到后台执行
二. ctrl + z
可以将一个正在前台执行的命令放到后台,并且暂停
三. jobs
查看当前有多少在后台运行的命令
四. fg
将后台中的命令调至前台继续运行
如果后台中有多个命令,可以用 fg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)
五. bg
将一个在后台暂停的命令,变成继续执行
如果后台中有多个命令,可以用bg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)
一. & 最经常被用到
这个用在一个命令的最后,可以把这个命令放到后台执行
二. ctrl + z
可以将一个正在前台执行的命令放到后台,并且暂停
三. jobs
查看当前有多少在后台运行的命令
四. fg
将后台中的命令调至前台继续运行
如果后台中有多个命令,可以用 fg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)
五. bg
将一个在后台暂停的命令,变成继续执行
如果后台中有多个命令,可以用bg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)
订阅:
博文 (Atom)