LittleQ

爱好:写代码

开发环境:
OS: Ubuntu 16.04 64bit

安装步骤

下载解压

1
2
3
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz
sudo tar -xvf spark-1.6.1-bin-hadoop2.6.tgz -C /usr/dev
sudo chown -R anonymous:anonymous spark-1.6.1-bin-hadoop2.6 # anonymous为当前用户名

运行Spark

Spark支持多种语言,我们可以通过Python,Scala进入Spark交互式环境。

  • Scala Shell
1
2
cd /usr/dev/spark-1.6.1-bin-hadoop2.6
./bin/spark-shell # 启动Scala Shell

**注意:**如果报错:

1
2
3
4
Tue Jun 21 23:30:04 CST 2016 Thread[main,5,main] java.io.FileNotFoundException: derby.log (权限不够)
16/06/21 23:30:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
Tue Jun 21 23:30:04 CST 2016 Thread[main,5,main] Cleanup action starting
ERROR XBM0H: Directory /usr/dev/spark-1.6.1-bin-hadoop2.6/bin/metastore_db cannot be created.

出现这个问题一般就是没有创建文件的权限,安装的最后一个命令就是起这个作用的

1
sudo chown -R anonymous:anonymous spark-1.6.1-bin-hadoop2.6	# anonymous为当前用户名
  • Python Shell
1
./bin/pyspark

为了在Shell里面写Python有补全提示,强烈建议安装ipython,Ubuntu安装也很简单

1
sudo apt install -y ipython

然后启动的时候可以这样:

1
IPYTHON=1 ./bin/pyspark

进入Shell大概是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
➜  spark-1.6.1-bin-hadoop2.6 IPYTHON=1 bin/pyspark
Python 2.7.11+ (default, Apr 17 2016, 14:00:29)
Type "copyright", "credits" or "license" for more information.

IPython 2.4.1 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/21 23:57:06 INFO SparkContext: Running Spark version 1.6.1
16/06/21 23:57:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.1
/_/

Using Python version 2.7.11+ (default, Apr 17 2016 14:00:29)
SparkContext available as sc, HiveContext available as sqlContext.

In [1]: lines = sc.textFile("README.md")

如果你觉得这样也有些麻烦,可以在~/.bashrc里面加一行:

1
alias ipython='IPYTHON=1'

然后就可以这么启动:

1
2
➜  ~ cd /usr/dev/spark-1.6.1-bin-hadoop2.6 
➜ spark-1.6.1-bin-hadoop2.6 ipython bin/pyspark

简单样例

Scala也不熟,就以Python为例吧,注意我的当前目录是在/usr/dev/spark-1.6.1-bin-hadoop2.6:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
In [1]: lines = sc.textFile("README.md")
n [2]: lines.count()
16/06/21 23:57:41 INFO FileInputFormat: Total input paths to process : 1
16/06/21 23:57:41 INFO SparkContext: Starting job: count at <ipython-input-2-44aeefde846d>:1
16/06/21 23:57:41 INFO DAGScheduler: Got job 0 (count at <ipython-input-2-44aeefde846d>:1) with 2 output partitions
16/06/21 23:57:41 INFO DAGScheduler: Final stage: ResultStage 0 (count at <ipython-input-2-44aeefde846d>:1)
16/06/21 23:57:41 INFO DAGScheduler: Parents of final stage: List()
16/06/21 23:57:41 INFO DAGScheduler: Missing parents: List()
16/06/21 23:57:41 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at count at <ipython-input-2-44aeefde846d>:1), which has no missing parents
16/06/21 23:57:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.6 KB, free 173.1 KB)
16/06/21 23:57:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 176.6 KB)
16/06/21 23:57:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:36609 (size: 3.4 KB, free: 511.5 MB)
16/06/21 23:57:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/21 23:57:41 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[2] at count at <ipython-input-2-44aeefde846d>:1)
16/06/21 23:57:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/06/21 23:57:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2151 bytes)
16/06/21 23:57:41 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2151 bytes)
16/06/21 23:57:41 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/21 23:57:41 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/06/21 23:57:41 INFO HadoopRDD: Input split: file:/usr/dev/spark-1.6.1-bin-hadoop2.6/README.md:0+1679
16/06/21 23:57:41 INFO HadoopRDD: Input split: file:/usr/dev/spark-1.6.1-bin-hadoop2.6/README.md:1679+1680
16/06/21 23:57:41 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/06/21 23:57:41 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/06/21 23:57:41 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/06/21 23:57:41 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/06/21 23:57:41 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/06/21 23:57:42 INFO PythonRunner: Times: total = 283, boot = 268, init = 14, finish = 1
16/06/21 23:57:42 INFO PythonRunner: Times: total = 291, boot = 272, init = 19, finish = 0
16/06/21 23:57:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2124 bytes result sent to driver
16/06/21 23:57:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2124 bytes result sent to driver
16/06/21 23:57:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 407 ms on localhost (1/2)
16/06/21 23:57:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 393 ms on localhost (2/2)
16/06/21 23:57:42 INFO DAGScheduler: ResultStage 0 (count at <ipython-input-2-44aeefde846d>:1) finished in 0.423 s
16/06/21 23:57:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/06/21 23:57:42 INFO DAGScheduler: Job 0 finished: count at <ipython-input-2-44aeefde846d>:1, took 0.489031 s
Out[2]: 95

In [3]: lines.first()
16/06/21 23:58:15 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
16/06/21 23:58:15 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:393) with 1 output partitions
16/06/21 23:58:15 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:393)
16/06/21 23:58:15 INFO DAGScheduler: Parents of final stage: List()
16/06/21 23:58:15 INFO DAGScheduler: Missing parents: List()
16/06/21 23:58:15 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents
16/06/21 23:58:15 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.8 KB, free 181.3 KB)
16/06/21 23:58:15 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.0 KB, free 184.3 KB)
16/06/21 23:58:15 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:36609 (size: 3.0 KB, free: 511.5 MB)
16/06/21 23:58:15 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/06/21 23:58:15 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43)
16/06/21 23:58:15 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/06/21 23:58:15 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2151 bytes)
16/06/21 23:58:15 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
16/06/21 23:58:15 INFO HadoopRDD: Input split: file:/usr/dev/spark-1.6.1-bin-hadoop2.6/README.md:0+1679
16/06/21 23:58:15 INFO PythonRunner: Times: total = 41, boot = -33244, init = 33285, finish = 0
16/06/21 23:58:15 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 2143 bytes result sent to driver
16/06/21 23:58:15 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 67 ms on localhost (1/1)
16/06/21 23:58:15 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:393) finished in 0.068 s
16/06/21 23:58:15 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/06/21 23:58:15 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:393, took 0.082887 s
Out[3]: u'# Apache Spark'

看看,其实很简单的,但是有个问题,每次输入一个操作,结果搞出这么一大堆的日志,看着确实影响输入,我们可以控制一下Spark的输出日志级别,编辑spark-1.6.1-bin-hadoop2.6/conf/log4j.properties.template文件,为了安全起见,我们直接拷贝一份模板作为我们的日志配置文件:

1
2
cd conf/
sudo cp log4j.properties.template log4j.properties

定位到这一行:

1
log4j.rootCategory=INFO, console

改为:

1
log4j.rootCategory=WARN, console

看看之前的操作:

