前面说过用Generator编写并发程序的优势,当然它本身没有这种能力,需要为它编写调度程序。

今天抽点时间写了个简单的,还是满好玩的,它可以调度多个“友好”的并发任务,包括:
1、用户自己编写的适时交出控制权的过程
2、网络IO

由于对C#不是很熟,所以写的可能比较难看,而且IO也只支持网络,有兴趣的可以研究改进一下,比如把它改成SMP版本,增加Actor模型,处理更多的IO模式等。

实现方式基本上是从IoLanguage里面抄过来的,我已经用它编写过Ruby/C++/D/C#版本,当然目前仅限于测试它的切换性能。

好消息是如果你实现得比较好,C#版本性能完全可以超过Erlang,Ruby的Fiber就比较差一些,C++/D里面使用Fiber/ucontext性能也不是很好,Generator性能是最好的。

由于时间短,只测试了ReadEvent,WriteEvent没有测试,这个也是有多种实现方式可以研究的。这里实现的调度器性能不是很好,可能是List用法性能太差,没有仔细测试。
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Net;


namespace ConsoleApplication1
{
    /// <summary>
    /// Kinds of socket readiness a task can wait for. Each member occupies its
    /// own bit, which is what lets callers test a value with a bitwise AND.
    /// </summary>
    public enum EventType
    {
        /// <summary>The socket is waited on for reading (value 1).</summary>
        ReadEvent = 1 << 0,

        /// <summary>The socket is waited on for writing (value 2).</summary>
        WriteEvent = 1 << 1
    }

    /// <summary>
    /// One pending wait: a socket, the kind of readiness being waited for, and
    /// the task to resume once that readiness is observed.
    /// </summary>
    public class Event
    {
        private readonly Socket socket;
        private readonly EventType kind;

        // Filled in after construction through setTask; null until then.
        private IEnumerator<int> waiter;

        /// <summary>Creates a wait on <paramref name="sock"/> for <paramref name="eventType"/>.</summary>
        /// <param name="sock">Socket whose readiness is awaited.</param>
        /// <param name="eventType">Readiness kind (read or write).</param>
        public Event(Socket sock, EventType eventType)
        {
            socket = sock;
            kind = eventType;
        }

        /// <summary>Returns the socket passed to the constructor.</summary>
        public Socket getSocket()
        {
            return socket;
        }

        /// <summary>Returns the readiness kind passed to the constructor.</summary>
        public EventType getEventType()
        {
            return kind;
        }

        /// <summary>Stores the task that <see cref="onEvent"/> will resume.</summary>
        /// <param name="task">Task to attach to this wait.</param>
        public void setTask(IEnumerator<int> task)
        {
            waiter = task;
        }

        /// <summary>Returns the task last stored with <see cref="setTask"/>, or null.</summary>
        public IEnumerator<int> getTask()
        {
            return waiter;
        }

        /// <summary>
        /// Logs the wake-up and hands the attached task to
        /// Scheduler.scheduleNow so it is placed at the head of the run queue.
        /// </summary>
        public void onEvent()
        {
            Console.WriteLine("onEvent");
            Scheduler.scheduleNow(waiter);
        }
    }

    /// <summary>
    /// An <see cref="Event"/> whose type is fixed to EventType.ReadEvent.
    /// </summary>
    public class ReadEvent : Event
    {
        /// <summary>Creates a read wait on <paramref name="sock"/>.</summary>
        /// <param name="sock">Socket to wait on.</param>
        public ReadEvent(Socket sock) : base(sock, EventType.ReadEvent)
        {
            // Nothing beyond what the base constructor stores.
        }
    }

    /// <summary>
    /// An <see cref="Event"/> whose type is fixed to EventType.WriteEvent.
    /// </summary>
    public class WriteEvent : Event
    {
        /// <summary>Creates a write wait on <paramref name="sock"/>.</summary>
        /// <param name="sock">Socket to wait on.</param>
        public WriteEvent(Socket sock) : base(sock, EventType.WriteEvent)
        {
            // Nothing beyond what the base constructor stores.
        }
    }

    /// <summary>
    /// Holds the events that tasks are currently waiting on and wakes those
    /// tasks from a polling task (<see cref="runLoop"/>) built on Socket.Select.
    /// </summary>
    class EventManager
    {
        // Upper bound for one blocking Select call. Socket.Select takes its
        // timeout in MICROseconds, so 1000000 means one second.
        private const int IdleSelectMicroseconds = 1000000;

        // Pending waits keyed by socket, one table per readiness kind.
        private readonly IDictionary<Socket, Event> readEvents = new Dictionary<Socket, Event>();
        private readonly IDictionary<Socket, Event> writeEvents = new Dictionary<Socket, Event>();

        // The polling task; null whenever no polling loop is alive.
        private IEnumerator<int> myTask = null;

