离线下载
PDF版 ePub版

qyuhen · 更新于 2018-11-28 11:00:43

操作系统

time

Unix-Like 系统使用自基准点以来消逝的秒数来表达绝对时间。

  • 绝对时间: 某个绝对精确的时间值。如 2010-11-1 13:48:05 。
  • 相对时间: 相对于某个时间的前后差。如 5分钟以前。
  • epoch: 基准点。1970-01-01 00:00:00 UTC。
  • UTC: 协调世界时。世界不同时区的一个基准,比如中国为 UTC+8。
  • DST: 阳光节约时 (夏时制)。好在我国已经取消了,真麻烦。

用 time() 返回自 epoch 以来的秒数,gmtime()、localtime() 将其转换为 struct_time 结构体。

>>> from time import *

>>> t = time()
>>> t
1357761634.903692

>>> gmtime(t)   # epoch -> UTC
time.struct_time(tm_year=2013, tm_mon=1, tm_mday=9, tm_hour=20, tm_min=0, tm_sec=34,
tm_wday=2, tm_yday=9, tm_isdst=0)

>>> localtime(t)   # epoch -> Local (UTC+8)
time.struct_time(tm_year=2013, tm_mon=1, tm_mday=10, tm_hour=4, tm_min=0, tm_sec=34,
tm_wday=3, tm_yday=10, tm_isdst=0)

将 struct_time 转回 epoch。

>>> from calendar import timegm

>>> t = time()
>>> t
1357762219.162796

>>> utc = gmtime(t)  # epoch -> UTC
>>> timegm(utc)   # UTC -> epoch
1357762219

>>> local = localtime(t)  # epoch -> local
>>> mktime(local)   # local -> epoch
1357762219

与 datetime 的转换,注意返回的是 localtime 时间。

>>> from datetime import datetime
>>> from time import time

>>> t = time()

>>> d = datetime.fromtimestamp(t)  # localtime 时间
>>> d
datetime.datetime(2013, 1, 10, 4, 20, 27, 301148)

>>> d.timetuple()
time.struct_time(tm_year=2013, tm_mon=1, tm_mday=10, tm_hour=4, tm_min=20, tm_sec=27,
tm_wday=3, tm_yday=10, tm_isdst=-1)

相关函数:

ctime: 将 epoch 转换为字符串。 asctime: 将 struct_time 转换为字符串。

>>> t = time()

>>> ctime(t)
'Thu Jan 10 04:26:01 2013'

>>> asctime(localtime(t))
'Thu Jan 10 04:26:01 2013'

clock: 返回当前进程消耗的CPU时间 (秒)。 sleep: 暂停进程 (秒,可以是小数,以便设置毫秒、微秒级暂停)。

>>> clock()
0.56022400000000006

>>> sleep(0.1)

strftime: 将 struct_time 格式化为字符串。 strptime: 将字符串格式化为 struct_time。

>>> t = time()

>>> s = strftime("%Y-%m-%d %H:%M:%S", localtime(t))
>>> s
'2013-01-10 04:27:39'

>>> strptime(s, "%Y-%m-%d %H:%M:%S")
time.struct_time(tm_year=2013, tm_mon=1, tm_mday=10, tm_hour=4, tm_min=27, tm_sec=39,
tm_wday=3, tm_yday=10, tm_isdst=-1)

timezone: 与 UTC 的时差。 tzname: 当前时区名称。

>>> timezone / 3600
-8

>>> tzname    # 北京时间,China Standard Time
('CST', 'CST')

threading

尽管因为 GIL 的缘故,Python 多线程一直遭受种种非议。但作为多个并发执行流程,多线程是无法完全用 "手工" 切换的协程来替代的。

Thread

创建 Thread 实例,传入待执行函数。

>>> from threading import Thread, currentThread, activeCount

>>> def test(s):
...     print "ident:", currentThread().ident
...     print "count:", activeCount()
...     print s
...

>>> Thread(target = test, args = ("Hello",)).start()
ident: 4353970176
count: 3
Hello

除了标识符,还可以线程取个名字,这有助于调试。

还可以继承 Thread 实现自己的线程类。

>>> class MyThread(Thread):
...     def __init__(self, name, *args):
...         super(MyThread, self).__init__(name = name)
...         self.data = args
...
...     def run(self):
...         print self.name, self.data