1
2
3
4
5
6
7
8
9
10
11
12
Using Python version 2.7.11+ (default, Apr 17 2016 14:00:29)
SparkContext available as sc, HiveContext available as sqlContext.

In [1]: lines = sc.textFile("README.md")

In [2]: lines.count()
Out[2]: 95

In [3]: lines.first()
Out[3]: u'# Apache Spark'

In [4]:

这样就只会输出警告及以上的日志了。

前阵子想给服务器都装上Python的全家桶,结果怎么也装不上,开始安装

1
bash Anaconda2-4.0.0-Linux-x86_64.sh

但是装不了,一直报错:

1
2
mkdir: cannot create directory `/home/q/anaconda2': Permission denied
ERROR: Could not create directory: /home/q/anaconda2

网上查了之后发现很多人都碰到这个问题,后来大家都建议这么安装:

1
sudo bash Anaconda2-4.0.0-Linux-x86_64.sh

结果报错:

1
Sorry, user xxx is not allowed to execute '/bin/bash Anaconda2-4.0.0-Linux-x86_64.sh' as root on xxxx.

我去,简直无解了,没权限执行这个命令,普通的命令又装不了,结果折腾了好久也没办法,后来通过求助之后才发现,原来给这个包附个权限就可以执行了,原来如此,说干就干:

1
2
sudo chmod 777 Anaconda2-4.0.0-Linux-x86_64.sh
sudo ./Anaconda2-4.0.0-Linux-x86_64.sh

后面一路畅通,搞定。

**Tips:**如果想用这个代替默认的环境变量,有些包这里又没有,假设你的安装目录为/home/usr/anaconda2,那么你可以这么装:

1
sudo /home/usr/anaconda2/bin/pip install <module_name>

全局解释器锁

谈到Python多线程,不得不先说一下全局解释器锁(GIL),Python代码的执行由Python虚拟机(也叫解释器主循环)来控制,虽然有多个进程,但是某一个时刻只会有一个线程在执行,对Python虚拟机的访问由全局解释器锁(global interpreter lock,GIL)来控制,正是由于有GIL,同一时刻只会有一个线程在运行,具体的执行步骤为:

1
2
3
4
5
6
7
8
1. 设置GIL
2. 切换到一个线程去运行
3. 运行
a. 运行指定字节码的指令,或者
b. 线程主动让出控制
4. 把线程设置为睡眠状态
5. 解锁GIL
6. 重复以上所有步骤

所以在调用外部代码的时候,GIL会被锁定,直到调用函数结束为止,看到这里,你可能会觉得Python程序的效率会非常低,毕竟我们的程序会去访问数据库,外部接口,加载本地文件,如果是这样,那我们的程序基本上无时无刻都是卡在那等着。其实你完全不用担心,所有面向I/O的(即调用内建的操作系统C代码)的程序,GIL会在这个I/O调用之前被释放,这样其他程序是可以在等待I/O的时候执行的。不过如果一个程序并没有很多I/O操作,那他只要运行,就一直占用CPU,多线程只对那些I/O密集的程序更有好处。

threading模块

其实还有一个模块叫thread,但是这个模块使用特别麻烦,官方不建议使用,其中对于锁的操作特别麻烦,而且还有个很大的问题,一旦主进程退出,不管子线程运行完没有都会被强制结束。所以我也不介绍这个模块怎么用了,我们直接看threading模块的使用。

传入可调用函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
from time import sleep, ctime

__author__ = 'anonymous'

loops = [4, 2]


def loop(n_loop, n_sec):
print '开始线程', n_loop, '于', ctime()
sleep(n_sec)
print 'loop函数', n_loop, '完成于', ctime()


def main():
print '开始主线程:', ctime()
threads = [] # 一个用于储存线程对象的列表
n_loops = range(len(loops))
for i in n_loops:
t = threading.Thread(target=loop, args=(i, loops[i])) # 每次循环创建一个Thread的实例
threads.append(t) # 将新创建的对象放到一个列表中

for i in n_loops:
threads[i].start() # 每次循环运行一个线程

or i in n_loops:
threads[i].join() # 等待子线程的完成

print '主线程完成:', ctime()

if __name__ == '__main__':
main()

输出结果为:

1
2
3
4
5
6
7
8
开始主线程: Sat Jun 18 19:47:40 2016
开始线程 开始线程0 于 Sat Jun 18 19:47:40 20161
于 Sat Jun 18 19:47:40 2016
loop函数 1 完成于 Sat Jun 18 19:47:42 2016
loop函数 0 完成于 Sat Jun 18 19:47:44 2016
主线程完成: Sat Jun 18 19:47:44 2016

Process finished with exit code 0

一旦调用start()方法被调用的函数就开始执行了,如果你在主函数里面还要做其他操作,并且这个操作与子线程的执行无关,你完全不用调用join(),可以去做其他操作。调用join()会一直等待子线程执行完毕返回然后再执行后面的步骤。
**注意:**这个输出并不是连串的,因为我的CPU是多核的,所以日志看上去有点儿不太正常。target一个可调用对象,这里我们传入了一个函数;args是一个元祖,均以位置参数的方式传递给被调用对象。

传入可调用类

我们也可以传入一个可调用类给Thread实例,不过类必须要实现__call()__方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
from time import sleep, ctime

__author__ = 'anonymous'

loops = [4, 2]


def loop(n_loop, n_sec):
print '开始线程', n_loop, '于', ctime()
sleep(n_sec)
print 'loop函数', n_loop, '完成于', ctime()


class ThreadFunc(object):
def __init__(self, func, args):
self.func = func
self.args = args

def __call__(self): # 关键是要实现这个方法
apply(self.func, self.args)


def main():
print '开始主线程:', ctime()
threads = [] # 一个用于储存线程对象的列表
n_loops = range(len(loops))
for i in n_loops:
t = threading.Thread(target=ThreadFunc(func=loop, args=(i, loops[i])), ) # 每次循环创建一个Thread的实例,目标是一个类
threads.append(t) # 将新创建的对象放到一个列表中

for i in n_loops:
threads[i].start() # 每次循环运行一个线程

for i in n_loops:
threads[i].join() # 等待子线程的完成

print '主线程完成:', ctime()


if __name__ == '__main__':
main()

函数的执行结果和上面是一样的,这里我就不再贴运行结果了。

创建Thread子类

这种方式比较灵活,更加通用,只用在内部冲洗run方法即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
from time import sleep, ctime

__author__ = 'anonymous'

loops = [4, 2]


def loop(n_loop, n_sec):
print '开始线程', n_loop, '于', ctime()
sleep(n_sec)
print 'loop函数', n_loop, '完成于', ctime()


class MyThread(threading.Thread):
def __init__(self, func, args):
threading.Thread.__init__(self) # 调用父类的构造函数
self.func = func
self.args = args

def run(self):
apply(self.func, self.args)


def main():
print '开始主线程:', ctime()
threads = [] # 一个用于储存线程对象的列表
n_loops = range(len(loops))
for i in n_loops:
t = MyThread(func=loop, args=(i, loops[i])) # 使用我们自己的类来新建对象
threads.append(t) # 将新创建的对象放到一个列表中
for i in n_loops:
threads[i].start() # 每次循环运行一个线程

for i in n_loops:
threads[i].join() # 等待子线程的完成

print '主线程完成:', ctime()


if __name__ == '__main__':
main()

调用子类MyThreadstart()方法即可,同样join()用于等待自线程执行完。这种方式相比上面的两种方式好处在哪?想象一下,我们的函数执行如果返回的是一个二维数组,如果仅仅有start(),join()这样的方法,我们怎么获取最后我们执行的返回值呢?如果是子类,我们完全可以在调用run()方法里面把这个结果保存下来,然后最后调用子类的实例去获取这个变量就行了,像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
from time import sleep, ctime

__author__ = 'anonymous'

loops = [4, 2]


def loop(n_loop, n_sec):
return n_loop + n_sec # 返回两个参数的和


class MyThread(threading.Thread):
def __init__(self, func, args):
threading.Thread.__init__(self) # 调用父类的构造函数
self.func = func
self.args = args
self.result = None

def run(self):
self.result = apply(self.func, self.args) # 调用函数的结果作为一个属性


def main():
print '开始主线程:', ctime()
threads = [] # 一个用于储存线程对象的列表
n_loops = range(len(loops))
for i in n_loops:
t = MyThread(func=loop, args=(i, loops[i])) # 使用我们自己的类来新建对象
threads.append(t) # 将新创建的对象放到一个列表中
for i in n_loops:
threads[i].start() # 每次循环运行一个线程

for i in n_loops:
threads[i].join() # 等待子线程的完成

for i in n_loops:
print '执行结果为:', threads[i].result # 打印该对象的属性

print '主线程完成:', ctime()


if __name__ == '__main__':
main()

执行结果为:

1
2
3
4
5
6
开始主线程: Sat Jun 18 20:11:51 2016
执行结果为: 4
执行结果为: 3
主线程完成: Sat Jun 18 20:11:51 2016

Process finished with exit code 0

Python进程池

实际在项目中,我们可能会碰到这样的问题,有一个MySQL的表分库分表了,总数大概有1000张,我们想看看每天的数据量有多大,由于这个是线上的库,我们不能开太多的连接,如果把数据库搞挂了不好,但是一个一个去算又太慢,所以需要控制连接的数量,太大太小都不好,这个时候可以使用进程池,控制并发的数量。

进程池(非阻塞)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = 'anonymous'

from multiprocessing import freeze_support, Pool
import time


def Foo(i):
time.sleep(2)
print 'time start:%s' % time.strftime('%Y-%m-%d %H:%M:%S')
return i + 100


def Bar(arg):
print 'time done:%s %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), arg)


if __name__ == '__main__':
freeze_support()
pool = Pool(3) # 线程池中的同时执行的进程数为3

for i in range(4):
pool.apply_async(func=Foo, args=(i,), callback=Bar) # 线程池中的同时执行的进程数为3,当一个进程执行完毕后,如果还有新进程等待执行,则会将其添加进去

print('end')
pool.close()
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

输出结果为:

1
2
3
4
5
6
7
8
9
end
time start:2016-06-19 18:08:12
time done:2016-06-19 18:08:12 100
time start:2016-06-19 18:08:12
time done:2016-06-19 18:08:12 101
time start:2016-06-19 18:08:12
time done:2016-06-19 18:08:12 102
time start:2016-06-19 18:08:14
time done:2016-06-19 18:08:14 103

进程池的大小为3,很明显可以看到,前三个函数的开始时间都是一样的,并且都是不等Foo函数执行完就掉用Bar函数

  • apply_async(func[, args[, kwds[, callback]]])它是非阻塞apply(func[, args[, kwds]])阻塞的.
  • close()关闭pool,使其不再接受新的任务
  • join() 主进程阻塞,等待子进程退出,必须在close()terminate()之后调用

进程池(阻塞)

和上面的代码差不多,只是把对应部分改一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = 'anonymous'

from multiprocessing import freeze_support, Pool
import time


def Foo(i):
time.sleep(2)
print 'time start:%s' % time.strftime('%Y-%m-%d %H:%M:%S')
return i + 100


def Bar(arg):
print 'time done:%s %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), arg)


if __name__ == '__main__':
freeze_support()
pool = Pool(3) # 线程池中的同时执行的进程数为3

for i in range(4):
pool.apply(func=Foo, args=(i,))

print('end')
pool.close()
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

执行结果如下:

1
2
3
4
5
time start:2016-06-19 18:14:00
time start:2016-06-19 18:14:02
time start:2016-06-19 18:14:04
time start:2016-06-19 18:14:06
end

进程池(关注返回结果)

多半情况下我们还是要关注函数的执行返回结果,并不能完全想像上面那样阻塞运行或者非阻塞等函未执行完就调用回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocess

__author__ = 'anonymous'
import time


def func(msg):
print 'time start:%s' % time.strftime('%Y-%m-%d %H:%M:%S')
time.sleep(2)
print 'time end:%s' % time.strftime('%Y-%m-%d %H:%M:%S')
return 'done' + msg

if __name__ == '__main__':
pool = multiprocess.Pool(2)
result = []

for i in range(3):
msg = 'hello %s' % i
result.append(pool.apply_async(func=func, args=(msg,)))

pool.close()
pool.join()

for res in result:
print '***: %s' % res.get()

print 'end'

执行看一下返回结果是啥

1
2
3
4
5
6
7
8
9
10
time start:2016-06-19 18:43:22
time start:2016-06-19 18:43:22
time end:2016-06-19 18:43:24
time start:2016-06-19 18:43:24
time end:2016-06-19 18:43:24
time end:2016-06-19 18:43:26
***: done hello 0
***: done hello 1
***: done hello 2
end

可以看到,我们可以使用get()方法获取被调用函数的返回值

multiprocessing

还有一种方式,可以使用pool.map(),以我们最开始的例子为例,我们要查询1000张表,可以把参数放在一个可迭代对象里,使用pool.map()找依次多个处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

__author__ = 'anonymous'


def m1(x):
return x * x


if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
result = pool.map(m1, i_list)

print sum(result)

执行结果:

1
140

最近刚装好了Ubuntu16.04,好不容易配置好了开发环境,突然发现IDEA,和PyCharm无法从启动栏和Dash菜单中启动,摸索来一阵之后,查阅资料发现来问题所在,记录一下,也希望给其他人一点参考,简单来说就是,环境变量不对导致无法启动。
以IntelliJ IDEA为例,我的IDEA和Pycharm都只能从终端启动,很不方便,即是从终端启动了,我把图标固定在左侧的快速启动栏,仍然启动不了,下面看看解决办法。
首先去~/.local/share/applications目录看下有没有你的应用程序启动文件,关键字搜索一下,如果没有就去/usr/share/applications文件夹看看,找到你的程序,双击,如果能够启动则应该没是没有问题的,但是双击不能启动,则即使你把图标固定到快速启动栏,程序仍然启动不了。
双击IntelliJ IDEA图标,提示报错信息,找不到JDK,问题就出在这里,但是奇怪的是我明明装了JDK,也设置来环境变量,怎么会找不到呢?于是我用vim查看了一下这个启动项的具体配置:

1
2
3
4
5
6
7
8
9
10
11
12
$ cd /usr/share/applications
$ cat jetbrains-idea.desktop
[Desktop Entry]
Version=1.0
Type=Application
Name=IntelliJ IDEA
Icon=/usr/dev/idea-IU-139.1117.1/bin/idea.png
Exec="/usr/dev/idea-IU-139.1117.1/bin/idea.sh" %f
Comment=Develop with pleasure!
Categories=Development;IDE;
Terminal=false
StartupWMClass=jetbrains-idea

可以看到启动程序调用的脚本路径/usr/dev/idea-IU-139.1117.1/bin/idea.sh,这个就是我IDEA的解压安装路径,但是奇怪的是我从终端启动也是运行的这个脚本,就可以启动,为什么系统调用就不行呢?而且错误信息提示的是找不到JDK,后来我才明白,原来我安装了zsh,所以设置在~/.zshrc里面的环境变量并不能对bash启动的程序生效,所以找不到JDK。
既然明白了问题所在,解决办法也就有了,要么把环境变量设置成系统级别的环境变量,要么直接在对应的启动脚本中加入JDK的路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ cat /usr/dev/idea-IU-139.1117.1/bin/idea.sh
# ---------------------------------------------------------------------
# Locate a JDK installation directory which will be used to run the IDE.
# Try (in order): IDEA_JDK, JDK_HOME, JAVA_HOME, "java" in PATH.
# ---------------------------------------------------------------------
if [ -n "$IDEA_JDK" -a -x "$IDEA_JDK/bin/java" ]; then
JDK="$IDEA_JDK"
elif [ -n "$JDK_HOME" -a -x "$JDK_HOME/bin/java" ]; then
JDK="$JDK_HOME"
elif [ -n "$JAVA_HOME" -a -x "$JAVA_HOME/bin/java" ]; then
JDK="$JAVA_HOME"
else
JAVA_BIN_PATH=`which java`
.......

分析启动脚本,反正就是找不到JAVA_HOME,JDK的路径了,简单粗暴的直接在检查变量之前定义好,在注释下面,条件判断的前面加上JAVA的配置

1
2
JAVA_HOME=/usr/dev/jdk1.7.0_40
JRE_HOME=${JAVA_HOME}/jre

这样再点击图标就可以运行了。

分析日志经常要用到正则来提取需要分析的关键信息,其中需要注意的就是,Hive本身史用Java写的,所以HQL里面的正则和Java里面史一样的,但是具体在使用的时候,会碰到很多问题,就是转义的问题,总结了一下在各个语言中使用HQL时正则的不同情况。

Hive Cli

平时使用的时候,很多情况下都是在Hive Cli客户端中使用,这里和正常的正则需要稍微有一些区别,例如匹配数字,正常情况下就\d,但是在Hive Cli中需要注意,应该使用\\d

1
2
3
4
select
regexp_extract(content, '.*id=(\\d*).*', 1) as id
from
test.table_test;

Shell

还有一种更为常见的使用场景,就是在Shell脚本中使用,Shell脚本中又涉及到单引号和双引号的区别,单引号史强引用类型,\不会被转义,但是双引号字符串则会转义,例如:

1
2
3
4
5
6
7
8
9
10
reg_str1='.*id=(\\d*).*'
reg_str2=".*id=(\\\\d*).*"

sudo -uhiev_user /usr/dev/hive-1.2.0/bin/hive -e "
select
regexp_extract(content, '${reg_str1}', 1) as id1,
regexp_extract(content, '${reg_str2}', 1) as id2
from
test.table_test;
"

这两个正则表达式是一样的,其实确认你的HQL正则表达式是否有用,可以先用echo在sudo前面看看,如果输出的史\\d则说明有用

Python

有时候需要在Python中执行HQL,好像Hive并不支持Python接口,所以我采取的是在Python中执行Shel命令

1
2
3
4
5
6
# bash_cmd为Shell命令
bash_cmd = """
sudo -uhive_user /usr/dev/hive-1.2.0/bin/hive -e "{0}"
""".format(sql) # sql即为要执行的HQL

hql_process = subprocess.Popen(bash_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在Python中我定义来一个sql

1
2
3
4
5
6
sql = """
select
regexp_extract(content, '.*id=(\\\\\\d*).*', 1) as id
from
test.table_test;
"""

从简单的例子学习Flask,因为是简单的例子,就用子单的sqlite3数据库了,如果项目大了,数据量大可以考虑使用MySQL

项目步骤

一步步来演示一个项目是如何创建的

  • 先创建文件夹
    1
    2
    3
    4
    ➜  flasker  tree
    .
    ├── static
    └── templates

数据库模式,将下面的内容保存在schema.sql,放在flasker目录下就行:

1
2
3
4
5
6
drop table if exists entries;
create table entries (
id integer primary key autoincrement,
title text not null,
text text not null
);

应用构建

创建应用模块,吧模块命名为flaskr.py,就放在flaskr文件夹中,为了方便学习,把库的导入和相关配置放在了一起,但是一个清晰的方案应该是放在一个独立的__init__.py文件中,然后在模块里导入配置,不过这个也有一个不好的地方,就是你在主文件里不知道的包到底是从哪导入的,flaskr.py内容如下:

1
2
3
4
5
6
7
8
9
10
11
# all the imports
import sqlite3
from flask import Flask, request, session, g, redirect, url_for, \
abort, render_template, flash

# configuration
DATABASE = '/tmp/r.db'
DEBUG = True
SECRET_KEY = 'development key'
USERNAME = 'admin'
PASSWORD = 'default'

然后在同一个文件中创建真正的应用,使用配置来初始化:

1
2
3
# create our little application :)
app = Flask(__name__)
app.config.from_object(__name__)

from_object的传入参数如果是字符串则直接导入,它会搜索加载多有大写的变量名,就是我们写在最前面的,你也可以把这个写在__init__.py文件中。一般都是总配置文件导入配置的,建议使用from_envvar()来导入配置,上面的第二行可以替换为:

1
app.config.from_envvar('FLASKR_SETTINGS', silent=True)

这个可以设置一个FLASKR_SETTINGS变量来指定一个配置文件,并根据该文件来重载缺省配置,silent意思是如果没有,则不报错。
然后添加一个用于连接数据库的方法。

1
2
def connect_db():
return sqlite3.connect(app.config['DATABASE'])

然后在最后以单机模式启动的代码:

1
2
if __name__ == '__main__':
app.run()

这样虽然可以启动服务器,但是无法访问界面,因为没有构建任何视图。

创建数据库

每次去执行命令导入不是很方便,受限于系统,所以添加一个数据库初始化函数,首先要导入contextlib.closing()函数,即:

1
from contextlib import closing

然后创建一个初始数据库的函数init_db():

1
2
3
4
5
def init_db():
with closing(connect_db()) as db:
with app.open_resource('schema.sql', mode='r') as f:
db.cursor().executescript(f.read())
db.commit()

closing()函数允许我们在with代码块中保持数据库打开,然后open_resouce()也支持这个功能,直接在with代码块中使用,sqlite3里面的sql都必须显示的提交才会生效。

请求数据库连接

当然你可以生成一个全局的数据库连接句柄,这样在每个函数里面就可以使用数据库连接了,但是并不推荐这样,会带来很多问题,也不够优雅。Flask里面利用装饰器能够做到优雅的访问数据库,before_request(),after_request(),teardown_request()这三个装饰器就可以满足需求:

1
2
3
4
5
6
7
8
9
10
@app.before_request
def before_request():
g.db = connect_db()

@app.teardown_request
def teardown_request(exception):
db = getattr(g, 'db', None)
if db is not None:
db.close()
g.db.close()

来看看这段代码是如何的优雅,使用before_request()装饰函数会在请求之前调用,不用传递任何参数,这样在每个视图函数里面都可以通过全局g对象获取数据库连接句柄。使用after_request()会在请求之后调用,并且会传递相应对象给客户端,所以出错了就不会执行。因此需要用到第三个装饰器teardown_request()装饰器,这个装饰器会在响应对象构建完之后才调用被装饰的函数,不允许修改请求,而且返回值也会被忽略,如果出错了,这个错误会传递给每个函数。这里的g对象简单理解就是一个神奇的全局对象,并且多线程也可以正常工作。

视图函数

在数据库连接处理完之后,就可以来构造视图了,下面简单介绍一个例子:

显示条目

这个视图将会显示所有数据库中的连接,模板为show_entries.html,并返回渲染结果:

1
2
3
4
5
@app.route('/')
def show_entries():
cur = g.db.execute('select title, text from entries order by id desc')
entries = [dict(title=row[0], text=row[1]) for row in cur.fetchall()]
return render_template('show_entries.html', entries=entries)

添加条目

添加一条新记录,添加完之后并不会显示,结果显示在show_entries页面中,如果成功,则会flash()一个消息给下一个请求并重定向到show_entries页面:

1
2
3
4
5
6
7
8
9
@app.route('/add', methods=['POST'])
def add_entry():
if not session.get('logged_in'):
abort(401)
g.db.execute('insert into entries (title, text) values (?, ?)',
[request.form['title'], request.form['text']])
g.db.commit()
flash('New entry was successfully posted')
return redirect(url_for('show_entries'))

这里还有个检查是否登陆的,就是logged_in是否为True。另外一个,为了防止SQL注入,劲量不要拼sql,用?代替。

登陆和注销

用于登陆和注销,根据配置中的用户名和密码验证用户会话中设置logged_in的键值。如果通过验证,则设置logged_in为True,然后重定向到show_entries页面。另外闪现一个信息,告诉用户已登陆成功,如果出错,则提示错误信息,并重新登陆:

1
2
3
4
5
6
7
8
9
10
11
12
13
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
if request.form['username'] != app.config['USERNAME']:
error = 'Invalid username'
elif request.form['password'] != app.config['PASSWORD']:
error = 'Invalid password'
else:
session['logged_in'] = True
flash('You were logged in')
return redirect(url_for('show_entries'))
return render_template('login.html', error=error)

注销视图则会相反,移除键值:

1
2
3
4
5
@app.route('/logout')
def logout():
session.pop('logged_in', None)
flash('You were logged out')
return redirect(url_for('show_entries'))

使用pop()函数如果传递了第二个参数(键的缺省值),如果有的话就会删掉,如果没有,就啥也不做。

模板

光有视图还不够,上面用到的模板还没有写好,访问也会报错,然后还用到了模板继承保存所有页面布局统一,这些文件都保存在templates文件夹中:

layout.html

这个模板包含了HTML的骨架,头部和一个登陆链接(如果用户已登陆则变为一个注销连接)。如果有闪现消息,则也显示出来:

1
{% block body %}

上面的块儿会被子模块中同名的body替换。而且session在模板中也可以使用,如果键值(属性)不存在也可以正常运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!doctype html>
<title>Flaskr</title>
<link rel=stylesheet type=text/css href="{{ url_for('static', filename='style.css') }}">
<div class=page>
<h1>Flaskr</h1>
<div class=metanav>
{% if not session.logged_in %}
<a href="{{ url_for('login') }}">log in</a>
{% else %}
<a href="{{ url_for('logout') }}">log out</a>
{% endif %}
</div>
{% for message in get_flashed_messages() %}
<div class=flash>{{ message }}</div>
{% endfor %}
{% block body %}{% endblock %}
</div>

show_entries.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{% extends "layout.html" %}
{% block body %}
{% if session.logged_in %}
<form action="{{ url_for('add_entry') }}" method=post class=add-entry>
<dl>
<dt>Title:
<dd><input type=text size=30 name=title>
<dt>Text:
<dd><textarea name=text rows=5 cols=40></textarea>
<dd><input type=submit value=Share>
</dl>
</form>
{% endif %}
<ul class=entries>
{% for entry in entries %}
<li><h2>{{ entry.title }}</h2>{{ entry.text|safe }}
{% else %}
<li><em>Unbelievable. No entries here so far</em>
{% endfor %}
</ul>
{% endblock %}

我们在第一行使用了layout.html模板,扩展了基础模板,用于显示信息。for遍历了我们通过render_template()函数所有传递信息。模板也指明了method为post提交数据。

login.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{% extends "layout.html" %}
{% block body %}
<h2>Login</h2>
{% if error %}<p class=error><strong>Error:</strong> {{ error }}{% endif %}
<form action="{{ url_for('login') }}" method=post>
<dl>
<dt>Username:
<dd><input type=text name=username>
<dt>Password:
<dd><input type=password name=password>
<dd><input type=submit value=Login>
</dl>
</form>
{% endblock %}

添加样式

现在数据库连接有了,视图也有了,然后模板也有了,最后就差样式了,我们来添加一下样式,保存为style.css保存在static文件夹中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
body            { font-family: sans-serif; background: #eee; }
a, h1, h2 { color: #377ba8; }
h1, h2 { font-family: 'Georgia', serif; margin: 0; }
h1 { border-bottom: 2px solid #eee; }
h2 { font-size: 1.2em; }

.page { margin: 2em auto; width: 35em; border: 5px solid #ccc;
padding: 0.8em; background: white; }
.entries { list-style: none; margin: 0; padding: 0; }
.entries li { margin: 0.8em 1.2em; }
.entries li h2 { margin-left: -1em; }
.add-entry { font-size: 0.9em; border-bottom: 1px solid #ccc; }
.add-entry dl { font-weight: bold; }
.metanav { text-align: right; font-size: 0.8em; padding: 0.3em;
margin-bottom: 1em; background: #fafafa; }
.flash { background: #cee5F5; padding: 0.5em;
border: 1px solid #aacbe2; }
.error { background: #f0d6d6; padding: 0.5em; }

测试Flask

这个必须介绍一下,因为写代码的过程中免不了要调试和测试,现在讲一下怎么用unittest包来测试flask应用

测试骨架

为了测试我们的应用,我们添加一个新的模块flaskr_tests.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import os
import flaskr
import unittest
import tempfile

class FlaskrTestCase(unittest.TestCase):

def setUp(self):
self.db_fd, flaskr.app.config['DATABASE'] = tempfile.mkstemp()
flaskr.app.config['TESTING'] = True
self.app = flaskr.app.test_client()
flaskr.init_db()

def tearDown(self):
os.close(self.db_fd)
os.unlink(flaskr.app.config['DATABASE'])

if __name__ == '__main__':
unittest.main()

稍微介绍一下这个测试是个啥意思:

  1. setUp()方法中会创建一个新的测试客户端并出书画了一个连接,每一个独立的测试函数运行之前都要调用这个函数
  2. tearDown()功能是在测试结束以后关闭文件,并在文件中删除数据库库文件,另外TESTING=True,这意味着在请求时关闭错误捕捉,这样可以真实的捕捉到错误,得到更好的错误报告。

测试客户端会提供一个简单的应用接口,我们通过这个接口向应用发送测试请求,还可以追踪cookies。
因为sqlite3是一个文件系统数据库,所以可以使用临时文件来创建一个临时数据库并初始化它。mkstemp()函数返回两个东西:一个低级别的文件句柄和一个随机文件名,这个文件名将会作为我们的数据库名称。必须把句柄保存到self.db_fd种,这样在整个测试类里面才能在其他方法中来关闭文件。
可以在终端中运行测试程序,如果没有报错,才说明没有语法错误:

1
2
3
4
5
6
(env)➜  flasker  python flaskr_tests.py 

----------------------------------------------------------------------
Ran 0 tests in 0.000s

OK

第一个测试

Web应用就是测试一些URL访问是否正常,添加一个访问URL(/)的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class FlaskrTestCase(unittest.TestCase):

def setUp(self):
self.db_fd, flaskr.app.config['DATABASE'] = tempfile.mkstemp()
self.app = flaskr.app.test_client()
flaskr.init_db()

def tearDown(self):
os.close(self.db_fd)
os.unlink(flaskr.app.config['DATABASE'])

def test_empty_db(self):
rv = self.app.get('/')
assert 'No entries here so far' in rv.data

注意:测试的函数都是以test开始的,这样unittest就会自动识别这些用于测试的函数并运行它们。通过使用self.app.get()可以给制定的URL发送HTTP GET请求,其返回的是一个~flask.Flask.reponse_class对象,通过检查其data属性来检测其返回值

由于平时经常使用Python,所以想使用Python的微框架开发一些简单的工具来提升工作效率,特此记录一下学习Flask的笔记。

开发环境

本机系统: Ubuntu 14.04 64bits
所有的学习第一步就是配置开发环境,还好Ubuntu自带了Python,这里采用的虚拟Python配置,

1
$ sudo apt-get install python-virtualenv

或者使用pip安装:

1
$ $ sudo pip install virtualenv

如果你是Windows用户,并且你已经安装了pip工具,则去掉sudo在cmd命令下也可以安装
然后创建一个包含venv的文件夹的项目文件夹,项目就建在这里面

1
2
3
$ sudo mkdir flask_project
$ cd flask_project
$ sudo virtualenv venv

现在每次要使用项目,就可以在flask_project文件夹中运行下面的命令

1
$ . venv/bin/activate

对应的Windows启动命令为

1
$ . venv/Scripts/activate

你现在进入你的virtualenv(注意查看你的shell提示符已经改变了)。每次需要安装包的时候就先激活虚拟环境,下面安装flask

1
pip install flask

**注意:**这里有个地方需要个别强调一下,安装包不能使用sudo权限来安装,如果不带sudo安装提示权限不够,请把flask_project文件夹的权限设置为当前用户,例如,当前用户为zhangsan,在flask_project的父目录执行:

1
sudo chown zhangsan:zhangsan -R flask_project

相应的退出使用:

1
$ deactivate

一个最简单的应用

flask_project文件夹中创建一个文件hello.py

1
2
3
4
5
6
7
8
9
from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():
return 'Hello World!'

if __name__ == '__main__':
app.run()

在命令行或者终端里运行:

1
$ python hello.py

然后在浏览器打开http://127.0.0.1:5000/就可以看到一个最简单的hello world页面了。大概解释一下:

  1. 首先我们导入了Flask类。这个类的实例将会成为我们的WSGI应用,即后面的app
  2. 接着我们创建了这个类的实例。第一个参数是应用模块或者包的名称。如果你使用一个 单一模块(就像本例),那么应当使用__name__,因为名称会根据这个模块是按 应用方式使用还是作为一个模块导入而发生变化(可能是__main__,也可能是 实际导入的名称)。这个参数是必需的,这样Flask就可以知道在哪里找到模板和 静态文件等东西。不明白也没关系,先这么写,后面慢慢会深入研究。
  3. 然后我们使用route()装饰器来告诉Flask触发函数的URL。函数名称可用于生成相关联的URL,并返回需要在用户浏览器中显示的信息。这个和Java里的Controller非常的像,用过Spring的同学对此应该很熟悉。
  4. 最后,使用run()函数来运行本地服务器和我们的应用。if __name__ == '__main__': 确保服务器只会在使用Python解释器运行代码的情况下运行,而不会在作为模块导入时运行。

本机访问没问题,但是局域网的其他机器如果也想访问,则需要这样启动:

1
app.run(host='0.0.0.0')

这行代码告诉你的操作系统监听一个公开的IP 。

调试模式

开发程序不可能这么简单,需要不断的调试,所以Flask也为我们想好了,专门有个调试模式,这样不用重启服务器,代码有变动,会自动重启,并且出错提示信息也很强大,打开调试器的有两种方式:

  1. 通过设置实例的属性
    1
    2
    app.debug = True
    app.run()
  2. 启动传参设置
    1
    app.run(debug=True)
    **注意:**调试器很强大,可以执行任意代码,所以千万不要在项目上线之后打开调试模式。

路由route()

路由装饰器用于将一个函数和一个URL绑定,例如:

1
2
3
4
5
6
7
@app.route('/')
def index():
return 'Index Page'

@app.route('/hello')
def hello():
return 'Hello World'

这只是最简单,最基础的,实际开发中不可能都是这么简单的URL,大多数情况下URL里面有一部分是动态变化的,例如好多URL里面会带ID,表示网页的一个标号。

变量规则

里面可以添加变量,converter:variable_name可以为变量加一个转换器,目前只支持int,float,path最后一个为路径,接受/,例如:

1
2
3
4
5
6
7
8
9
@app.route('/user/<username>')
def show_user_profile(username):
# 通过获取URL的user_name作为函数的传入参数
return 'User %s' % username

@app.route('/post/<int:post_id>')
def show_post(post_id):
# 传入post_id,并且必须是一个整数
return 'Post %d' % post_id

唯一URL/重定向问题

看下面一个例子:

1
2
3
4
5
6
7
@app.route('/projects/')
def projects():
return 'The project page'

@app.route('/about')
def about():
return 'The about page'

注意一个尾部有斜杠,一个没有。看上去带斜杠的URL很像一个文件夹,当我们访问一个没有带斜杠的URL的时候,为自动加一个斜杠。但是反过来,如果访问第二个URL你在尾部带了一个斜杠,则会报错。可能你有些疑惑,为啥要这么做,答案也很简单:这样在访问一个不带斜杠的URL时,不管真正的URL是否带斜杠我们都可以继续访问URL,并且URL是唯一的。

URL构建

设想一下,你想先设定好页面的访问URL,你需要测试你写的函数能不能被你指定的URL访问到,直接构建这些URL就可以了。url_for它把函数名称作为第一个参数,其余参数对应URL中的变量。未知变量将添加到URL中作为查询参数。 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from flask import Flask, url_for
app = Flask(__name__)
@app.route('/')
def index(): pass

@app.route('/login')
def login(): pass

@app.route('/user/<username>')
def profile(username): pass

with app.test_request_context():
print url_for('index')
print url_for('login')
print url_for('login', next='/')
print url_for('profile', username='John Doe')

运行程序会输出以下内容:

1
2
3
4
/
/login
/login?next=/
/user/John%20Doe

这里用到了一个方法test_request_context(),这个会告诉Flask我们正在处理一个请求,虽然我们并没有真正的去请一个URL.

HTTP方法

访问一个URL有好几个方法,常见的就是get,post方法,缺省情况下一个路由只回应GET请求,也可以像下面这样手动指定:

1
2
3
4
5
6
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
do_the_login()
else:
show_the_login_form()

静态文件

一般就是指css,js文件,如果工程里面要用到这些静态的库,可以在应用的根目录下新建static文件夹,设用选定的static端点就可以生成对应的URL:

1
url_for('static', filename='style.css')

因此该文件对应的在文件系统中的路径应该是static/style.css

渲染模板

这是个很强大的功能,基本上目前的框架都得支持这个,毕竟原生的写HTML很慢,还要考虑各种转义,乱码等,Flask内置Jinja2模板引擎。使用render_template()来渲染模板,只需要把模板的名字和对应的一些参数传进去就行了,例如:

1
2
3
4
5
6
from flask import render_template

@app.route('/hello/')
@app.route('/hello/<name>')
def hello(name=None):
return render_template('hello.html', name=name)

Flask默认会在templates文件夹内寻找模板,如果你的应用是一个模块,则templates文件夹应该和应用在一个目录下,即:

1
2
3
/application.py
/templates
/hello.html

如果你的应用是一个包,则应该在包里面:

1
2
3
4
/application
/__init__.py
/templates
/hello.html

这个就类似预JSP一类的东西,你也可以在模板内部访问**request, session, get_flashed_messages()等,不过比JSP更加强大,因为模板可以实现继承,这样就能保持很多特定的元素重用,保持一致。 默认特殊的一些变量会被转义,例如HTML,如果信任某个变量,也可以使用Markup`类来把它标记为安全的,具体的等学到了再详细研究。