        /// <summary>
        /// Records <paramref name="ev"/> as pending and makes sure the polling
        /// task exists and is at the head of the scheduler queue.
        /// </summary>
        /// <param name="ev">Event to wait for.</param>
        /// <returns>Always 0.</returns>
        /// <exception cref="ArgumentException">
        /// The socket already has a pending event of the same kind
        /// (thrown by the dictionary's Add).
        /// </exception>
        public int registerHandler(Event ev)
        {
            Console.WriteLine("registerHandler");

            // Anything without the read bit is treated as a write wait.
            bool isRead = (ev.getEventType() & EventType.ReadEvent) > 0;
            if (isRead)
            {
                readEvents.Add(ev.getSocket(), ev);
            }
            else
            {
                writeEvents.Add(ev.getSocket(), ev);
            }

            if (myTask == null)
            {
                myTask = runLoop();
            }

            Scheduler.scheduleNow(myTask);

            return 0;
        }

        /// <summary>
        /// Body of the polling task. Each pass yields once, then calls
        /// Socket.Select on every pending socket and wakes the task of every
        /// socket reported ready. The task ends, and clears its own handle,
        /// once no events remain pending.
        /// </summary>
        /// <returns>An iterator that yields 1 once per pass.</returns>
        public IEnumerator<int> runLoop()
        {
            Console.WriteLine("enter run");
            do
            {
                Console.WriteLine("run loop");
                yield return 1;

                // Select trims these lists in place to the ready sockets, so
                // it must be given copies rather than the live key sets.
                IList rlist = new List<Socket>(readEvents.Keys);
                IList wlist = new List<Socket>(writeEvents.Keys);

                // Only poll (timeout 0) while other tasks are runnable;
                // otherwise it is fine to block in Select.
                int timeoutMicroseconds = Scheduler.hasEvents() ? 0 : IdleSelectMicroseconds;
                Socket.Select(rlist, wlist, null, timeoutMicroseconds);

                // Wake every ready waiter, not just the first one: servicing
                // a single socket per pass costs one Select call per event
                // and lets write waits starve while any read is ready.
                // Writes are dispatched first so that reads, dispatched last,
                // end up ahead of them in the run queue.
                dispatchReady(wlist, writeEvents);
                dispatchReady(rlist, readEvents);
            } while (readEvents.Count != 0 || writeEvents.Count != 0);

            myTask = null;
        }

        // Removes each ready socket's event from `pending` and fires it.
        // Walks backwards because Event.onEvent puts its task at the HEAD of
        // the queue; this way the first ready socket's task runs first.
        private static void dispatchReady(IList ready, IDictionary<Socket, Event> pending)
        {
            for (int i = ready.Count - 1; i >= 0; i--)
            {
                Socket sock = (Socket)ready[i];
                Event ev;
                if (pending.TryGetValue(sock, out ev))
                {
                    pending.Remove(sock);
                    ev.onEvent();
                }
            }
        }
    }

    /// <summary>
    /// Cooperative scheduler for iterator-based tasks. Each thread gets its own
    /// run queue and its own <c>EventManager</c>; a task gives up control by
    /// yielding, and <see cref="run"/> resumes queued tasks until none remain.
    /// </summary>
    public static class Scheduler
    {
        /// <summary>Task body factory; <see cref="spawn"/> invokes it once.</summary>
        public delegate IEnumerable<int> T();

        // [ThreadStatic] fields must not carry initializers: an initializer
        // executes once, in the static constructor, on whichever thread first
        // touches the class, so every other thread would observe null. The
        // per-thread instances are therefore created lazily by the accessors
        // below.
        [ThreadStatic]
        private static IList<IEnumerator<int>> tasks;

        [ThreadStatic]
        private static EventManager eventManager;

        // Event registered by the task that is currently running, if any.
        // Consumed (and cleared) by the next call to schedule.
        [ThreadStatic]
        private static Event lastEvent;

        // Run queue of the calling thread, created on first use.
        private static IList<IEnumerator<int>> Tasks
        {
            get
            {
                if (tasks == null)
                {
                    tasks = new List<IEnumerator<int>>();
                }
                return tasks;
            }
        }

        // Event manager of the calling thread, created on first use.
        private static EventManager Manager
        {
            get
            {
                if (eventManager == null)
                {
                    eventManager = new EventManager();
                }
                return eventManager;
            }
        }

        /// <summary>Reports whether the calling thread's run queue is non-empty.</summary>
        /// <returns>true when at least one task is queued.</returns>
        public static bool hasEvents()
        {
            return Tasks.Count > 0;
        }

        /// <summary>Creates a task from <paramref name="del"/> and schedules it.</summary>
        /// <param name="del">Factory returning the task's iterator block.</param>
        public static void spawn(T del)
        {
            var task = del().GetEnumerator();
            schedule(task);
        }

        /// <summary>
        /// Schedules <paramref name="task"/>. If an event was registered via
        /// <see cref="registerAndWaitEvent"/> since the last call, the task is
        /// attached to that event instead of being queued; otherwise it is
        /// appended to the end of the run queue.
        /// </summary>
        /// <param name="task">Task to schedule.</param>
        public static void schedule(IEnumerator<int> task)
        {
            IList<IEnumerator<int>> queue = Tasks;

            // Drop any stale entry so a task is never queued twice.
            queue.Remove(task);
            if (lastEvent != null)
            {
                lastEvent.setTask(task);
            }
            else
            {
                queue.Add(task);
            }
            lastEvent = null;
        }

