|
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
|---|---|
| 作者 | 正文 |
|
时间:2007-09-15 关键字: Erlang风格
琢磨了好久,一直没时间来做它。在讨论这个问题的时候就已经有这想法了,后来发现tango里已经有Fiber的实现,昨天终于抽了点时间做了个简单的小玩意,离真实应用还差得很远。
测试代码:
import light_process;
import tango.io.Stdout;
void test() {
Stdout("test").newline;
receive(
(Process pid, float f) {
Stdout("test: receive float: ", f).newline;
pid.send(f);
},
(Process pid, int i) {
Stdout("test: receive int: ", i).newline;
pid.send(i * 2);
},
(NotMatched) {
Stdout("test: not matched").newline;
}
).timeout(3000, {
Stdout("test: timeout").newline;
}).end;
test();
}
void test1(Process pid){
Stdout("test1").newline;
//sleep(1);
pid.send(self(), 7.7f);
pid.send(self(), cast(int)3);
test1_loop();
}
void test1_loop() {
receive(
(float f) {
Stdout("test1: received result: ", f).newline;
},
(int value) {
Stdout("test1: received result: ", value).newline;
}
).timeout(3000, {
Stdout("test1: timeout").newline;
}).end;
test1_loop();
}
void sleep(size_t usecs) {
receive(
).timeout(usecs, {
}).end;
}
void de_start() {
auto pid = spawn(&test);
spawn(&test1, pid);
//sleep(3);
}
void main(){
de_start();
Process.run();
}
实现:
module light_process;
import tango.core.Thread;
import tango.core.Traits;
class NotMatched{
}
class Message {
TypeInfo typeInfo__;
public this(TypeInfo typeInfo__) {
this.typeInfo__ = typeInfo__;
}
}
class MessageT(Types...) : public Message{
Types values;
public this(Types values_){
super(typeid(typeof(Types)));
foreach(i, e; values_) {
values[i] = values_[i];
}
}
}
class Delegate {
abstract void call(Message msg);
abstract bool matchType(TypeInfo typeInfo__);
}
class DelegateT(Types...) : public Delegate{
void delegate(Types) dg;
void function(Types) func;
this(void delegate(Types) dg){
this.dg = dg;
}
this(void function(Types) func){
this.func = func;
}
override void call(Message msg) {
auto message = cast(MessageT!(Types))msg;
if (dg)
dg(message.values);
else
func(message.values);
}
override bool matchType(TypeInfo typeInfo__) {
if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
return true;
return false;
}
}
class Process : public Fiber{
// static members
public static Process getThis() {
return cast(Process)Fiber.getThis();
}
private static Process[] processes;
// instance members
private Message[] messages;
private Delegate[] handlers;
public this() {
super(&exec);
}
public void clearMessageHandlers() {
handlers.length = 0;
}
public void addMessageHandler(Delegate dg){
handlers ~= dg;
}
public Process timeout(int timeout, void delegate() dg) {
return this;
}
public void end() {
Fiber.yield();
domsg();
}
public Process send(Args ...)(Args args) {
messages ~= new MessageT!(Args)(args);
return this;
}
public void domsg() {
while (!messages.length) {
Fiber.yield();
}
Message msg = messages[0];
foreach(dg; handlers) {
if (dg.matchType(msg.typeInfo__)) {
dg.call(msg);
messages = messages[1..$];
break;
}
}
}
public static void run() {
while(true) {
foreach(process; Process.processes) {
if (process.state != Fiber.State.TERM) {
process.call();
}
}
}
}
protected abstract void exec();
}
class ProcessT(Ret, Args...) : public Process {
Ret delegate(Args) dg;
Ret function(Args) func;
Args args;
public this(Ret delegate(Args) dg, Args args) {
this.dg = dg;
foreach(i, e; args) {
this.args[i] = e;
}
}
public this(Ret function(Args) func, Args args) {
this.func = func;
foreach(i, e; args) {
this.args[i] = e;
}
}
public void exec() {
if (dg)
dg(args);
else
func(args);
}
}
Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
Process p = new ProcessT!(Ret, Args)(dg, args);
Process.processes ~= p;
return p;
}
Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
Process p = new ProcessT!(Ret, Args)(func, args);
Process.processes ~= p;
return p;
}
Process receive(Dgs ...)(Dgs dgs) {
Process self = Process.getThis();
self.clearMessageHandlers();
foreach(i, dg; dgs) {
Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))
(dgs[i]);
self.addMessageHandler(handler);
}
return self;
}
Process self() {
return Process.getThis();
}
依赖tango。 目前只实现了基本的消息发送接收,为了简单目前使用内置数组来保存数据。整个驱动过程是对所有进程顺序进行调度,没有过优化是否有需要调度,只是快速做了个原型。timeout功能也没有实现,这个有时间可以重写调度部分来完成,因为这个部分就是随意写了一个,测试是否有可能实现类似erlang的高度过程。所以这个驱动过程实际上是个死循环,测试几秒就强行结束程序吧,CPU被烧本人概不负责。 递归是比较麻烦的,不知道D有没有优化尾递归,我想应该还是有的吧,不然上面那个程序应该早就溢出了。receive的各个委托参数里是没办法优化成尾递归的,解决办法当然有,就是麻烦点,谁叫咱不是FP语言呢。。(这年头,不是FP你都感觉抬不起头。。。) 声明:JavaEye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
| 返回顶楼 | |
|
时间:2007-09-15
听说FP并行有优势,但是俺命令式陷入太深了!
|
|
| 返回顶楼 | |
|
时间:2007-09-15
language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错. D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数. BTW: C++ 的版本也是用 thread 的, 性能比D差. |
|
| 返回顶楼 | |
|
时间:2007-09-15
Colorful 写道 听说FP并行有优势,但是俺命令式陷入太深了!
是更容易做成并行,但这也依赖于实现。 |
|
| 返回顶楼 | |
|
时间:2007-09-15
redsea 写道 language shootout 那里有个 cheap concurrent 测试, 就是测试类似能力的.
er-lang 的表现不错. D 的程序记得没有用 fiber, 被人用 thread 去实现的, qiezi 不妨去提交一个 fiber 的版本, 提高D的分数. BTW: C++ 的版本也是用 thread 的, 性能比D差. fiber不一定会比thread性能高的,特别是大量计算的只会降低效率。只是更容易生成大量的“线程”,提高并行能力。如果实现erlang这种架构,就可以让程序写起来更方便,用同步的写法就完成了异步的调用,对于库的编写要求会比较高。 上面这个测试程序只是简单测试一下可行性,看有没有人有兴趣一起研究一下,这种架构最主要的难点在于调度,不过这是个不断优化的过程,关键是看有没有简化编程。D的垃圾收集对于委托的影响还没测试到呢,它和fiber的配合程度怎样也不清楚,毕竟fiber的堆栈是要不断切换的,而D的GC在扫描堆栈时只是扫描线程栈。 |
|
| 返回顶楼 | |
|
时间:2007-09-15
language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.
这个测试对 er-lang 来说, 简直是量身定做 |
|
| 返回顶楼 | |
|
时间:2007-09-15
redsea 写道 language shootout 里面那个测试基本没有实质计算, 每个 cheap thread 将自己收到的 token 传递给下一个 cheap thread 而已. 测试当中可以关闭 gc.
这个测试对 er-lang 来说, 简直是量身定做 :) 原来如此,这个测试D/C++成绩也太差了,线程开销还是太大了些,应该很容易用轻量级线程来提高吧。目前首先要做的是做个调度器,先让它跑起来再说。 |
|
| 返回顶楼 | |
|
时间:2007-09-16
tango的fibre据说是assertfalse.com mikolalysenko的coroutine。language shootout的coroutine测试我不太懂其他语言的实现,然而感觉是对D不公平的,D用的是系统的线程。其他的可能根本就没有启动系统线程
|
|
| 返回顶楼 | |
|
时间:2007-09-17
看样子要跑过erlang很难啊,那里有一个gcc的测试,使用ucontext的,竟然比python还差,更不用说erlang了。。
|
|
| 返回顶楼 | |
|
时间:2007-09-18
做了个可工作的版本,完成cheap_concurrency的功能,结果是正确的,不过效率那是相当地低,主要是调度部分并没有优化。优化的策略就是每次只调度处于receive状态并且有message的进程,有空再进行,可能要这周末了。先发出来,有兴趣的不妨改改看。纯属好玩,不一定有什么价值。
cheap_concurrency测试代码是从erlang版本改过来的,不过由于递归会导致堆栈溢出,所以这里改成了循环。 1.cheap_concurrency.d
import light_process;
import tango.io.Stdout;
import tango.text.convert.Integer;
void de_main(int N) {
Process Last = start(500, self());
Stdout(sendtimes(N, Last, 0)).newline;
}
Process start(int X, Process LastPID) {
if (X == 0) return LastPID;
return start(X-1, spawn(&loop, LastPID));
}
void loop(Process LastPID) {
while(true) {
receive(
(int N) {
LastPID.send(N+1);
}
).end;
}
}
int sendtimes(int N, Process Last, int X) {
for(int i=0; i<N; i++) {
Last.send(0);
int y;
receive(
(int Y) {
y = Y;
}
).end;
X += y;
}
return X;
}
void main(char[][] args){
if (args.length != 2){
Stdout("USAGE: " ~ args[0] ~ " N").newline;
return;
}
int n = toInt(args[1]);
spawn(&de_main, n);
Process.run();
}
2.light_process.d
module light_process;
import tango.io.Stdout;
import tango.core.Thread;
import tango.core.Traits;
import tango.util.collection.LinkSeq;
class NotMatched{
}
class Message {
TypeInfo typeInfo__;
public this(TypeInfo typeInfo__) {
this.typeInfo__ = typeInfo__;
}
}
class MessageT(Types...) : public Message{
Types values;
public this(Types values_){
super(typeid(typeof(Types)));
foreach(i, e; values_) {
values[i] = values_[i];
}
}
}
class Delegate {
abstract void call(Message msg);
abstract bool matchType(TypeInfo typeInfo__);
}
class DelegateT(Types...) : public Delegate{
void delegate(Types) dg;
void function(Types) func;
this(void delegate(Types) dg){
this.dg = dg;
}
this(void function(Types) func){
this.func = func;
}
override void call(Message msg) {
auto message = cast(MessageT!(Types))msg;
if (dg)
dg(message.values);
else
func(message.values);
}
override bool matchType(TypeInfo typeInfo__) {
if (typeid(Types) is typeid(NotMatched) || typeInfo__ is typeid(Types))
return true;
return false;
}
}
class Process : public Fiber{
// static members
public static Process getThis() {
return cast(Process)Fiber.getThis();
}
// receiving processes
private static bool[Process] receivingProcesses;
// instance members
private LinkSeq!(Message) messages;
private LinkSeq!(Delegate) handlers;
public this() {
super(&exec);
messages = new LinkSeq!(Message);
handlers = new LinkSeq!(Delegate);
}
public void startReceiving() {
receivingProcesses[this] = true;
}
public void stopReceiving() {
receivingProcesses.remove(this);
}
public void clearMessageHandlers() {
handlers.clear();
}
public void addMessageHandler(Delegate dg){
handlers.append(dg);
}
public Process timeout(int timeout, void delegate() dg) {
return this;
}
public void end() {
Fiber.yield();
domsg();
}
public Process send(Args ...)(Args args) {
messages.append(new MessageT!(Args)(args));
return this;
}
public void domsg() {
while(messages.size() == 0) {
Fiber.yield();
}
Message msg = messages.head();
foreach(dg; handlers) {
if (dg.matchType(msg.typeInfo__)) {
dg.call(msg);
messages.removeHead();
break;
}
}
}
public static void run() {
while(true) {
foreach(process, dummy; Process.receivingProcesses) {
if (process.state != Fiber.State.TERM) {
process.call();
}
}
}
}
protected abstract void exec();
}
class ProcessT(Ret, Args...) : public Process {
Ret delegate(Args) dg;
Ret function(Args) func;
Args args;
public this(Ret delegate(Args) dg, Args args) {
this.dg = dg;
foreach(i, e; args) {
this.args[i] = e;
}
}
public this(Ret function(Args) func, Args args) {
this.func = func;
foreach(i, e; args) {
this.args[i] = e;
}
}
public void exec() {
if (dg)
dg(args);
else
func(args);
}
}
Process spawn(Ret, Args...)(Ret delegate(Args) dg, Args args) {
Process p = new ProcessT!(Ret, Args)(dg, args);
p.call();
return p;
}
Process spawn(Ret, Args...)(Ret function(Args) func, Args args) {
Process p = new ProcessT!(Ret, Args)(func, args);
p.call();
return p;
}
Process receive(Dgs ...)(Dgs dgs) {
Process pid = self();
pid.clearMessageHandlers();
foreach(i, dg; dgs) {
Delegate handler = new DelegateT!(ParameterTupleOf!(typeof(dgs[i])))(dgs[i]);
pid.addMessageHandler(handler);
}
pid.startReceiving();
return pid;
}
Process self() {
return Process.getThis();
}
|
|
| 返回顶楼 | |