操作请求数据

一个Web应用其实就是服务器相应客户端发来的消息,Flask中由全局对象request来提供请求信息,如果你熟悉Python你就会知道,全局的对象大家都可以访问,其实也并不是通常意义上的全局变量,这个request只是特定环境下的本地对象的一个代理。

请求对象

首先必须从flask模块导入请求对象from flask import request,使用method属性可以操作当前请求的具体方法,form可以处理表单数据,例如:

1
2
3
4
5
6
7
8
9
10
11
@app.route('/login', methods=['POST', 'GET'])
def login():
error = None
if request.method == 'POST':
if valid_login(request.form['username'],
request.form['password']):
return log_the_user_in(request.form['username'])
else:
error = 'Invalid username/password'
# 如果请求访求是 GET 或验证未通过就会执行下面的代码
return render_template('login.html', error=error)

如果form属性中不存在这个键值,会像普通集合那样抛出一个KeyError的异常。如果不捕获的话,就会显示一个HTTP 400 Bad Request错误页面,虽然你可以不用处理这个异常,但是显然不是很友好,所以推荐捕获异常,然会一个预定义好的错误页面.
如果要处理URL中的参数,可以使用request.args.get('key', '')来获取.

文件上传

使用Flask实现文件上传很简单,需要在HTML表单中设置enctype="multipart/form-data属性即可,上传的文件被存储在内存或者文件系统临时位置,通过请求对象files属性可以访问上传文件,看下面的例子:

1
2
3
4
5
6
7
from flask import request

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
f = request.files['the_file']
f.save('/var/www/uploads/uploaded_file.txt')

要想知道文件上传之前在客户端系统的名字,可以使用filename属性,但是这个可以伪造,所以建议通过Werkzeug提供的secure_filename()函数:

1
2
3
4
5
6
7
8
from flask import request
from werkzeug import secure_filename

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
f = request.files['the_file']
f.save('/var/www/uploads/' + secure_filename(f.filename))

Cookies

对于Web应用来说,这个必不可少,要访问cookeis,可以使用cookies属性。通过请求对象的set_cookies方法设置cookies,请求对象的cookies包含了客户端的所有cookies字典,所以这很不安全,能使用会话就不要直接使用cookies.

  • 读cookies:
    1
    2
    3
    4
    5
    6
    7
    from flask import request

    @app.route('/')
    def index():
    username = request.cookies.get('username')
    # 使用 cookies.get(key) 来代替 cookies[key] ,
    # 以避免当 cookie 不存在时引发 KeyError 。
  • 存储cookies:
    1
    2
    3
    4
    5
    6
    7
    from flask import make_response

    @app.route('/')
    def index():
    resp = make_response(render_template(...))
    resp.set_cookie('username', 'the username')
    return resp
    **注意:**cookies设置在响应对象上,通常只是视图函数返回字符串,Flask会把它们转化为响应对象,显示的转化可以使用make_response()函数,然后再修改对应的值.