        /// <summary>Moves or inserts <paramref name="task"/> at the head of the run queue.</summary>
        /// <param name="task">Task to run next.</param>
        public static void scheduleNow(IEnumerator<int> task)
        {
            IList<IEnumerator<int>> queue = Tasks;
            queue.Remove(task);
            queue.Insert(0, task);
        }

        /// <summary>Removes <paramref name="task"/> from the run queue, if present.</summary>
        /// <param name="task">Task to remove.</param>
        public static void remove(IEnumerator<int> task)
        {
            Console.WriteLine("remove task");
            Tasks.Remove(task);
        }

        /// <summary>
        /// Runs queued tasks on the calling thread until the queue is empty.
        /// A task whose MoveNext returns true is handed back to
        /// <see cref="schedule"/>; one that returns false is dropped.
        /// </summary>
        public static void run()
        {
            IList<IEnumerator<int>> queue = Tasks;
            while (queue.Count > 0)
            {
                IEnumerator<int> task = queue[0];
                queue.RemoveAt(0);
                bool ret = task.MoveNext();

                if (ret)
                {
                    schedule(task);
                }
            }
        }

        /// <summary>
        /// Registers <paramref name="ev"/> with the event manager and marks it
        /// as the event the current task is about to wait on. The caller is
        /// expected to yield right afterwards so that <see cref="schedule"/>
        /// attaches the task to the event.
        /// </summary>
        /// <param name="ev">Event to wait for.</param>
        /// <returns>The value returned by the event manager's registerHandler.</returns>
        public static int registerAndWaitEvent(Event ev)
        {
            lastEvent = ev;
            return Manager.registerHandler(ev);
        }
    }

    // 测试程序:

    /// <summary>
    /// Demo program: two counting tasks and a TCP echo server, all multiplexed
    /// on the scheduler of the main thread.
    /// </summary>
    class Program
    {
        // TCP port the echo server listens on.
        private const int EchoPort = 23456;

        // Size of each client's receive buffer, in bytes.
        private const int BufferSize = 1024;

        /// <summary>Task that prints and yields the numbers 0 through 4.</summary>
        /// <param name="id">Label printed in front of each number.</param>
        /// <returns>The task's iterator block.</returns>
        public static IEnumerable<int> test(string id)
        {
            for (int i = 0; i < 5; i++)
            {
                System.Console.WriteLine("id: " + id + ", " + i);
                yield return i;
            }
        }

        /// <summary>
        /// Echo task for one connection: waits until the socket is readable,
        /// receives, and sends the received bytes back. Closes the socket and
        /// ends when Receive returns 0.
        /// </summary>
        /// <param name="client">Connected client socket.</param>
        /// <returns>The task's iterator block.</returns>
        public static IEnumerable<int> clientLoop(Socket client)
        {
            // One buffer per connection, reused across iterations.
            byte[] buffer = new byte[BufferSize];

            while (true)
            {
                Console.WriteLine("client loop");
                yield return Scheduler.registerAndWaitEvent(new ReadEvent(client));

                int ret = client.Receive(buffer);
                if (ret > 0)
                {
                    // Echo only the bytes actually received. Send(buffer)
                    // would transmit the whole buffer, padding every reply
                    // with stale or zero bytes.
                    client.Send(buffer, ret, SocketFlags.None);
                }
                else
                {
                    client.Close();
                    break;
                }
            }
        }

        /// <summary>
        /// Accept task: listens on the IPv4 loopback address and spawns a
        /// <see cref="clientLoop"/> task for every accepted connection.
        /// </summary>
        /// <returns>The task's iterator block; it never completes normally.</returns>
        public static IEnumerable<int> serverLoop()
        {
            Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            // Bind to the IPv4 loopback explicitly. Resolving "localhost" may
            // return an IPv6 address first, and binding that to an
            // InterNetwork socket throws.
            sock.Bind(new IPEndPoint(IPAddress.Loopback, EchoPort));
            sock.Listen(128);

            while (true)
            {
                Console.WriteLine("server loop");
                yield return Scheduler.registerAndWaitEvent(new ReadEvent(sock));
                Console.WriteLine("accept event");
                Socket client = sock.Accept();
                Scheduler.spawn(() => clientLoop(client));
            }
        }

        /// <summary>Spawns the demo tasks and runs the scheduler.</summary>
        /// <param name="args">Unused.</param>
        static void Main(string[] args)
        {
            Scheduler.spawn(() => test("a"));
            Scheduler.spawn(() => test("b"));
            Scheduler.spawn(() => serverLoop());
            Scheduler.run();
        }
    }
}
评论
vdgame 2008-07-07   回复
不懂C#,老兄能不能给个c++的代码?谢谢先!
发表评论

您还没有登录,请登录后发表评论

qiezi
搜索本博客
存档
最新评论