>>> MyThread("abc", range(10)).start()
abc ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],)

将线程 daemon 属性设为 True,那么表示这是一个背景线程,进程退出时不会等待该线程结束。

调用 join() 等待线程结束,可提供超时参数 (秒,浮点数设定更小粒度)。isAlive() 检查线程状态,join() 可多次调用。

>>> from time import sleep

>>> def test():
...     print "__thread_start__"
...     sleep(10)
...     print "__thread_exit__"

>>> def run():
...     t = Thread(target = test)
...     t.start()
...     t.join(2)   // 超时
...
...     print t.isAlive() // 检查状态
...     t.join()   // 再次等待
...
...     print "over"

>>> run()
__thread_start__
True
__thread_exit__
over!

Lock

Lock 不支持递归加锁,也就是说即便在同一线程中,也必须等待锁释放。通常建议改用 RLock,它会处理 "owning thread" 和 "recursion level" 状态,对于同一线程的多次请求锁行为,只累加计数器。每次调用 release() 将递减该计数器,直到 0 时释放锁,因此 acquire() 和 release() 必须要成对出现。

threading 中的成员大多实现了上下文协议,尽可能用 with 代替手工调用。

>>> lock = RLock()

>>> def show(i):
...     with lock:     // 递归请求锁
...         print currentThread().name, i
...         sleep(0.1)

>>> def test():
...     with lock:     // 加锁
...         for i in range(5):
...             show(i)

>>> for i in range(2):
...     Thread(target = test).start()

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3
Thread-1 4
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4

Event

Event 通过通过一个内部标记来协调多线程运行。方法 wait() 阻塞线程执行,直到标记为 True。set() 将标记设为 True,clear() 更改标记为 False。isSet() 用于判断标记状态。

>>> def test():
...     e = Event()
...     def test():
...         for i in range(5):
...             e.wait()
...             e.clear()
...             print i
...
...     Thread(target = test).start()
...     return e

>>> e = test()

>>> e.set()
0
>>> e.set()
1

如果不调用 clear(),那么标记一直为 True,wait() 就不会发生阻塞行为。

在实际编程中,我们通常为每个线程准备一个独立的 Event,而不是多个线程共享,以避免未及时调用 clear() 时发生意外情况。

Condition

Condition 像 Lock 和 Event 的综合体,除基本的锁操作外,还提供了类似 yield 的功能。在获取锁以后,可以调用 wait() 临时让出锁,当前线程被阻塞,直到 notify() 发送通知后再次请求锁来恢复执行。将 wait 当做 yield,那么 notify 就是 send。

可以将已有的锁对象传给 Condition。

>>> def t1():
...     with cond:
...         for i in range(5):
...             print currentThread().name, i
...             sleep(0.1)
...             if i == 3: cond.wait()

>>> def t2():
...     with cond:
...         for i in range(5):
...             print currentThread().name, i
...             sleep(0.1)
...         cond.notify()

>>> Thread(target = t1).start(); Thread(target = t2).start()
Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3   // 让出锁
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4   // 重新获取锁,继续执行。

只有获取锁的线程才能调用 wait() 和 notify(),因此必须在锁释放前调用。

当 wait() 释放锁后,其他线程也可进入 wait 状态。notifyAll() 激活所有等待线程,让它们去抢锁然后完成后续执行。

>>> def test():
...     with cond:
...         for i in range(5):
...         print currentThread().name, i
...         sleep(0.1)
...         if i == 2: cond.wait()

>>> Thread(target = t1).start(); Thread(target = t1).start()
Thread-1 0
Thread-1 1
Thread-1 2     // Thread-1: 等待
Thread-2 0
Thread-2 1
Thread-2 2     // Thread-2: 等待

>>> with cond: cond.notifyAll()  // 通知所有 cond.wait 线程。
Thread-2 3     // Thread-1 和 Thread-2 再次抢锁以完成后续执行,
Thread-2 4     // 至于谁先抢到,就不好说了。
Thread-1 3
Thread-1 4

Semaphore

Semaphore 通过一个计数器来限制可同时运行的线程数量。计数器表示还可以运行的线程数量,acquire() 递减计数器,release() 则是增加计数器。

>>> sem = Semaphore(2)

>>> def test():
...     with sem:
...         for i in range(5):
...             print currentThread().name, i
...             sleep(0.1)