重定向

redirect()函数可以重定向,abort()可以更早的退出请求,返回错误码:

1
2
3
4
5
6
7
8
9
10
from flask import abort, redirect, url_for

@app.route('/')
def index():
return redirect(url_for('login'))

@app.route('/login')
def login():
abort(401)
this_is_never_executed()

401表示一个无法访问的页面,缺省情况下每种错误代码都会对应显示一个黑白的出错页面。使用errorhandler()装饰器可以定制出错页面:

1
2
3
4
5
from flask import render_template

@app.errorhandler(404)
def page_not_found(error):
return render_template('page_not_found.html'), 404

关于响应

视图函数的返回值会自动转化为一个响应对象。如果你返回一个字符串,那么就会被转换为一个响应对象,其中包含这个字符串以及一些其他的必要信息,如果想在视图内部掌控响应对象的结果,可以使用一个make_response()函数进行强制转换,例如原始视图:

1
2
3
@app.errorhandler(404)
def not_found(error):
return render_template('error.html'), 404

可以手动包装,修改某些特定的内容,例如头部信息:

1
2
3
4
5
@app.errorhandler(404)
def not_found(error):
resp = make_response(render_template('error.html'), 404)
resp.headers['X-Something'] = 'A value'
return resp

会话

除了请求对象和响应对象之外,还有个session这个对象,这个主要是用来在不同请求之间存储信息的。你可以简单理解为用密钥签名加密的cookie,cookie都可以查看,但是如果没有密钥就无法修改。所以使用会话之前必须设置一个密钥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask import Flask, session, redirect, url_for, escape, request

