2017年10月31日星期二

spark合并小文件

spark使用FileUtil.copyMerge来进行小文件合并:https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html

pyspark中dataframe union的一个问题

>>> a = [{'a': 1, 'b': 2}]
>>> x = spark.createDataFrame(a)
>>> b = sc.parallelize([(3, 4)])
>>> y = spark.createDataFrame(b, ['b', 'a'])
>>> x.collect()
[Row(a=1, b=2)]
>>> y.collect()
[Row(b=3, a=4)]
>>> z = x.union(y)
>>> z.collect()
[Row(a=1, b=2), Row(a=3, b=4)]

正确的结果应该是[Row(a=1, b=2), Row(a=4, b=3)],但实际输出的结果第二个Row的a和b反了。猜测DataFrame的union是按照顺序来的,并不是按照column的名称对应的。

Also as standard in SQL, this function resolves columns by position (not by name). Spark 2.3提供了unionByName可以解决问题,目前解决办法是把x与y的字段名排序要一样才行。

2017年10月30日星期一

spark读写mongodb的一个问题

从mongodb的某个collection中读取了df,做了一些操作后又overwrite写回该collection会有问题。因为在写的时候才action,猜测可能因为分布式的同时读写造成的问题。

问题确认:
将df cache,在回写之前先做一次action,让结果缓存到内存,然后再写mongo没有问题。

解决:
从一个collection读,写到另一个 collection

pyspark中判断DataFrame是否为空

if df.head() is not None:
  xxx

mongodb启动问题

在配置中修改了dbPath,无法启动,需要将修改的dbPath的用户和组设置为mongodb:mongodb即可。

还有可能在日志里面发现socket permission的问题,删除/tmp/mongodb-27017.sock后再sudo service mongod start

2017年10月29日星期日

npm install 速度慢

换为国内镜像:
npm install --registry=http://registry.npm.taobao.org

永久设置:
npm config set registry http://registry.npm.taobao.org

ubuntu下E: Unable to locate package问题

刚安装了ubuntu 16.04系统,sudo apt install tmux时候遇到E: Unable to locate package tmux的问题,解决:
sudo apt-get update

2017年10月27日星期五

tensorflow serving 运算特别慢

之前从源码编译的方式安装了tensorflow serving,但是部署到线上发现特别吃cpu,并且速度很慢。根据https://github.com/tensorflow/serving/issues/456,猜测应该是编译选项设置的问题。

后来换成了通过apt-get安装方式的二进制文件,问题解决。

spark 2.1.0 from_json使用中的问题

对于以下代码,spark2.2.0运行正常:
import json
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
from pyspark.sql.functions import from_json

def func(value, score):
  values = {}
  for i in range(len(value)):
    if value[i] in values:
      values[value[i]] = values[value[i]] + score[i]
    else:
      values[value[i]] = score[i]
  res = []
  for k, v in values.items():
    res.append({'value': k, 'score': v})
  return json.dumps(res, ensure_ascii=False)