>>> for i in range(3):
...     Thread(target = test).start()

Thread-1 0   // 1 和 2 同时执行。因为计数器为 0,所以 3 被阻塞。
Thread-2 0
Thread-1 1
Thread-2 1
Thread-1 2
Thread-2 2
Thread-1 3
Thread-2 3
Thread-1 4
Thread-2 4
Thread-3 0   // 1 和 2 释放信号量,3 开始执行。
Thread-3 1
Thread-3 2
Thread-3 3
Thread-3 4

Timer

用一个独立线程在 n 秒后执行某个函数。如定时器尚未执行,可用 cancel() 取消,定时器仅执行一次。

>>> def test():
...     print datetime.datetime.now()

>>> Timer(2, test).start()
2013-03-26 11:06:19.840455

Local

TLS (thread-local storage) 为线程提供独立的存储空间。

>>> data = local()

>>> def test(fn, x):
...     data.x = x
...     for i in range(5):
...         data.x = fn(data.x)
...         print currentThread().name, data.x
...         sleep(0.1)

>>> t1 = (lambda x: x + 1, 0)
>>> t2 = (lambda x: x + "a", "a")

>>> for d in (t1, t2):
...     Thread(target = test, args = d).start()

Thread-1 1
Thread-2 aa
Thread-2 aaa
Thread-1 2
Thread-2 aaaa
Thread-1 3
Thread-2 aaaaa
Thread-1 4
Thread-1 5
Thread-2 aaaaaa

multiprocessing

看上去和 threading 类似,区别在于用进程代替线程。这是规避 GIL,实现多核并发的常用方法。

Process

创建子进程执行指定函数。

from multiprocessing import Process, current_process

def test(*args, **kwargs):
    p = current_process()
    print p.name, p.pid
    print args
    print kwargs

if __name__ == "__main__":
    p = Process(target=test, args=(1, 2), kwargs = {"a": "hello"}, name = "TEST")
    p.start()
    p.join()

输出:

TEST, 2570
(1, 2)
{'a': 'hello'}

方法 start() 创建子进程,然后在新进程中通过 run() 执行目标函数。构建参数 args、kwargs 会传递给目标函数。在父进程中用 join() 等待并获取子进程退出状态,否则会留下僵尸进程,除非父进程先终止。

从下例输出结果,可以看到 init() 在父进程执行,但 run() 已经是子进程了。

class MyProcess(Process):
    def __init__(self):
        print "init:", os.getpid()
        super(MyProcess, self).__init__()

    def run(self):
        print "run:", os.getpid()

if __name__ == "__main__":
    print "parent:", os.getpid()
    p = MyProcess()
    p.start()
    p.join()

输出:

parent: 12093
init: 12093
run: 12094

子进程不会调用退出函数,而且只有后台 (daemon) 进程才可捕获主进程退出信号,默认处理自然是终止子进程。另外,后台进程不能创建新的子进程,这将导致僵尸出现。

from os import getpid
from time import sleep
from signal import signal, SIGTERM
from multiprocessing import Process

def test():
    def handler(signum, frame):
        print "child exit.", getpid()
        exit(0)

    signal(SIGTERM, handler)
    print "child start:", getpid()
    while True: sleep(1)

if __name__ == "__main__":
    p = Process(target = test)
    p.daemon = True   # 必须在 start() 前设置。
    p.start()

    sleep(2)    # 给点时间让子进程进入 "状态"。
    print "parent exit."

输出:

child start: 12185
parent exit.
child exit. 12185

调用 terminate() 会立即强制终止子进程 (不会执行任何清理操作)。有关状态还有: is_alive()、pid、exitcode。

Pool

进程池。用多个可重复使用的后台 (daemon) 进程执行函数,默认数量和 CPU 核相等。

from multiprocessing import Pool

def test(*args, **kwargs):
    print args
    print kwargs
    return 123
if __name__ == "__main__":
    pool = Pool()
    print pool.apply(test, range(3), dict(a=1, b=2))
    pool.close()
    pool.join()

输出:

(0, 1, 2)
{'a': 1, 'b': 2}
123

调用 join() 等待所有工作进程结束前,必须确保用 close() 或 terminate() 关闭进程池。close() 阻止提交新任务,通知工作进程在完成全部任务后结束。该方法立即返回,不会阻塞等待。