app = Flask(__name__)

@app.route('/')
def index():
if 'username' in session:
return 'Logged in as %s' % escape(session['username'])
return 'You are not logged in'

@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
session['username'] = request.form['username']
return redirect(url_for('index'))
return '''
<form action="" method="post">
<p><input type=text name=username>
<p><input type=submit value=Login>
</form>
'''

@app.route('/logout')
def logout():
# 如果会话中有用户名就删除它。
session.pop('username', None)
return redirect(url_for('index'))

# 设置密钥,复杂一点:
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'

escape()可以用来转义,如果你使用模板就简单的多,不用管这些。
生成密钥要保证做够随机,例如根据操作系统来生成一个密钥:

1
2
import os
os.urandom(24)

基于cookies的会话,Flask其实是把会话对象里面的值都放在了cookies里面,如果你访问的会话里没有对应的属性。

消息闪现

啥意思?简单来说就是在你请求结束的时候记录一个消息,然后你下次再请求的时候可以使用,然后用完就销毁了,所以叫闪现。flash()用于闪现一个消息,get_flashed_messages()来操作一个消息,具体的不介绍了,这个用的不是很多,用到再研究。

日志

一个Web应用当然少不了日志,万一崩了,直接看日志定位问题,使用也很简单:

1
2
3
app.logger.debug('A value for debugging')
app.logger.warning('A warning occurred (%d apples)', 42)
app.logger.error('An error occurred')