x = [{'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'xxx', 'score' : 0.8, 'ts' : '1508737410941'}, {'user' : '86209203000295', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'yyy', 'score' : 0.9, 'ts' : '1508737410941'}, {'user' : '86209203000685', 'domain' : 'music', 'subdomain' : 'artist', 'value' : 'zzz', 'score' : 0.8, 'ts' : '1508717416320'}]
df = spark.createDataFrame(x)
df = df.groupBy(df['user'], df['domain'], df['subdomain']).agg(f.collect_list(df['value']).alias('value'), f.collect_list(df['score']).alias('score'))
df = df.select(df['user'], df['domain'], df['subdomain'], f.UserDefinedFunction(func, StringType())(df['value'], df['score']).alias('values'))
df.collect()
schema = ArrayType(StructType([StructField('value', StringType()), StructField('score', DoubleType())]))
df = df.select(df['user'], df['domain'], df['subdomain'], from_json(df['values'], schema).alias('values'))
df.collect()

但是spark2.1.0运行报错:java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType

这个问题比较坑,2.1.0不支持ArrayType。

2017年10月13日星期五

intellij添加python包

File -> Project Sturcture... 中先看下左侧的Modules中有没有添加Python的module,没有则先添加python module。

然后再点左侧的Global Libraries,添加需要的python source包目录即可。

例如,我进行pyspark开发,将spark-2.2.0-bin-hadoop2.7/python添加到source中,这样项目中就可以跳转到pyspark的代码并且有代码提示了。

2017年10月9日星期一

linux命令后台运行

http://www.cnblogs.com/lwm-1988/archive/2011/08/20/2147299.html

一、在Linux中,如果要让进程在后台运行,一般情况下,我们在命令后面加上&即可,实际上,这样是将命令放入到一个作业队列中了:
$ ./test.sh &
[1] 17208
$ jobs -l
[1]+ 17208 Running                 ./test.sh & 

二、对于已经在前台执行的命令,也可以重新放到后台执行,首先按ctrl+z暂停已经运行的进程,然后使用bg命令将停止的作业放到后台运行:
$ ./test.sh
[1]+  Stopped                 ./test.sh
$ bg %1
[1]+ ./test.sh &
$ jobs -l
[1]+ 22794 Running                 ./test.sh & 

三、但是如上方到后台执行的进程,其父进程还是当前终端shell的进程,而一旦父进程退出,则会发送hangup信号给所有子进程,子进程收到hangup以后也会退出。如果我们要在退出shell的时候继续运行进程,则需要使用nohup忽略hangup信号,或者setsid将将父进程设为init进程(进程号为1)
$ echo $$
21734
$ nohup ./test.sh &
[1] 29016
$ ps -ef | grep test
515      29710 21734  0 11:47 pts/12   00:00:00 /bin/sh ./test.sh
515      29713 21734  0 11:47 pts/12   00:00:00 grep test 

$ setsid ./test.sh &
[1] 409
$ ps -ef | grep test
515        410     1  0 11:49 ?        00:00:00 /bin/sh ./test.sh
515        413 21734  0 11:49 pts/12   00:00:00 grep test 

四、上面的试验演示了使用nohup/setsid加上&使进程在后台运行,同时不受当前shell退出的影响。那么对于已经在后台运行的进程,该怎么办呢?可以使用disown命令(效果与setid相同,但是disown后无法通过jobs命令查看了):
$ ./test.sh &
[1] 2539
$ jobs -l
[1]+  2539 Running                 ./test.sh &
$ disown -h %1
$ ps -ef | grep test
515        410     1  0 11:49 ?        00:00:00 /bin/sh ./test.sh
515       2542 21734  0 11:52 pts/12   00:00:00 grep test 

五、另外还有一种方法,即使将进程在一个subshell中执行,其实这和setsid异曲同工。方法很简单,将命令用括号() 括起来即可:
$ (./test.sh &)
$ ps -ef | grep test
515        410     1  0 11:49 ?        00:00:00 /bin/sh ./test.sh
515      12483 21734  0 11:59 pts/12   00:00:00 grep test 

linux中&、jobs、fg、bg等命令的使用方法

http://blog.sina.com.cn/s/blog_673ee2b50100iywr.html

一. & 最经常被用到
这个用在一个命令的最后,可以把这个命令放到后台执行
二. ctrl + z
可以将一个正在前台执行的命令放到后台,并且暂停
三. jobs
查看当前有多少在后台运行的命令
四. fg
将后台中的命令调至前台继续运行
如果后台中有多个命令,可以用 fg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)
五. bg
将一个在后台暂停的命令,变成继续执行
如果后台中有多个命令,可以用bg %jobnumber将选中的命令调出,%jobnumber是通过jobs命令查到的后台正在执行的命令的序号(不是pid)