使用异步模型时,callback 是可选的。

from multiprocessing import Pool
from time import sleep

def test(*args, **kwargs):
    sleep(2)
    return 123

def callback(ret):
    sleep(2)
    print "return:", ret

if __name__ == "__main__":
    pool = Pool()
    pool.apply_async(test, callback=callback)

    ar = pool.apply_async(test)
    print ar.get()

    pool.close()
    pool.join()

apply_async 返回 AsyncResult 实例,其 get([timeout])、wait()、successful() 等方法可获知任务执行状态和结果。

map() 和 imap() 用于批量执行,分别返回列表和迭代器结果。

from multiprocessing import Pool, current_process

def test(x):
    print current_process().pid, x
    return x + 100

def test2(s):
    print current_process().pid, s

if __name__ == "__main__":
    pool = Pool(3)

    print pool.map(test, xrange(5))
    pool.map(test2, "abc")

输出:

1566 0
1567 1
1566 3
1568 2
1567 4
[100, 101, 102, 103, 104]

1566 a
1568 b
1567 c

参数 chunksize 指定数据分块大小,如果待处理数据量很大,建议调高该参数。

if __name__ == "__main__":
    pool = Pool(3)
    print pool.map(test, xrange(10), chunksize=2)

输出:

1585 0    # 实际输出顺序可能不同。
1585 1
1586 2
1586 3
1587 4
1587 5
1585 6
1585 7
1586 8
1586 9
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

Queue

Queue 是最常用的数据交换方法。参数 maxsize 限制队列中的数据项数量,这会影响 get/put 等阻塞操作。默认值无限制。

通常直接使用 JoinableQueue,其内部使用 Semaphore 进行协调。在执行 put()、task_done()时调整信号量计数器。当 task_done() 发现计数值等于 0,立即通知 join() 解除阻塞。

from Queue import Empty
from multiprocessing import Process, current_process, JoinableQueue

def test(q):
    pid = current_process().pid
    while True:
        try:
        d = q.get(timeout=2)   # 阻塞 + 超时。照顾生产者未及生产情形。
        print pid, d
        q.task_done()
    except Empty:
        print pid, "empty"
        break

if __name__ == "__main__":
    q = JoinableQueue(maxsize=1000)

    map(q.put, range(5))     # 未超出队列容量限制,不会阻塞。
    print "put over"

    for i in range(3):      # 创建多个 consumer。
        Process(target=test, args=(q,)).start()

    q.join()       # 等待任务完成。
    print "task done"

输出:

put over
2127 0
2127 1
2127 2
2127 3
2127 4
task done
2127 empty
2128 empty
2129 empty

或许你会考虑压入同等数量的 None 作为结束标志,但无法保证每个 Consumer 都能获取。

argparse

命令行参数解析模块。原 optparse 已经停止开发,建议替换为 argparse。

parser

ArgumentParser 默认解析来源 sys.argv,也可提供显式参数进行解析。

构造参数中,通常只需关心 description 和 epilog。前者显示程序标题,后者在帮助信息的尾部显示详细的版权、使用描述等。

>>> from argparse import ArgumentParser
>>> parser = ArgumentParser(description="Test Program", epilog="author:qyuhen")
>>> parser.add_argument("-x", help="xxx...")

>>> parser.print_help()
usage: ipython [-h] [-x X]

Test Program

optional arguments:
-h, --help show this help message and exit
-x X xxx...

author:qyuhen

方法 parse_args 显式列表参数可用 string.split 或 shlex.split 分解。

>>> args = parser.parse_args("-x 123".split())
>>> args
Namespace(x='123')

argument

参数分为可选参数 (optional) 和 位置参数 (positional) 两种,前者用指定前缀 (默认是 "-") 标识。

>>> parser = ArgumentParser()
>>> parser.add_argument("-name", help="name...")
>>> parser.add_argument("x", help="x...")

>>> parser.print_help()
usage: ipython [-h] [-name NAME] x

positional arguments:
x x...

optional arguments:
    -h, --help show this help message and exit
    -name NAME name...

>>> parser.parse_args("-name q.yuhen 123".split())
Namespace(name='q.yuhen', x='123')

可选参数名可以有多个,键值间可以有 "=",而且单字符名称的参数键值可以合并。