需求背景

最近为公司开发了一套邮件日报程序,邮件一般就是表格,图片,然后就是附件。附件一般都是默认写到txt文件里,但是PM希望邮件里的附件能直接用Excel这种软件打开,最开始想保存为Excel,但是一想Excel的文件体积会多出好多倍,csv文件默认也是使用Excel打开的,但是根本还是文本文件,体积小,保存也方便,于是最终决定使用csv模块来保存文件。

Python写csv文件

Python提供了内置模块读写csv文件,这里我只用到了写,读这里就不做介绍了,也不难,主要是解决乱码问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def save2csv(file_name=None, header=None, data=None):
"""
保存成CSV格式文件,方便Excel直接打开
:param file_name: 保存的文件名
:param header: 表头,每一列的名字
:param data: 具体填充数据
:return:
"""
if file_name is None or isinstance(file_name, basestring) is False:
raise Exception('保存CSV文件名不能为空,并且必须为字符串类型')

if file_name.endswith('.csv') is False:
file_name += '.csv'

file_obj = open(file_name, 'wb')
file_obj.write(codecs.BOM_UTF8) # 防止乱码
writer = csv.writer(file_obj)

if data is None or isinstance(data, (tuple, list)) is False:
raise Exception('保存CSV文件失败,数据为空或者不是数据类型')

