2008-02-18
Coroutine在并发程序中的应用
关键字: 并发 协程 coroutine
理论知识太少,表达能力太差,简单发点自己的总结,有意见请不要给我面子
下面所列的几个场景只是我接触过的几个,并不是最适合用来说明Coroutine应用的,如果你有更好的例子或是反对意见,我希望听到不同的声音。我对Coroutine的理解只是皮毛,并没有大规模应用过,也想听到一些应用经验。
首先我得承认我对于并发、异步、并行、分布式这些概念毫无兴趣,我甚至讨厌理论相关的东西,我也不能给它们做出准确的解释,我只是不断寻找更简单的编码方式。
并发应用场景
场景一:基于状态机的并发协议处理
状态机通常用于把一段处理过程分解成有限个稳定的状态,在处理过程中在这些状态间切换。举一个简单的协议例子,协议格式就是2字节长度前缀加上这个长度指定的协议内容。
这个例子比较简单,某些情况下可能会增加一些复杂性,比如这个例子中缓冲区可能为空,这时候可能会需要暂停或中断处理过程,接收新数据以后继续处理过程。
在并发系统中通常要避开recv这种阻塞操作,如果recv()无法立即获得数据,可能导致整个线程挂起,影响系统并发能力。所以并发系统经常使用事件驱动方式,遇到这种情况时不是主动接收数据,而是设置状态并返回,当收到新数据时再次调用处理过程,从上次的状态接着处理。
当然并非必须这样做,对于并发数目有限的系统来说,完全可以开启多个线程来运行阻塞处理,没有什么不妥,也是很高效的方式。多线程方式还可以编写更简单的处理过程:
对于高并发系统来说,操作系统线程资源是有限的,不能无限开启,这就限制了这种每连接一线程的并发能力,这种服务器只能同时服务少数几个客户,一旦遇到慢客户,其它客户就无法得到正常服务了。这和客服电话很相似,接线人员有限,高峰时很容易占线。
对于高并发应用,多线程和基于事件方式不能使用相同代码来实现,状态机处理部分也显得繁琐。
场景二:异步IO操作
异步IO操作通常是发起一个IO请求,由操作系统完成以后回调指定的方法或者使用其它方式通知,还有可能需要在回调方法里发起下一次请求。使用异步IO几乎总是需要把一个同步逻辑过程分解成多个繁琐的异步调用,代码量和开发难度以及BUG数目都很容易上升。由于同步过程被分解为多段无法顺序处理的过程,所以异步IO通常还需要和状态机一起使用,这也增加了使用它的难度。
异步IO的例子一般比较长,这里就不举例了,有兴趣可以看一下aio/iocp。
场景三:高并发网络服务器
高并发服务器除了要处理场景一的情况外,可能还要结合场景二,多线程方案有时候完全不能接受,更多的是基于事件、异步IO或者是混合事件和多线程的模型。
WEB服务器是很好的例子,早期的实现一般是多线程或多进程,最近几年的实现大部分是混合事件和多线程。
如果不涉及到耗时的磁盘IO操作,并发服务器只需要使用基于事件的方式即可,比如memcached这类应用,单线程处理就可以达到很高效率。memcached为了充分利用多CPU的处理能力,也使用多线程来处理逻辑,一个线程负责接受连接,接受到的连接被分配给其中一个线程来处理。(题外话:memcached的SMP实现其实不算高效,大量的线程加锁都是完全可以避免的,改进的余地还很大)
这里主要关注并发服务器的基于IO事件处理部分,因为多线程通常是为了处理无法避免的阻塞调用或者是提升在SMP环境下的性能。常用的并发网络设计模式有Reactor/Proactor,分别用于处理IO事件和异步IO,但也可以反过来,比如Reactor + 线程池来模拟Proactor,Proactor + 自定义事件通知方式来模拟Reactor。
场景四:客户端并发应用
客户端并发很少被提及,这里我给出一个项目中实际遇到的问题。
很多大型网站是使用Socket通讯来调用后台的业务中间件的,为了扩展、重用和效率方面的需要,业务被分布在多台服务器,一个页面就可要调用用户资料、积分人气、统计信息、好友关系、邮件等多台中间件服务器。如果顺序调用,每个业务平均耗时20ms,一次请求5个调用就要耗掉100ms。看似很短的时间,但这100ms是花费在等待socket数据上,很可能WEB服务器负载并不高,但并发处理能力即上不去,如果这5个调用能同时进行,一共只需要20ms就完成了。既然无法避免占用线程 (CGI、PHP都需要进程),那么占用线程时间越短,越能够提高服务器的并发能力。
我们可以使用非阻塞connect和select来解决这个问题,先发起5个非阻塞connect,然后一起加入select,再结合状态机处理协议交互部分,当几个调用都结束时就可以渲染到页面了(View部分)。随着业务越来越复杂,更多协议需要状态机处理,有些业务之间还有依赖关系,这种方式编写也是很复杂的。
二、Coroutine的处理方式
Coroutine处理以上几种情况都有独到之处。
一、Coroutine处理并发状态机协议
不妨直接把上面多线程的例子拿过来用:
这和多线程的代码一样,但它不阻塞线程。recv可以实现为:
它具有几个优点:
1、处理逻辑连贯性
2、几乎无限的并发。由于运行在少数几个或一个线程内,并发数仅限于内存。
3、统一的调用界面。调用代码完全看不出来非阻塞或异步IO的影子
二、Coroutine处理异步IO
由上面的例子可以想象,只要把异步IO纳入到事件系统中来,就可以和上面一样来处理。
IoLanguage就实现了一个AsyncRequest库,把aio平滑地整合进来,调用者还是像调用同步IO一样使用异步IO。
这段简单的代码如果使用同步IO是很容易实现的,如果是异步IO,又没有现成的框架,可以想象一下需要多少代码量?如果逻辑再复杂一些?
异步IO与Coroutine并不直接整合,需要把异步IO的通知或者是回调转化成Coroutine调度,所以这部分需要调度器来完成。
三、Coroutine高并发服务器编程
Reactor模式是经典的基于事件的并发模式,使用Coroutine处理并发则可以Erlang为例,IoLanguage也有简单的处理方法。
Coroutine是什么
Coroutine又被称为协程,是一种用户级线程,不由操作系统调度。协程之间切换是在线程内进行的,开销比较小,通常又叫作轻量级线程。Windows上的Fiber和Linux上的ucontext都是coroutine的实现,也有第三方库实现的,比如跨平台的Coroutine库PCL,IoLanguage使用的LibCoroutine。
Coroutine的调度需要用户自己处理,Coroutine执行过程中不可被打断,只能由过程自己交出执行权,这时候上下文状态被保存。用户可以控制执行另一个Coroutine,也可以在任意时候恢复一个暂停的Coroutine。
Coroutine的调度是友好的,所以不需要使用临界区。本质上它只是运行在线程内,所以对多CPU的利用不够充分,需要自己实现多线程的调度器。
Erlang和IoLanguage的比较
Erlang在底层实现了的轻量级进程和消息模型,对应于Coroutine和Actor模型。Erlang实现了多线程的调度器以充分利用CPU,但在某些情况下反而会降低性能,比如频繁的进程间消息传递。
IoLanguage底层只实现了Coroutine,是作为库来提供的,它也在库里面实现了简单的调度器,并通过简单的语法实现了异步调用操作。
Erlang鼓励你创建进程,使用进程来模拟对象,使用消息来处理它们之间的交互。IoLanguage没有建议你做这种底层操作,它认为你应该把把这看成是异步操作,有些异步操作是不需要获得返回值的,有些异步操作需要等待返回值。
IoLanguage并没有和Erlang对应的消息模型,当然这不意味着它没办法做到,这只需要在调度器上扩展就能完成,只是大部分情况下不需要这样,即便是Erlang也会建议你减少这种“脏”代码。Erlang里面扩展核心模块很不方便,所以消息模型直接集成在语言平台上(效率是另一个原因,但我比较怀疑),而IoLanguage这种扩展能力很强的语言则尽量保持语言核心的小巧。
实现Erlang这样的并发平台并非只能在VM这一层完成,所有支持Coroutine的语言都可以使用库来完成一个类似模型系统,这只需要实现一个调度器。有些扩展能力不强的语言需要你重写一整套库来适应,有些语言则不然,它可以简单地替换一些关键部分就能容纳这个新的模型。
我给个简单的例子说明Erlang在处理某些并发情况时的局限性,依旧是上面的客户端并发例子,我需要并行调用几个远程服务来减少等待时间。
先看一个Erlang的例子:
看起来挺不错的,不过缺点在于我需要等待几个调用全部结束才会转到页面渲染部分,并且我很不喜欢这个pmap,我只是要调用几个服务,为什么要接触这个东西?
(如果你有更好的做法请告诉我)
再看个IoLanguage的例子:
调用很简单,返回的都是一个Proxy对象,只有你使用它时,它才会等待调用结果。
IoLanguage的异步实现还有一个好处,如果get_mails调用比较慢,比如需要30ms,其它几个调用各需要20ms,如果我们的mails部分显示在页面的最下方,那么可以在20ms-30ms之间这段时间可以先把mails以上的页面部分给渲染出来,完全不用等待所有调用一起结束,这可能会让页面渲染节省几ms,当然它没有节省CPU占用时间,但减少了线程被占用的时间。使用Erlang我知道也可以做到这一点,但绝对不是这么自然的方式。
Erlang给了我们一个强大的并发平台,但却强行让我们使用FP(据说全世界仅1.5%的程序员使用FP
),还要求我们处处去处理这种底层的并发、并行、消息。
Ruby并发应用的前景
前面大着胆子下了个断言:所有支持Coroutine的语言都可以使用库来完成一个并发模型系统,我坚信这一点。Ruby1.9也实现了Fiber,所以只需要实现一个调度器就可以完成一个简单的并发模型。Ruby的扩展能力比IoLanguage有过之而无不及,所以我相信使用Ruby也可以实现一个像IoLanguage这样的并发、异步框架。
下面所列的几个场景只是我接触过的几个,并不是最适合用来说明Coroutine应用的,如果你有更好的例子或是反对意见,我希望听到不同的声音。我对Coroutine的理解只是皮毛,并没有大规模应用过,也想听到一些应用经验。
首先我得承认我对于并发、异步、并行、分布式这些概念毫无兴趣,我甚至讨厌理论相关的东西,我也不能给它们做出准确的解释,我只是不断寻找更简单的编码方式。
并发应用场景
场景一:基于状态机的并发协议处理
状态机通常用于把一段处理过程分解成有限个稳定的状态,在处理过程中在这些状态间切换。举一个简单的协议例子,协议格式就是2字节长度前缀加上这个长度指定的协议内容。
while(state != END)
{
switch(state)
{
case BEGIN:
// 解析包长字段
state = GET_BODY;
break;
case GET_BODY:
// 解析包内容
// 处理包内容
state = END;
break;
}
}
这个例子比较简单,某些情况下可能会增加一些复杂性,比如这个例子中缓冲区可能为空,这时候可能会需要暂停或中断处理过程,接收新数据以后继续处理过程。
while(state != END)
{
switch(state)
{
case BEGIN:
// 解析包长字段
state = GET_BODY;
break;
case GET_BODY:
// 解析包内容
// 处理包内容
state = END;
break;
}
if (buffer.empty())
buffer.append(recv());
}
在并发系统中通常要避开recv这种阻塞操作,如果recv()无法立即获得数据,可能导致整个线程挂起,影响系统并发能力。所以并发系统经常使用事件驱动方式,遇到这种情况时不是主动接收数据,而是设置状态并返回,当收到新数据时再次调用处理过程,从上次的状态接着处理。
while(state != END)
{
switch(state)
{
case BEGIN:
// 解析包长字段
state = GET_BODY;
break;
case GET_BODY:
// 解析包内容
// 处理包内容
state = END;
break;
}
if (buffer.empty())
return NEED_RECV;
}
当然并非必须这样做,对于并发数目有限的系统来说,完全可以开启多个线程来运行阻塞处理,没有什么不妥,也是很高效的方式。多线程方式还可以编写更简单的处理过程:
while(connected())
{
buffer = recv(2); // 接收2节字长度
int packlen = *(short*)buffer.c_str();
package = recv(packlen);
// 处理包内容
}
对于高并发系统来说,操作系统线程资源是有限的,不能无限开启,这就限制了这种每连接一线程的并发能力,这种服务器只能同时服务少数几个客户,一旦遇到慢客户,其它客户就无法得到正常服务了。这和客服电话很相似,接线人员有限,高峰时很容易占线。
对于高并发应用,多线程和基于事件方式不能使用相同代码来实现,状态机处理部分也显得繁琐。
场景二:异步IO操作
异步IO操作通常是发起一个IO请求,由操作系统完成以后回调指定的方法或者使用其它方式通知,还有可能需要在回调方法里发起下一次请求。使用异步IO几乎总是需要把一个同步逻辑过程分解成多个繁琐的异步调用,代码量和开发难度以及BUG数目都很容易上升。由于同步过程被分解为多段无法顺序处理的过程,所以异步IO通常还需要和状态机一起使用,这也增加了使用它的难度。
异步IO的例子一般比较长,这里就不举例了,有兴趣可以看一下aio/iocp。
场景三:高并发网络服务器
高并发服务器除了要处理场景一的情况外,可能还要结合场景二,多线程方案有时候完全不能接受,更多的是基于事件、异步IO或者是混合事件和多线程的模型。
WEB服务器是很好的例子,早期的实现一般是多线程或多进程,最近几年的实现大部分是混合事件和多线程。
如果不涉及到耗时的磁盘IO操作,并发服务器只需要使用基于事件的方式即可,比如memcached这类应用,单线程处理就可以达到很高效率。memcached为了充分利用多CPU的处理能力,也使用多线程来处理逻辑,一个线程负责接受连接,接受到的连接被分配给其中一个线程来处理。(题外话:memcached的SMP实现其实不算高效,大量的线程加锁都是完全可以避免的,改进的余地还很大)
这里主要关注并发服务器的基于IO事件处理部分,因为多线程通常是为了处理无法避免的阻塞调用或者是提升在SMP环境下的性能。常用的并发网络设计模式有Reactor/Proactor,分别用于处理IO事件和异步IO,但也可以反过来,比如Reactor + 线程池来模拟Proactor,Proactor + 自定义事件通知方式来模拟Reactor。
场景四:客户端并发应用
客户端并发很少被提及,这里我给出一个项目中实际遇到的问题。
很多大型网站是使用Socket通讯来调用后台的业务中间件的,为了扩展、重用和效率方面的需要,业务被分布在多台服务器,一个页面就可要调用用户资料、积分人气、统计信息、好友关系、邮件等多台中间件服务器。如果顺序调用,每个业务平均耗时20ms,一次请求5个调用就要耗掉100ms。看似很短的时间,但这100ms是花费在等待socket数据上,很可能WEB服务器负载并不高,但并发处理能力即上不去,如果这5个调用能同时进行,一共只需要20ms就完成了。既然无法避免占用线程 (CGI、PHP都需要进程),那么占用线程时间越短,越能够提高服务器的并发能力。
我们可以使用非阻塞connect和select来解决这个问题,先发起5个非阻塞connect,然后一起加入select,再结合状态机处理协议交互部分,当几个调用都结束时就可以渲染到页面了(View部分)。随着业务越来越复杂,更多协议需要状态机处理,有些业务之间还有依赖关系,这种方式编写也是很复杂的。
二、Coroutine的处理方式
Coroutine处理以上几种情况都有独到之处。
一、Coroutine处理并发状态机协议
不妨直接把上面多线程的例子拿过来用:
while(connected())
{
buffer = recv(2); // 接收2节字长度
int packlen = *(short*)buffer.c_str();
package = recv(packlen);
// 处理包内容
}
这和多线程的代码一样,但它不阻塞线程。recv可以实现为:
recv(len)
{
register_read_event();
wake_me_when_read_event_handled();
yield();
::recv(...);
// 返回buffer
}
它具有几个优点:
1、处理逻辑连贯性
2、几乎无限的并发。由于运行在少数几个或一个线程内,并发数仅限于内存。
3、统一的调用界面。调用代码完全看不出来非阻塞或异步IO的影子
二、Coroutine处理异步IO
由上面的例子可以想象,只要把异步IO纳入到事件系统中来,就可以和上面一样来处理。
IoLanguage就实现了一个AsyncRequest库,把aio平滑地整合进来,调用者还是像调用同步IO一样使用异步IO。
infile = open("1.txt");
outfile = open("2.txt", "w");
outfile.write(infile.read());
outfile.close();
infile.close();
这段简单的代码如果使用同步IO是很容易实现的,如果是异步IO,又没有现成的框架,可以想象一下需要多少代码量?如果逻辑再复杂一些?
异步IO与Coroutine并不直接整合,需要把异步IO的通知或者是回调转化成Coroutine调度,所以这部分需要调度器来完成。
三、Coroutine高并发服务器编程
Reactor模式是经典的基于事件的并发模式,使用Coroutine处理并发则可以Erlang为例,IoLanguage也有简单的处理方法。
Coroutine是什么
Coroutine又被称为协程,是一种用户级线程,不由操作系统调度。协程之间切换是在线程内进行的,开销比较小,通常又叫作轻量级线程。Windows上的Fiber和Linux上的ucontext都是coroutine的实现,也有第三方库实现的,比如跨平台的Coroutine库PCL,IoLanguage使用的LibCoroutine。
Coroutine的调度需要用户自己处理,Coroutine执行过程中不可被打断,只能由过程自己交出执行权,这时候上下文状态被保存。用户可以控制执行另一个Coroutine,也可以在任意时候恢复一个暂停的Coroutine。
Coroutine的调度是友好的,所以不需要使用临界区。本质上它只是运行在线程内,所以对多CPU的利用不够充分,需要自己实现多线程的调度器。
Erlang和IoLanguage的比较
Erlang在底层实现了的轻量级进程和消息模型,对应于Coroutine和Actor模型。Erlang实现了多线程的调度器以充分利用CPU,但在某些情况下反而会降低性能,比如频繁的进程间消息传递。
IoLanguage底层只实现了Coroutine,是作为库来提供的,它也在库里面实现了简单的调度器,并通过简单的语法实现了异步调用操作。
Erlang鼓励你创建进程,使用进程来模拟对象,使用消息来处理它们之间的交互。IoLanguage没有建议你做这种底层操作,它认为你应该把把这看成是异步操作,有些异步操作是不需要获得返回值的,有些异步操作需要等待返回值。
IoLanguage并没有和Erlang对应的消息模型,当然这不意味着它没办法做到,这只需要在调度器上扩展就能完成,只是大部分情况下不需要这样,即便是Erlang也会建议你减少这种“脏”代码。Erlang里面扩展核心模块很不方便,所以消息模型直接集成在语言平台上(效率是另一个原因,但我比较怀疑),而IoLanguage这种扩展能力很强的语言则尽量保持语言核心的小巧。
实现Erlang这样的并发平台并非只能在VM这一层完成,所有支持Coroutine的语言都可以使用库来完成一个类似模型系统,这只需要实现一个调度器。有些扩展能力不强的语言需要你重写一整套库来适应,有些语言则不然,它可以简单地替换一些关键部分就能容纳这个新的模型。
我给个简单的例子说明Erlang在处理某些并发情况时的局限性,依旧是上面的客户端并发例子,我需要并行调用几个远程服务来减少等待时间。
先看一个Erlang的例子:
Calls = [
fun() -> get_score("lijie") end,
fun() -> get_user_info("lijie") end,
fun() -> get_stat_info("lijie") end,
fun() -> get_friends_info("lijie") end,
fun() -> get_mails("lijie") end
],
Results = myutils:pmap(Calls, fun(Func) -> Func() end).
看起来挺不错的,不过缺点在于我需要等待几个调用全部结束才会转到页面渲染部分,并且我很不喜欢这个pmap,我只是要调用几个服务,为什么要接触这个东西?
(如果你有更好的做法请告诉我)
再看个IoLanguage的例子:
score := Service @get_score("lijie")
user_info := Service @get_user_info("lijie")
stat_info := Service @get_stat_info("lijie")
friends_info := Service @get_friends_info("lijie")
mails := Service @get_mails("lijie")
// 可以使用score, user_info, stat_info等变量了
调用很简单,返回的都是一个Proxy对象,只有你使用它时,它才会等待调用结果。
IoLanguage的异步实现还有一个好处,如果get_mails调用比较慢,比如需要30ms,其它几个调用各需要20ms,如果我们的mails部分显示在页面的最下方,那么可以在20ms-30ms之间这段时间可以先把mails以上的页面部分给渲染出来,完全不用等待所有调用一起结束,这可能会让页面渲染节省几ms,当然它没有节省CPU占用时间,但减少了线程被占用的时间。使用Erlang我知道也可以做到这一点,但绝对不是这么自然的方式。
Erlang给了我们一个强大的并发平台,但却强行让我们使用FP(据说全世界仅1.5%的程序员使用FP
Ruby并发应用的前景
前面大着胆子下了个断言:所有支持Coroutine的语言都可以使用库来完成一个并发模型系统,我坚信这一点。Ruby1.9也实现了Fiber,所以只需要实现一个调度器就可以完成一个简单的并发模型。Ruby的扩展能力比IoLanguage有过之而无不及,所以我相信使用Ruby也可以实现一个像IoLanguage这样的并发、异步框架。
评论
Thanks T1.
看来就差个Lazy Evaluation,不然都得显式地调用Score()。
Erlang里面异步调用这个麻烦事每次都要这么写?或许可以把你的field方法写成更通用的async_call:
看着还行,不过总得写这么些代码来包装。
挑点小毛病,Score()也不能多次调用,可能还要做点修改。
看来就差个Lazy Evaluation,不然都得显式地调用Score()。
Erlang里面异步调用这个麻烦事每次都要这么写?或许可以把你的field方法写成更通用的async_call:
render(Person)->
Score = async_call(fun() -> read(score, Person) end),
Stat_info= async_call(fun() -> read(stat_info, Person) end),
Friends_info= async_call(fun() -> read(friends_info, Person) end),
Mails = async_call(fun() -> read(mails, Person) end),
io:format("socre of ~p is ~p ~n",[socre,Score()]),
....
io:format("mails of ~p are ~p ~n",[socre,Mails()]).
看着还行,不过总得写这么些代码来包装。
挑点小毛病,Score()也不能多次调用,可能还要做点修改。
刚才想了一下,还应该有一种更为接近的方式
引用
Field(Field_Name,Person)->
Self=self(),
spawn(fun()->Self!{Field,read(Field,Person)}end),
fun()->
recieve {Field,Data}-> Data end.
render(Person)->
Score = Field(socre,Person),
Stat_info= Field(stat_info,Person),
Friends_info= Field(friends_info,Person),
Mails = Field(mails ,Person),
io:format("socre of ~p is ~p ~n",[socre,Score()]),
....
io:format("mails of ~p are ~p ~n",[socre,Mails()]).
Self=self(),
spawn(fun()->Self!{Field,read(Field,Person)}end),
fun()->
recieve {Field,Data}-> Data end.
render(Person)->
Score = Field(socre,Person),
Stat_info= Field(stat_info,Person),
Friends_info= Field(friends_info,Person),
Mails = Field(mails ,Person),
io:format("socre of ~p is ~p ~n",[socre,Score()]),
....
io:format("mails of ~p are ~p ~n",[socre,Mails()]).
其实还可以写的更简洁一点
引用
render(FieldList)->
lists:foreach(query,FieldList),
Value=fun(Field)-> recieve {Field,Data}-> Data end ,
Score=Value(score),
....
mail=Value(mail).
lists:foreach(query,FieldList),
Value=fun(Field)-> recieve {Field,Data}-> Data end ,
Score=Value(score),
....
mail=Value(mail).
引用
GC对于虚拟机调度的干扰.基于Share Heap的虚拟机的性能一定会存在瓶颈.
这方面还是可以提升的。如果不考虑smp支持的话,假定虚拟机以单线程执行,GC对调度影响不大。考虑到Coroutine被调度到不同线程中执行,私有堆可能是最佳解决办法,但从Erlang的测试情况来看,开启smp在性能上不一定能提高,特别是目前Erlang很多库都以消息通讯来调用的情况下。
-define(field(Field,Results),
(dict:fetch("score",Results))()).
query(Field)->
Self=self(),
spawn(fun()->Self!{Field,read(Field)}end).
render(FieldList)->
lists:foreach(query,FieldList),
Dict=dict:new();
Results=lists:fold(fun({Field,Value},Dict)->dict:append(Field,Value,Dict),[{Field,fun()-> recieve {Field,Data}->
Data end end}|Field<-FieldList]).
Score=?field(score,Results),
.....
Mail=?field(mails,Results).引用
看起来挺不错的,不过缺点在于我需要等待几个调用全部结束才会转到页面渲染部分,并且我很不喜欢这个pmap,我只是要调用几个服务,为什么要接触这个东西?
实际上,你提到的这个功能在FP语言里成为Lazy evaluation.这个特性在一般的FP语言里是一个标配,比如说Haskell的Lazy List
myprimelist = 2 : [x | x <- [3,5..], myprimecheck x (takeWhile (<x) myprimelist)]
[3,5....]代表一个无限长的List.这个List只有在做[H|T]时才对H进行求值.
但是Erlang为了降低FP的难度,取消了这个功能,要加入这个语法糖也是很小菜的事情
打补丁也有它的麻烦之处。除非你把所有东西都搞成Fiber,比如让Ruby程序默认就执行在一个Fiber里面,不然打了补丁可能影响原有的功能。IoLanguage/Erlang都是这样启动的。
要打补丁啊,把阻塞操作的部分换掉,改成其它方式。打补丁很简单,在ruby里面只要重写该方法就可以了。
这里有个IoLanguage的例子,它把aio包装成AsyncRequest库以后,对File库只做了这么点修改:
就是把实际执行阻塞读写的部分换成:
当然它的实现并不好,没有使用aio的回调通知或信号通知,轮询方式。
这里有个IoLanguage的例子,它把aio包装成AsyncRequest库以后,对File库只做了这么点修改:
//AsyncRequest turnOnForFiles := method(
File readToBufferLength := method(buffer, size,
request := AsyncRequest clone setDescriptor(self descriptor)
event := ReadEvent clone setDescriptorId(self descriptor)
pos := self position
request read(pos, size)
while(request isDone not, event waitOnOrExcept(1))
errormsg := request error
if(errormsg, Exception raise("AsyncRequest error: " .. errormsg); return nil)
request copyBufferTo(buffer)
self setPosition(pos + buffer size)
buffer
)
File readBufferOfLength := method(size,
buffer := Sequence clone
readToBufferLength(buffer, size)
buffer
)
File write := method(buffer,
request := AsyncRequest clone setDescriptor(descriptor)
event := WriteEvent clone setDescriptorId(descriptor)
pos := self position
request write(pos, buffer, 0, buffer size)
while(request isDone not, event waitOnOrExcept(1))
errormsg := request error
if(errormsg, Exception raise("AsyncRequest error: " .. errormsg); return nil)
//request sync
setPosition(pos + buffer size)
self
)
//)
就是把实际执行阻塞读写的部分换成:
register_event(); async_operate(); yield_and_wait_wakeup_event(); return_result();
当然它的实现并不好,没有使用aio的回调通知或信号通知,轮询方式。
很不错!的确如你所说,调度器的调度方式很可能是和应用的逻辑相关的,通用的调度器未必适合应用的需要。
有了调度器,还必须要对现有阻塞方式的IO库打一些补丁(ruby最擅长这个),否则会阻调度线程。
调度器和Fiber应该是运行在一个线程之内所以阻塞IO会带来问题。但如果使用多线程方式把可能引起阻塞的IO操作放在其他的线程当中运行呢?那就不需要patch IO库了吧。
引用
有了调度器,还必须要对现有阻塞方式的IO库打一些补丁(ruby最擅长这个),否则会阻调度线程。
调度器和Fiber应该是运行在一个线程之内所以阻塞IO会带来问题。但如果使用多线程方式把可能引起阻塞的IO操作放在其他的线程当中运行呢?那就不需要patch IO库了吧。
Ruby提供Fiber和本地线程是很合理的,这两个是并发编程的基本组件,调度器则可能是实现相关、平台相关的。一个好的调度器实际上是一个框架,完成应用中的通用逻辑,定制扩展标准。标准库当然可以提供,但不一定是你想要的。
这是我测试的一个简调度器:
测试:
运行结果:
真正实用的调度器需要支持一些更复杂的情况,比如:
1、IO事件,当IO事件触发时唤醒等待此事件的Fiber
2、超时事件,当IO事件超时或者计时器超时时,唤醒Fiber
3、用户自己挂起的Fiber,上面就是例子
4、支持消息模型
目前已经有一些不错的实现,比如上次simohayha提到的Revactor,以及这个项目页面上看到的python项目Eventlet和Kamaelia
有了调度器,还必须要对现有阻塞方式的IO库打一些补丁(ruby最擅长这个),否则会阻调度线程。
这是我测试的一个简调度器:
class Schedule
@@yielding_fibers = []
def self.yield(fiber=nil)
if fiber.nil?
@@yielding_fibers << @@current_fiber
Fiber.yield
else
@@yielding_fibers << fiber
@@current_fiber = fiber
end
end
def self.run
while @@yielding_fibers.size > 0
fiber = @@yielding_fibers.shift
@@current_fiber = fiber
fiber.resume
end
end
end
测试:
fib = Fiber.new do
3.times do
puts 1
Schedule.yield
end
end
Schedule.yield(fib)
fib = Fiber.new do
3.times do
puts 2
Schedule.yield
end
end
Schedule.yield(fib)
Schedule.run
运行结果:
1 2 1 2 1 2
真正实用的调度器需要支持一些更复杂的情况,比如:
1、IO事件,当IO事件触发时唤醒等待此事件的Fiber
2、超时事件,当IO事件超时或者计时器超时时,唤醒Fiber
3、用户自己挂起的Fiber,上面就是例子
4、支持消息模型
目前已经有一些不错的实现,比如上次simohayha提到的Revactor,以及这个项目页面上看到的python项目Eventlet和Kamaelia
有了调度器,还必须要对现有阻塞方式的IO库打一些补丁(ruby最擅长这个),否则会阻调度线程。
用Coroutine实现基于状态机的并发编程这部分我看懂了,感谢qiezi的精彩文章,后面的Erlang/IoLanguage不熟悉,没太理解。
就状态机这个例子,我大概明白了Coroutine的用途,可以用很简单的代码,方便的编写出来高性能的并发程序,而不需要像传统的状态机、异步IO和事件驱动编程实现并发那样复杂和烦琐。
最近Ruby 1.9发布以后,因为Fiber库的支持,似乎很多人都开始关注Coroutine,Ruby 1.9的Fiber也可以通过transfer来完整支持Coroutine,我看了Dave Thomas写的两篇博客,用Fiber来实现管道编程,但还没有qiezi的文章很简单清晰的说清楚Coroutine的作用。
我昨天写了个简单的demo测试了一下,Ruby 1.9的线程已经从1.8的用户线程改成了真正的内核线程,Fiber的推出似乎是在弥补内核多线程昂贵的调度开销问题吗?用户线程和内核线程各有各自的争议,Ruby 1.9既支持内核多线程来实现多核CPU的并行运算,又支持Coroutine来实现轻量级并发,似乎很讨巧,如果能有好的调度器来充分利用这两点,应该是一个很棒的并发库了。
不过让ruby程序员直接用Fiber写并发程序,似乎还是门槛高了一点,也许会有人提供一些高层抽象的Coroutine并发库来给普通的ruby程序员使用。当然,还是希望qiezi能用ruby 1.9的Fiber多举一些并发的例子,大家都好理解。
就状态机这个例子,我大概明白了Coroutine的用途,可以用很简单的代码,方便的编写出来高性能的并发程序,而不需要像传统的状态机、异步IO和事件驱动编程实现并发那样复杂和烦琐。
最近Ruby 1.9发布以后,因为Fiber库的支持,似乎很多人都开始关注Coroutine,Ruby 1.9的Fiber也可以通过transfer来完整支持Coroutine,我看了Dave Thomas写的两篇博客,用Fiber来实现管道编程,但还没有qiezi的文章很简单清晰的说清楚Coroutine的作用。
我昨天写了个简单的demo测试了一下,Ruby 1.9的线程已经从1.8的用户线程改成了真正的内核线程,Fiber的推出似乎是在弥补内核多线程昂贵的调度开销问题吗?用户线程和内核线程各有各自的争议,Ruby 1.9既支持内核多线程来实现多核CPU的并行运算,又支持Coroutine来实现轻量级并发,似乎很讨巧,如果能有好的调度器来充分利用这两点,应该是一个很棒的并发库了。
不过让ruby程序员直接用Fiber写并发程序,似乎还是门槛高了一点,也许会有人提供一些高层抽象的Coroutine并发库来给普通的ruby程序员使用。当然,还是希望qiezi能用ruby 1.9的Fiber多举一些并发的例子,大家都好理解。
Ruby的扩展能力也足够强了。
我尝试把IoLanguage的Socket模块实现方式移植到Ruby里面,发现还是比较容易实现的,目前已经完成了Event/EventManager这部分,也是使用LibEvent做的,C模块就只有这点工作,其它上层都是在Ruby代码里完成,目标是完成一个和IoLanguage一样的库,包括Socket/AIO,还有一个IoLanguage的异步模拟,不一定有实用性,但可以体验使用不同的方式编写并发程序。
Fiber还没有整合进来,不过我会参考IoLanguage实现一个调度器,也当作是熟悉IoLanguage的一个方法。
我尝试把IoLanguage的Socket模块实现方式移植到Ruby里面,发现还是比较容易实现的,目前已经完成了Event/EventManager这部分,也是使用LibEvent做的,C模块就只有这点工作,其它上层都是在Ruby代码里完成,目标是完成一个和IoLanguage一样的库,包括Socket/AIO,还有一个IoLanguage的异步模拟,不一定有实用性,但可以体验使用不同的方式编写并发程序。
Fiber还没有整合进来,不过我会参考IoLanguage实现一个调度器,也当作是熟悉IoLanguage的一个方法。
ShiningRay
2008-02-19
回复
Io的强大我认为在于它吸收了Self的基于原型的面向对象编程
而且我看好Io对语言的扩展能力比Ruby强很多
而且我看好Io对语言的扩展能力比Ruby强很多
发表评论
链接
最新评论
-
并行/分布式集群的一点想 ...
还没遇到,我做的就是一个map/reduce框架,没涉及到状态。
-- by fredzhang -
并行/分布式集群的一点想 ...
我这边也大量使用了这种方式,目前还在逐步推广中。。 如果是有状态的应用,数据可 ...
-- by qiezi -
并行/分布式集群的一点想 ...
我现在做的一个框架采用了consistent hashing,这样服务器可以动态 ...
-- by fredzhang -
开发人员变成运维人员—— ...
自认为还是能发现别人的一些闪光点的,并且和他们一起进步。以前那一组同事从来没听过 ...
-- by qiezi -
开发人员变成运维人员—— ...
哈哈,全是错别字,怎么不让改呀。
-- by qiezi







评论排行榜