>>> parser = ArgumentParser()
>>> parser.add_argument('-x', "-XX")

>>> parser.print_help()
usage: ipython [-h] [-x X]

optional arguments:
    -h, --help show this help message and exit
    -x X, -XX X

>>> parser.parse_args("-x 100".split()) # 普通方式
Namespace(x='100')

>>> parser.parse_args("-x=100".split()) # 使用等号
Namespace(x='100')

>>> parser.parse_args("-x100".split())  # 合并键值
Namespace(x='100')

>>> parser.parse_args("-XX 100".split()) # 其他名称
Namespace(x='100')

>>> parser.parse_args("-XX100".split()) # 仅单字符名可以合并
error: unrecognized arguments: -XX100

如果参数值是包含空格的字符串,注意用引号或转义处理。

>>> parser = ArgumentParser()
>>> parser.add_argument("-s")

>>> parser.parse_args(shlex.split("-s='a b c'")) # 不能用 string.split()
Namespace(s='a b c')

>>> parser.parse_args(shlex.split("-s=a\ b\ c"))
Namespace(s='a b c')

可选参数默认返回 None,可用 default 参数或 parser.set_defaults 方法指定默认值。如果参数是必须的,只需设定 required=True 即可。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", default=123)
>>> parser.add_argument("-y", required=True)

>>> parser.parse_args()
error: argument -y is required

>>> parser.parse_args("-y abc".split())
Namespace(x=123, y='abc')

除非用 dest 指定值存储名称,否则和参数名相同。metavar 用于修改参数值显示标记,默认使用 dest 大写名称。metavar 不会影响 dest 设置。

>>> parser = ArgumentParser()
>>> parser.add_argument("-n", dest="name")
>>> parser.add_argument("-x", dest="x", metavar="value")

>>> parser.print_help()
usage: ipython [-h] [-n NAME] [-x value]

optional arguments:
    -h, --help show this help message and exit
    -n NAME
    -x value

>>> parser.parse_args("-n q.yuhen -x 123".split())
Namespace(name='q.yuhen', x='123')

type 参数用于指定值转换函数,比如内置函数 int、float、file,也可以自定义函数。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", type=int)
>>> parser.add_argument("-s", type=lambda s: "s:"+s)

>>> parser.parse_args("-x 123 -s abc".split())
Namespace(s='s:abc', x=123)

nargs 指示参数值数量,默认为 1。除具体的数字外,还可以使用通配符。

  • : 0 或 1。
  • *: 0 或 N。
  • +: 1 或 N。
  • REMAINDER: 所有剩下的值。
>>> parser = ArgumentParser()
>>> parser.add_argument("-x", nargs="+")
>>> parser.add_argument("-y", nargs="")
>>> parser.add_argument("n", nargs=2)
>>> parser.add_argument("args", nargs=REMAINDER)

>>> parser.print_help()
usage: ipython [-h] [-x X [X ...]] [-y [Y]] n n ...

positional arguments:
    n
    args

optional arguments:
    -h, --help show this help message and exit
    -x X [X ...]
    -y [Y]
>>> parser.parse_args("-x x1 x2 -y y1 1 2 a b c -xxx".split())
Namespace(args=['a', 'b', 'c', '-xxx'], n=['1', '2'], x=['x1', 'x2'], y='y1')

action 用于指定参数取值行为。

  • store: 默认,仅存储。
  • store_const: 返回 const 或 default 值。
  • store_true/store_false: 返回 True 或 False。
  • append: 合并多个同名参数值。
  • append_const: 合并多个不同名参数的 const 值,注意这些参数的 dest 必须相同。
  • count: 统计参数名出现的次数,常见的就是 -vvvv 这样表示 level 的参数。
  • version: 版本信息.
# 1. store_const: 提供参数时返回 const 值,否则返回 default。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", action="store_const", const=100)
>>> parser.add_argument("-y", action="store_const", const=100, default=1)

>>> parser.parse_args("-x".split())
Namespace(x=100, y=1)

>>> parser.parse_args("-x -y".split())
Namespace(x=100, y=100)

>>> parser.parse_args()
Namespace(x=None, y=1)
# 2. store_true/store_false: 显式返回指定布尔值,否则返回相反值。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", action="store_true")
>>> parser.add_argument("-y", action="store_false")