if header is not None and isinstance(header, (tuple, list)) is True:
writer.writerow(header)

for row in data:
writer.writerow(row)

**注意:**有三句话就是为了防止乱码的

1
2
3
file_obj = open(file_name, 'wb')
file_obj.write(codecs.BOM_UTF8) # 防止乱码
writer = csv.writer(file_obj)

在文件头部写入codecs.BOM_UTF8就能防止乱码了,文件都是utf-8编码格式的。

Hive去重统计

相信使用Hive的人平时会经常用到去重统计之类的吧,但是好像平时很少关注这个去重的性能问题,但是当一个表的数据量非常大的时候,会发现一个简单的count(distinct order_no)这种语句跑的特别慢,和直接运行count(order_no)的时间差了很多,于是研究了一下。
**先说结论:**能使用group by代替distinc就不要使用distinct,例子:

实际论证

order_snap为订单的快照表 总记录条数763191489,即将近8亿条记录,总大小:108.877GB,存储的是公司所有的订单信息,表的字段大概有20个,其中订单号是没有重复的,所以在统计总共有多少订单号的时候去重不去重结果都一样,我们来看看:
统计所有的订单有多少条条数,一个count函数就可以搞定的sql性能如何。

  • DISTINCT
1
2
3
4
5
6
7
select count(distinct order_no) from order_snap;
Stage-Stage-1: Map: 396 Reduce: 1 Cumulative CPU: 7915.67 sec HDFS Read: 119072894175 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 0 days 2 hours 11 minutes 55 seconds 670 msec
OK
_c0
763191489
Time taken: 1818.864 seconds, Fetched: 1 row(s)
  • GROUP BY
1
2
3
4
5
6
7
8
select count(t.order_no) from (select order_no from order_snap group by order_no) t;
Stage-Stage-1: Map: 396 Reduce: 457 Cumulative CPU: 10056.7 sec HDFS Read: 119074266583 HDFS Write: 53469 SUCCESS
Stage-Stage-2: Map: 177 Reduce: 1 Cumulative CPU: 280.22 sec HDFS Read: 472596 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 0 days 2 hours 52 minutes 16 seconds 920 msec
OK
_c0
763191489
Time taken: 244.192 seconds, Fetched: 1 row(s)

**结论:**第二种写法的性能是第一种的7.448499541
注意到为什么会有这个差异,Hadoop其实就是处理大数据的,Hive并不怕数据有多大,怕的就是数据倾斜,我们看看两者的输出信息:

1
2
3
4
5
# distinct
Stage-Stage-1: Map: 396 Reduce: 1 Cumulative CPU: 7915.67 sec HDFS Read: 119072894175 HDFS Write: 10 SUCCESS
# group by
Stage-Stage-1: Map: 396 Reduce: 457 Cumulative CPU: 10056.7 sec HDFS Read: 119074266583 HDFS Write: 53469 SUCCESS

发现猫腻了没有,使用distinct会将所有的order_no都shuffle到一个reducer里面,这就是我们所说的数据倾斜,都倾斜到一个reducer这样性能能不低么?再看第二个,直接按订单号分组,起了457个reducer,将数据分布到多台机器上执行,时间当然快了.
由于没有手动指定Reduce的个数,Hive会根据数据的大小动态的指定Reduce大小,你也可以手动指定

1
hive> set mapred.reduce.tasks=100;

类似这样,所以如果数据量特别大的情况下,尽量不要使用distinct吧。
但是如果你想在一条语句里看总记录条数以及去重之后的记录条数,那没有办法过滤,所以你有两个选择,要么使用两个sql语句分别跑,然后union all或者就使用普通的distinct。具体来说得看具体情况,直接使用distinct可读性好,数据量如果不大的话推荐使用,如果数据太大了,性能受到影响了,再考虑优化。

开发工作中,经常需要使用ssh来登录服务器,密码一般都是机器生成的,特别难记,关键是机器多了这样也麻烦,所以配置一下ssh面密码登录,节省时间。

系统说明

本地机器:Ubuntu
服务器:CentOS

大概流程

需要服务器和本机都做一定的配置才能免密码登陆

本地机器配置

  1. 通过ssh-keygen产生RSA公私密钥对
1
$ ssh-keygen`

然后一路回车,不要输入任何密码和字符,最后在~/.ssh/文件夹中会生成两个文件id_rsaid_rsa.pub两个文件,然后需要修改问年间权限

1
$ chmod 775 ~/.ssh

id_rsa.pub上传到服务器的~/.ssh/文件夹下,其实也不用上传,直接把这个文件的内容复制然后在服务器的对应文件夹下创建文件,然后写入也是一样的。
2. 在~/.ssh/文件夹里创建配置文件config

1
2
3
4
Host my_server // 服务器别名
HostName 192.168.1.120 // 服务器ip
User root //登录用户名
Port 22 // ssh端口

服务器机器配置

  1. 修改/etc/ssh/sshd_config文件,将下面几行前面的注释去掉
    1
    2
    3
    RSAAuthentication yes
    PubkeyAuthentication yes
    AuthorizedKeysFile %h/.ssh/authorized_keys
    如果已经注释掉了就不用配置则这里了,
  2. 在用户目录下创建.ssh文件夹,如果有就不用创建了,具体路径为~/.ssh/
    然后在~/.ssh/文件夹下面创建authorized_keys文件,并将之前上传到服务器上的id_rsa.pub文件里的内容拷贝到authorized_keys中,保存之后重启ssh服务
1
2
3
4
$ cd ~/.ssh/
$ sudo cat id_rsa.pub > authorized_keys
$ sudo chmod 644 authorized_keys
$ sudo service sshd restart

本机测试

通过终端连接服务器

1
$ ssh my_server

**注意:**如果这一步出现bad owers等错误,记得要修改本机的文件权限

1
2
$ chmod 700 ~/.ssh
$ chmod go+rwx ~/.ssh/*
0%