>>> parser.parse_args("-x -y".split())
Namespace(x=True, y=False)

>>> parser.parse_args()
Namespace(x=False, y=True)
# 3. append: 将多个同名参数值合并成列表。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", action="append")

>>> parser.parse_args("-x 1 -x 2".split())
Namespace(x=['1', '2'])
# 4. append_const: 合并多个不同名参数 const 值,注意所有合并参数的 dest 相同。

>>> parser = ArgumentParser()
>>> parser.add_argument("-x", dest="numbers", action="append_const", const=1)
>>> parser.add_argument("-y", dest="numbers", action="append_const", const=2)

>>> parser.parse_args("-x -y".split())
Namespace(numbers=[1, 2])
# 5. count: 通常用于统计 -vvv 这类 level 参数,只能是单字符名。

>>> parser = ArgumentParser()
>>> parser.add_argument("--verbose", "-v", action="count")

>>> parser.parse_args("-vv".split())
Namespace(v=2)
# 6. version: 显示版本信息。

>>> parser = ArgumentParser()
>>> parser.add_argument('--version', "-V", action='version', version='%(prog)s 2.0')

>>> parser.parse_args("--version".split())
ipython 2.0

choices 用于指定参数取值范围。

>>> parser = ArgumentParser()
>>> parser.add_argument('-x', choices=range(1, 5), type=int)
>>> parser.add_argument('-s', choices=("a", "b"))

>>> parser.print_help()
usage: ipython [-h] [-x {1,2,3,4}] [-s {a,b}]

optional arguments:
    -h, --help show this help message and exit
    -x {1,2,3,4}
    -s {a,b}

>>> parser.parse_args("-x 2 -s a".split())
Namespace(s='a', x=2)

>>> parser.parse_args("-x 6".split())
error: argument -x: invalid choice: 6 (choose from 1, 2, 3, 4)

>>> parser.parse_args("-s abc".split())
error: argument -s: invalid choice: 'abc' (choose from 'a', 'b')

group

如果参数较多,分组显示更便于查看。

>>> parser = ArgumentParser()

>>> group1 = parser.add_argument_group("group1", "group1 description...")
>>> group1.add_argument("x", help="xxx...")

>>> group2 = parser.add_argument_group("group2", "group2 description...")
>>> group2.add_argument("y", help="yyy...")
>>> group2.add_argument("z", help="zzz...")

>>> parser.print_help()
usage: ipython [-h] x y z

optional arguments:
    -h, --help show this help message and exit

group1:
    group1 description...

    x           xxx...

group2:
    group2 description...
    y           yyy...
    z           zzz...

组的另外一个作用就是互斥,仅允许组中的一个参数出现。可对组设置 required=True。

>>> parser = ArgumentParser()

>>> group = parser.add_mutually_exclusive_group(required=True)
>>> group.add_argument("-x")
>>> group.add_argument("-y")

>>> parser.print_help()
usage: ipython [-h] (-x X | -y Y)

optional arguments:
    -h, --help show this help message and exit
    -x X
    -y Y
>>> parser.parse_args("-x 100 -y 200".split())
error: argument -y: not allowed with argument -x

>>> parser.parse_args("-x 100".split())
Namespace(x='100', y=None)

>>> parser.parse_args("-y 200".split())
Namespace(x=None, y='200')

ctypes

标准库 ctypes 模块可以非常方便地调用动态库 (.so),这有助于解决安全和性能问题。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int add(int x, int y)
{
    return x + y;
}

void inc(int* x)
{
    *x += 1;
}

void cprint(char* s)
{
    printf("%s: %s\n", __func__, s);
}

编译:

$ gcc -fPIC -shared -o test.so test.c

测试:

>>> from ctypes import *
>>> so = cdll.LoadLibrary("./test.so")

>>> so.add(10, 20)
30

>>> so.cprint("Hello, World")
cprint: Hello, World
22

>>> x = c_int(123)
>>> so.inc(byref(x))    # 传入指针
124
>>> x
c_int(124)

当然也可以直接调用系统库的函数。

>>> libc = cdll.LoadLibrary("libc.dylib") # Linux: libc.so.6

>>> libc.printf("Hi\n")
Hi
4

>>> import time
>>> time.time(), libc.time()
(1364284691.803043, 1364284691)
上一篇: 数据存储 下一篇: 进程通信